I'm trying out testing extracting a repeating signal from a larger and
there is a bug that looks debuggable!
I generated a repeating signal by indexing the random vector with the
floor of modular time, and added a custom wavelet parameter to the
fourier functions to model it, and passed a square wave as the wavelet
so as to model sampling by indexing with floors.
It currently throws an assertion failure, but the mismatching vectors
have the same values in them, just in different spots, which should
really help figure the bug out!
> /shared/src/scratch/fourier.py(208)test()
-> assert np.allclose(longvec, inserting_spectrum @ inserting_ift)
(Pdb) p longvec
array([0.73764179, 0.73764179, 0.73764179, ..., 0.85448175, 0.85448175,
0.85448175])
(Pdb) p inserting_spectrum @ inserting_ift
array([0.73764179, 0.85448175, 0.23715969, ..., 0.330343 , 0.82712795,
0.66150281])
I was thinking I would just go through the composition of sums via the
two code paths and compare them, like earlier. The first sample seems
to generally match, so it would be the second sample that's of
interest.
The previous fourier.py did not have this test and so does not raise
an assertion failure.
# AGPL-3 Karl Semich 2022
import numpy as np
def fftfreq(freq_count, sample_rate = None, min_freq = None, max_freq = None,
dc_offset = True, complex = True, sample_time = None,
repetition_rate = None, repetition_time = None, repetition_samples = None,
freq_sample_rate = None, freq_sample_time = None):
'''
Calculates and returns the frequency bins used to convert between the time
and frequency domains with a discrete Fourier transform.
With no optional arguments, this function should be equivalent to numpy.fft.fftfreq .
To specify frequencies in terms of repetitions / sample_count, set
sample_rate to sample_count. If frequencies were in Hz (cycles/sec), this
implies that sample_count should be considered to last 1 second, so the
frequency becomes equal to the cycle count.
Parameters:
- freq_count: the number of frequency bins to generate
- sample_rate: the time-domain sample rate (default: 1 sample/time_unit)
- min_freq: the minimum frequency the signal contains
- max_fraq: the maximum frequency the signal contains
- dc_offset: whether to include a DC offset component (0 Hz)
- complex: whether to generate negative frequencies
- sample_time: sample_rate as the duration of a single sample
- repetition_rate: min_freq as the repetition rate of a subsignal
- repetition_time: min_freq as the period time of a subsignal
- repetition_samples: min_freq as the period size of a subsignal in samples
- freq_sample_rate: convert to or from a different frequency-domain sample rate
- freq_sample_time: freq_sample_rate as the duration of a single sample
Returns a vector of sinusoid time scalings that can be used to perform or
analyse a discrete Fourier transform. Multiply this vector by the time-domain
sample rate to get the frequencies.
'''
assert not sample_time or not sample_rate
assert not freq_sample_time or not freq_sample_rate
assert (min_freq, repetition_rate, repetition_time, repetition_samples).count(None) >= 2
if sample_time is not None:
sample_rate = 1 / sample_time
if freq_sample_time is not None:
freq_sample_rate = 1 / freq_sample_time
sample_rate = sample_rate or freq_sample_rate or 1
freq_sample_rate = freq_sample_rate or sample_rate or 1
if not dc_offset:
freq_count += 1
if not complex:
freq_count = int((freq_count - 1) * 2)
if not min_freq:
if repetition_rate:
min_freq = repetition_rate
elif repetition_time:
min_freq = 1 / repetition_time
elif repetition_samples:
min_freq = freq_sample_rate / repetition_samples
else:
min_freq = freq_sample_rate / freq_count
min_freq /= sample_rate
if freq_count % 2 == 0:
if max_freq is None:
max_freq = freq_sample_rate / 2
max_freq /= sample_rate
if complex:
neg_freqs = np.linspace(-max_freq, -min_freq, num=freq_count // 2, endpoint=True)
pos_freqs = -neg_freqs[:0:-1]
else:
pos_freqs = np.linspace(min_freq, max_freq, num=freq_count // 2, endpoint=True)
neg_freqs = pos_freqs[:0]
else:
if max_freq is None:
max_freq = freq_sample_rate * (freq_count - 1) / 2 / freq_count
max_freq /= sample_rate
pos_freqs = np.linspace(min_freq, max_freq, num=freq_count // 2, endpoint=True)
neg_freqs = -pos_freqs[::-1] if complex else pos_freqs[:0]
if complex:
neg_freqs = -pos_freqs[::-1]
else:
neg_freqs = pos_freqs[:0]
return np.concatenate([
np.array([0] if dc_offset else []),
pos_freqs,
neg_freqs
])
def create_freq2time(time_count = None, freqs = None, wavelet = lambda x: np.exp(2j * np.pi * x)):
'''
Creates a matrix that will perform an inverse discrete Fourier transform
when it post-multiplies a vector of complex frequency magnitudes.
Example
time_data = spectrum @ create_freq2time(len(spectrum))
Parameters:
- time_count: size of the output vector, defaults to the frequency bincount
- freqs: frequency bins to convert, defaults to traditional FFT frequencies
Returns:
- an inverse discrete Fourier matrix of shape (len(freqs), time_count)
'''
assert (time_count is not None) or (freqs is not None)
if freqs is None:
freqs = fftfreq(time_count)
else:
time_count = time_count or len(freqs)
offsets = np.arange(time_count)
#mat = np.exp(2j * np.pi * np.outer(freqs, offsets))
mat = wavelet(np.outer(freqs, offsets))
return mat / len(freqs) # scaled to match numpy convention
def create_time2freq(time_count = None, freqs = None, wavelet = lambda x: np.exp(2j * np.pi * x)):
'''
Creates a matrix that will perform a forward discrete Fourier transform
when it post-multiplies a vector of time series data.
If time_count is too small or large, the minimal least squares solution
over all the data passed will be produced.
This function is equivalent to calling .pinv() on the return value of
create_freq2time. If the return value is single-use, it is more efficient and
accurate to use numpy.linalg.lstsq .
Example
spectrum = time_data @ create_time2freq(len(time_data))
Parameters:
- time_count: size of the input vector, defaults to the frequency bincount
- freqs: frequency bins to produce, defaults to traditional FFT frequencies
Returns:
- a discrete Fourier matrix of shape (time_count, len(freqs))
'''
forward_mat = create_freq2time(time_count, freqs, wavelet)
reverse_mat = np.linalg.pinv(forward_mat)
return reverse_mat
def peak_pair_idcs(freq_data, dc_offset=True, sum=True):
dc_offset = int(dc_offset)
freq_heights = abs(freq_data) # squares and sums the components
if sum:
while len(freq_heights).shape > 1:
freq_heights = freq_height.sum(axis=0)
paired_heights = freq_heights[...,dc_offset:-1] + freq_heights[...,dc_offset+1:]
peak_idx = paired_heights.argmax(axis=-1, keepdims=True) + dc_offset
return np.concatenate(peak_idx, peak_idx + 1, axis=-1)
def improve_peak(time_data, min_freq, max_freq):
freqs = fftfreq(time_data.shape[-1], min_freq = min_freq, max_freq = max_freq)
freq_data = time_data @ np.linalg.inv(create_freq2time(freqs))
left, right = peak_pair_idcs(freq_data)
return freq_data[left], freq_data[right]
def test():
randvec = np.random.random(16)
ift16 = create_freq2time(16)
ft16 = create_time2freq(16)
randvec2time = randvec @ ift16
randvec2freq = randvec @ ft16
randvec2ifft = np.fft.ifft(randvec)
randvec2fft = np.fft.fft(randvec)
assert np.allclose(randvec2ifft, randvec2time)
assert np.allclose(randvec2fft, randvec2freq)
assert np.allclose(randvec2ifft, np.linalg.solve(ft16.T, randvec))
assert np.allclose(randvec2fft, np.linalg.solve(ift16.T, randvec))
# [ 0, 1, 2, 3, 4, 3, 2, 1]
#irft9_16 = create_freq2time(16, fftfreq(9, complex = False))
#rft16_30 = create_time2freq(16, fftfreq(30, complex = False))
#randvec2
#assert np.allclose((randvec @ rft16) @ irft16, randvec)
# sample data at a differing rate
time_rate = np.random.random() * 2
freq_rate = 1.0
freqs = np.fft.fftfreq(len(randvec))
rescaling_freqs = fftfreq(len(randvec), freq_sample_rate = freq_rate, sample_rate = time_rate)
rescaling_ift = create_freq2time(freqs = rescaling_freqs)
rescaling_ft = create_time2freq(freqs = rescaling_freqs)
rescaled_time_data = np.array([
np.mean([
randvec[freqidx] * np.exp(2j * np.pi * freqs[freqidx] * sampleidx / time_rate)
for freqidx in range(len(randvec))
])
for sampleidx in range(len(randvec))
])
assert np.allclose(rescaled_time_data, randvec @ rescaling_ift)
assert np.allclose(rescaled_time_data, np.linalg.solve(rescaling_ft.T, randvec))
unscaled_freq_data = rescaled_time_data @ rescaling_ft
unscaled_time_data = unscaled_freq_data @ ift16
assert np.allclose(unscaled_freq_data, randvec)
assert np.allclose(unscaled_time_data, randvec2time)
assert np.allclose(np.linalg.solve(rescaling_ift.T, rescaled_time_data), randvec)
# extract a repeating wave
longvec = np.empty(len(randvec)*100)
short_count = np.random.random() * 4 + 1
short_duration = len(longvec) / short_count
for idx in range(len(longvec)):
longvec[idx] = randvec[int((idx / len(longvec) * short_duration) % len(randvec))]
wavelet = lambda x: (np.floor(x * 2) % 2) * 2 - 1
inserting_ift = create_freq2time(len(longvec), fftfreq(len(randvec), complex = False, dc_offset = True, repetition_samples = short_duration), wavelet = wavelet)
extracting_ft = create_time2freq(len(longvec), fftfreq(len(randvec), complex = False, dc_offset = True, repetition_samples = short_duration), wavelet = wavelet)
extracting_ift = create_freq2time(freqs = fftfreq(len(randvec), complex = False, dc_offset = True), wavelet = wavelet)
unextracting_ft = create_time2freq(freqs = fftfreq(len(randvec), complex = False, dc_offset = True), wavelet = wavelet)
inserting_spectrum = randvec @ unextracting_ft
assert np.allclose(longvec, inserting_spectrum @ inserting_ift)
assert np.allclose((randvec @ unextracting_ft) @ extracting_ift, randvec)
extracted_freqs = longvec @ extracting_ft
extracted_randvec = extracted_freqs @ extracting_ift
assert np.allclose(extracted_randvec, randvec)
# what kind of a frequency goes in so many samples at a rate
# if the sample rate is 44k, then a 44k signal will be 1 sample/period
# meanwhile a 22k signal will be 2 sample/period
# so the number per data are ...?
# this would be a 2 Hz signal in 1 second of 44k sampled data
# that's 22k sample/period, 2 Hz
# 2 Hz is the frequency for 1 second, for fitting 2 periods/length
# for 2 seconds, it would be 1 Hz
# [
# thinking of making a formula for frequency as a function of periods/data
# expecting in the path to that, to find the sample rate for freq=periods/data
# .
#
# frequency sample rate data length periods/data
# 1 44k 88k 2s 2
# 2 44k 44k 1s 2
# 4 44k 22k 0.5s 2
# 2 44k 88k 2s 4
# 4 44k 44k 1s 4
# 8 44k 22k 0.5s 4
# linearly proportional to periods/data
# inversely proportional to length
# frequency = periods / datalen = count / time
# it looks like the sample rate doesn't matter
# it's the data length that matters
# ohhhh so we would want the data length to be 1
# datalen = samples / sample_rate
# you'd set the sample rate to the number of samples
# ]
if __name__ == '__main__':
test()