This is an automated email from the git hooks/post-receive script. yoh pushed a commit to tag 0.4 in repository python-mne.
commit 67922cd5b1074d161ff8714a0c0645f65956f6a4 Author: Martin Luessi <[email protected]> Date: Thu Nov 10 15:18:09 2011 -0500 ENH: added filter_length parameter, improved test, cleaned up code --- mne/filter.py | 109 ++++++++++++++++++++++++++--------------------- mne/tests/test_filter.py | 33 ++++++++++---- 2 files changed, 86 insertions(+), 56 deletions(-) diff --git a/mne/filter.py b/mne/filter.py index d91c5c8..1b50fe0 100644 --- a/mne/filter.py +++ b/mne/filter.py @@ -3,11 +3,11 @@ from scipy import signal from scipy.fftpack import fft, ifft -def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): +def _overlap_add_filter(x, h, n_fft=None, zero_phase=True): """ Filter using overlap-add FFTs. Filters the signal x using a filter with the impulse response h. - If zero_phase==True, the amplitude response is scaled and the filter is + If zero_phase==True, the amplitude response is scaled and the filter is applied in forward and backward direction, resulting in a zero-phase filter. @@ -17,9 +17,9 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): Signal to filter h : 1d array Filter impule response (FIR filter coefficients) - N_fft : int + n_fft : int Length of the FFT. If None, the best size is determined automatically. - zero_phase : boolean + zero_phase : bool If True: the filter is applied in forward and backward direction, resulting in a zero-phase filter @@ -28,38 +28,38 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): xf : 1d array x filtered """ - - N_h = len(h) + n_h = len(h) # Extend the signal by mirroring the edges to reduce transient filter # response - N_edge = min(N_h, len(x)) + n_edge = min(n_h, len(x)) - x_ext = np.r_[2*x[0]-x[N_edge-1:0:-1], x, 2*x[-1]-x[-2:-N_edge-1:-1]] + x_ext = np.r_[2 * x[0] - x[n_edge - 1:0:-1],\ + x, 2 * x[-1] - x[-2:-n_edge - 1:-1]] - N_x = len(x_ext) + n_x = len(x_ext) # Determine FFT length to use - if N_fft == None: - if N_x > N_h: - N_tot = 2*N_x if zero_phase else N_x - - N = 2**np.arange(np.ceil(np.log2(N_h)), - np.floor(np.log2(N_tot))) - cost = np.ceil(N_tot / (N - N_h + 1)) * N * (np.log2(N) + 1) - N_fft = N[np.argmin(cost)] + if n_fft == None: + if n_x > n_h: + n_tot = 2 * n_x if zero_phase else n_x + + N = 2 ** np.arange(np.ceil(np.log2(n_h)), + np.floor(np.log2(n_tot))) + cost = np.ceil(n_tot / (N - n_h + 1)) * N * (np.log2(N) + 1) + n_fft = N[np.argmin(cost)] else: # Use only a single block - N_fft = 2**np.ceil(np.log2(N_x + N_h - 1)) + n_fft = 2 ** np.ceil(np.log2(n_x + n_h - 1)) - if N_fft <= 0: + if n_fft <= 0: raise ValueError('N_fft is too short, has to be at least len(h)') # Filter in frequency domain - h_fft = fft(np.r_[h, np.zeros(N_fft - N_h)]) + h_fft = fft(np.r_[h, np.zeros(n_fft - n_h)]) if zero_phase: - # We will apply the filter in forward and backward direction: Scale + # We will apply the filter in forward and backward direction: Scale # frequency response of the filter so that the shape of the amplitude # response stays the same when it is applied twice @@ -70,10 +70,10 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): x_filtered = np.zeros_like(x_ext) # Segment length for signal x - N_seg = N_fft - N_h + 1 + n_seg = n_fft - n_h + 1 # Number of segements (including fractional segments) - num_segments = int(np.ceil(N_x / float(N_seg))) + n_segments = int(np.ceil(n_x / float(n_seg))) filter_input = x_ext @@ -84,20 +84,20 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): filter_input = np.flipud(x_filtered) x_filtered = np.zeros_like(x_ext) - for seg_ind in range(num_segments): - seg = filter_input[seg_ind*N_seg:(seg_ind+1)*N_seg] - seg_fft = fft(np.r_[seg, np.zeros(N_fft - len(seg))]) + for seg_idx in range(n_segments): + seg = filter_input[seg_idx * n_seg:(seg_idx + 1) * n_seg] + seg_fft = fft(np.r_[seg, np.zeros(n_fft - len(seg))]) - if seg_ind*N_seg+N_fft < N_x: - x_filtered[seg_ind*N_seg:seg_ind*N_seg+N_fft] \ + if seg_idx * n_seg + n_fft < n_x: + x_filtered[seg_idx * n_seg:seg_idx * n_seg + n_fft]\ += np.real(ifft(h_fft * seg_fft)) else: # Last segment - x_filtered[seg_ind*N_seg:] \ - += np.real(ifft(h_fft * seg_fft))[:N_x-seg_ind*N_seg] + x_filtered[seg_idx * n_seg:] \ + += np.real(ifft(h_fft * seg_fft))[:n_x - seg_idx * n_seg] # Remove mirrored edges that we added - x_filtered = x_filtered[N_edge-1:-N_edge+1] + x_filtered = x_filtered[n_edge - 1:-n_edge + 1] if zero_phase: # flip signal back @@ -106,16 +106,12 @@ def _overlap_add_filter(x, h, N_fft=None, zero_phase=True): return x_filtered -def _filter(x, Fs, freq, gain): +def _filter(x, Fs, freq, gain, filter_length=None): """ Filter signal using gain control points in the frequency domain. The filter impulse response is constructed from a Hamming window (window used in "firwin2" function) to avoid ripples in the frequency reponse - (windowing is a smoothing in frequency domain). - - If the signal is shorter than 1 minute, it is filtered directly using - FFTs (zero-phase). If the signal is longer, zero-phase overlap-add - filtering is used. + (windowing is a smoothing in frequency domain). The filter is zero-phase. Parameters ---------- @@ -127,6 +123,10 @@ def _filter(x, Fs, freq, gain): Frequency sampling points in Hz gain : 1d array Filter gain at frequency sampling points + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -138,7 +138,7 @@ def _filter(x, Fs, freq, gain): # normalize frequencies freq = [f / (Fs / 2) for f in freq] - if len(x) < 60*Fs: + if filter_length == None or len(x) <= filter_length: # Use direct FFT filtering for short signals Norig = len(x) @@ -159,8 +159,8 @@ def _filter(x, Fs, freq, gain): xf = xf[:Norig] x = x[:Norig] else: - # Use overlap-add filter - N = int(10 * Fs) + # Use overlap-add filter with a fixed length + N = filter_length if (gain[-1] == 0.0 and N % 2 == 1) \ or (gain[-1] == 1.0 and N % 2 != 1): @@ -173,7 +173,7 @@ def _filter(x, Fs, freq, gain): return xf -def band_pass_filter(x, Fs, Fp1, Fp2): +def band_pass_filter(x, Fs, Fp1, Fp2, filter_length=None): """Bandpass filter for the signal x. Applies a zero-phase bandpass filter to the signal x. @@ -188,6 +188,10 @@ def band_pass_filter(x, Fs, Fp1, Fp2): low cut-off frequency Fp2 : float high cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -210,7 +214,7 @@ def band_pass_filter(x, Fs, Fp1, Fp2): Fs1 = Fp1 - 0.5 in Hz Fs2 = Fp2 + 0.5 in Hz """ - Fs = float(Fs) + Fs = float(Fs) Fp1 = float(Fp1) Fp2 = float(Fp2) @@ -218,12 +222,13 @@ def band_pass_filter(x, Fs, Fp1, Fp2): Fs1 = Fp1 - 0.5 Fs2 = Fp2 + 0.5 - xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs/2], [0, 0, 1, 1, 0, 0]) + xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs / 2], [0, 0, 1, 1, 0, 0], + filter_length) return xf -def low_pass_filter(x, Fs, Fp): +def low_pass_filter(x, Fs, Fp, filter_length=None): """Lowpass filter for the signal x. Applies a zero-phase lowpass filter to the signal x. @@ -236,6 +241,10 @@ def low_pass_filter(x, Fs, Fp): sampling rate Fp : float cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -260,12 +269,12 @@ def low_pass_filter(x, Fs, Fp): Fstop = Fp + 0.5 - xf = _filter(x, Fs, [0, Fp, Fstop, Fs/2], [1, 1, 0, 0]) + xf = _filter(x, Fs, [0, Fp, Fstop, Fs / 2], [1, 1, 0, 0], filter_length) return xf -def high_pass_filter(x, Fs, Fp): +def high_pass_filter(x, Fs, Fp, filter_length=None): """Highpass filter for the signal x. Applies a zero-phase highpass filter to the signal x. @@ -278,6 +287,10 @@ def high_pass_filter(x, Fs, Fp): sampling rate Fp : float cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -303,6 +316,6 @@ def high_pass_filter(x, Fs, Fp): Fstop = Fp - 0.5 - xf = _filter(x, Fs, [0, Fstop, Fp, Fs/2], [0, 0, 1, 1]) + xf = _filter(x, Fs, [0, Fstop, Fp, Fs / 2], [0, 0, 1, 1], filter_length) - return xf + return xf \ No newline at end of file diff --git a/mne/tests/test_filter.py b/mne/tests/test_filter.py index c071bba..83d5bab 100644 --- a/mne/tests/test_filter.py +++ b/mne/tests/test_filter.py @@ -3,12 +3,29 @@ from numpy.testing import assert_array_almost_equal from ..filter import band_pass_filter, high_pass_filter, low_pass_filter + def test_filters(): - Fs = 250 - # Test short and long signals (direct FFT and overlap-add FFT filtering) - for sig_len_secs in [10, 90]: - a = np.random.randn(sig_len_secs * Fs) - bp = band_pass_filter(a, Fs, 4, 8) - lp = low_pass_filter(a, Fs, 8) - hp = high_pass_filter(lp, Fs, 4) - assert_array_almost_equal(hp, bp, 2) + Fs = 500 + sig_len_secs = 60 + + # Filtering of short signals (filter length = len(a)) + a = np.random.randn(sig_len_secs * Fs) + bp = band_pass_filter(a, Fs, 4, 8) + lp = low_pass_filter(a, Fs, 8) + hp = high_pass_filter(lp, Fs, 4) + assert_array_almost_equal(hp, bp, 2) + + # Overlap-add filtering with a fixed filter length + filter_length = 8192 + bp_oa = band_pass_filter(a, Fs, 4, 8, filter_length) + lp_oa = low_pass_filter(a, Fs, 8, filter_length) + hp_oa = high_pass_filter(lp_oa, Fs, 4, filter_length) + assert_array_almost_equal(hp_oa, bp_oa, 2) + + # The two methods should give the same result + # As filtering for short signals uses a circular convolution (FFT) and + # the overlap-add filter implements a linear convolution, the signal + # boundary will be slightly different and we ignore it + n_edge_ignore = 1000 + assert_array_almost_equal(hp[n_edge_ignore:-n_edge_ignore], + hp_oa[n_edge_ignore:-n_edge_ignore], 2) \ No newline at end of file -- Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git _______________________________________________ debian-med-commit mailing list [email protected] http://lists.alioth.debian.org/cgi-bin/mailman/listinfo/debian-med-commit
