This is an automated email from the git hooks/post-receive script. yoh pushed a commit to annotated tag v0.1 in repository python-mne.
commit 1454b8f1ca88d8a5365267ed290c0afe041a32ac Author: Alexandre Gramfort <[email protected]> Date: Sun Mar 27 22:29:19 2011 -0400 FIX : bug fix in reading of SSP vectors ENH : implementing minimum norm estimate WIP (todo: loose case + tests !) --- examples/plot_minimum_norm_estimate.py | 61 ++++++++ examples/plot_whiten_forward_solution.py | 41 ----- examples/plot_whitened_evoked_data.py | 45 +++++- mne/__init__.py | 2 +- mne/cov.py | 257 +++++++++++++++++-------------- mne/fiff/__init__.py | 2 +- mne/fiff/evoked.py | 3 + mne/fiff/pick.py | 2 +- mne/fiff/proj.py | 114 ++++++++------ mne/inverse.py | 235 ++++++++++++++++++++++++++++ mne/tests/test_cov.py | 7 +- 11 files changed, 546 insertions(+), 223 deletions(-) diff --git a/examples/plot_minimum_norm_estimate.py b/examples/plot_minimum_norm_estimate.py new file mode 100644 index 0000000..79aef87 --- /dev/null +++ b/examples/plot_minimum_norm_estimate.py @@ -0,0 +1,61 @@ +""" +================================================ +Compute MNE-dSPM inverse solution on evoked data +================================================ + +Compute dSPM inverse solution on MNE evoked dataset +and stores the solution in stc files for visualisation. + +""" + +# Author: Alexandre Gramfort <[email protected]> +# +# License: BSD (3-clause) + +print __doc__ + +import numpy as np +import pylab as pl +import mne +from mne.datasets import sample +from mne.fiff import Evoked + +data_path = sample.data_path('.') +fname_fwd = data_path + '/MEG/sample/sample_audvis-meg-eeg-oct-6-fwd.fif' +fname_cov = data_path + '/MEG/sample/sample_audvis-cov.fif' +fname_evoked = data_path + '/MEG/sample/sample_audvis-ave.fif' + +setno = 0 +snr = 3.0 +lambda2 = 1.0 / snr**2 +dSPM = True + +# Load data +evoked = Evoked(fname_evoked, setno=setno, baseline=(None, 0)) +forward = mne.read_forward_solution(fname_fwd) +noise_cov = mne.Covariance() +noise_cov.load(fname_cov) + +# Compute inverse solution +stc, K, W = mne.minimum_norm(evoked, forward, noise_cov, orientation='fixed', + method='dspm', snr=3, loose=0.2, pca=True) + +# Save result in stc files +lh_vertices = stc['inv']['src'][0]['vertno'] +rh_vertices = stc['inv']['src'][1]['vertno'] +lh_data = stc['sol'][:len(lh_vertices)] +rh_data = stc['sol'][-len(rh_vertices):] + +mne.write_stc('mne_dSPM_inverse-lh.stc', tmin=stc['tmin'], tstep=stc['tstep'], + vertices=lh_vertices, data=lh_data) +mne.write_stc('mne_dSPM_inverse-rh.stc', tmin=stc['tmin'], tstep=stc['tstep'], + vertices=rh_vertices, data=rh_data) + +############################################################################### +# View activation time-series +times = stc['tmin'] + stc['tstep'] * np.arange(stc['sol'].shape[1]) +pl.close('all') +pl.plot(1e3*times, stc['sol'][::100,:].T) +pl.xlabel('time (ms)') +pl.ylabel('dSPM value') +pl.show() diff --git a/examples/plot_whiten_forward_solution.py b/examples/plot_whiten_forward_solution.py deleted file mode 100644 index 828240e..0000000 --- a/examples/plot_whiten_forward_solution.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -======================================================== -Whiten a forward operator with a noise covariance matrix -======================================================== -""" -# Author: Alexandre Gramfort <[email protected]> -# -# License: BSD (3-clause) - -print __doc__ - -import mne -from mne import fiff -from mne.datasets import sample - -data_path = sample.data_path('.') -fwd_fname = data_path + '/MEG/sample/sample_audvis-meg-eeg-oct-6-fwd.fif' -ave_fname = data_path + '/MEG/sample/sample_audvis-ave.fif' -cov_fname = data_path + '/MEG/sample/sample_audvis-cov.fif' - -# Reading -ave = fiff.read_evoked(ave_fname, setno=0, baseline=(None, 0)) -fwd = mne.read_forward_solution(fwd_fname) - -cov = mne.Covariance() -cov.load(cov_fname) - -ave_whiten, fwd_whiten, W = cov.whiten_evoked_and_forward(ave, fwd, eps=0.2) - -leadfield = fwd_whiten['sol']['data'] - -print "Leadfield size : %d x %d" % leadfield.shape - -############################################################################### -# Show result -import pylab as pl -pl.matshow(leadfield[:306,:500]) -pl.xlabel('sources') -pl.ylabel('sensors') -pl.title('Lead field matrix') -pl.show() diff --git a/examples/plot_whitened_evoked_data.py b/examples/plot_whitened_evoked_data.py index 6d105bd..ab7a47e 100644 --- a/examples/plot_whitened_evoked_data.py +++ b/examples/plot_whitened_evoked_data.py @@ -10,24 +10,53 @@ Whiten evoked data using a noise covariance matrix print __doc__ +import numpy as np import mne from mne import fiff -from mne.viz import plot_evoked from mne.datasets import sample data_path = sample.data_path('.') -fname = data_path + '/MEG/sample/sample_audvis-ave.fif' +raw_fname = data_path + '/MEG/sample/sample_audvis_filt-0-40_raw.fif' +# raw_fname = data_path + '/MEG/sample/sample_audvis_raw.fif' cov_fname = data_path + '/MEG/sample/sample_audvis-cov.fif' -# Reading -evoked = fiff.read_evoked(fname, setno=0, baseline=(None, 0)) +############################################################################### +# Set epochs parameters +event_id = 1 +tmin = -0.2 +tmax = 0.5 + +############################################################################### +# Create evoked data + +# Setup for reading the raw data +raw = fiff.Raw(raw_fname) +events = mne.find_events(raw) + +# pick EEG channels - bad channels (modify to your needs) +exclude = raw.info['bads'] + ['EEG 053'] # bads + 1 more +picks = fiff.pick_types(raw.info, meg=False, eeg=True, stim=False, + exclude=exclude) +epochs = mne.Epochs(raw, events, event_id, + tmin, tmax, picks=picks, baseline=(None, 0)) +evoked = epochs.average() # average epochs and get an Evoked dataset. + cov = mne.Covariance() cov.load(cov_fname) -evoked_whiten, W = cov.whiten_evoked(evoked) +# Whiten data +W, ch_names = cov.whitener(evoked.info, pca=False) # get whitening matrix +sel = mne.fiff.pick_channels(evoked.ch_names, include=ch_names) # channels id +whitened_data = np.dot(W, evoked.data[sel]) # apply whitening ############################################################################### # Show result -picks = fiff.pick_types(evoked_whiten.info, meg=True, eeg=True, - exclude=evoked_whiten.info['bads']) # Pick channels to view -plot_evoked(evoked_whiten, picks, unit=False) # plot without SI unit of data +times = 1e3 * epochs.times # in ms +import pylab as pl +pl.clf() +pl.plot(times, whitened_data.T) +pl.xlim([times[0], times[-1]]) +pl.xlabel('time (ms)') +pl.ylabel('data (NA)') +pl.title('Whitened EEG data') +pl.show() diff --git a/mne/__init__.py b/mne/__init__.py index a7ecf59..c78f117 100644 --- a/mne/__init__.py +++ b/mne/__init__.py @@ -6,7 +6,7 @@ from .forward import read_forward_solution from .stc import read_stc, write_stc from .bem_surfaces import read_bem_surfaces from .source_space import read_source_spaces -from .inverse import read_inverse_operator, compute_inverse +from .inverse import read_inverse_operator, compute_inverse, minimum_norm from .epochs import Epochs from .label import label_time_courses, read_label import fiff diff --git a/mne/cov.py b/mne/cov.py index 7c9ca10..41683ae 100644 --- a/mne/cov.py +++ b/mne/cov.py @@ -16,11 +16,36 @@ from .fiff.channels import _read_bad_channels from .fiff.write import start_block, end_block, write_int, write_name_list, \ write_double, write_float_matrix, start_file, end_file -from .fiff.proj import write_proj +from .fiff.proj import write_proj, make_projector from .fiff import fiff_open from .fiff.pick import pick_types, pick_channels_forward +def rank(A, tol=1e-8): + s = linalg.svd(A, compute_uv=0) + return np.sum(np.where(s > s[0]*tol, 1, 0)) + + +def _get_whitener(A, rnk, pca, ch_type): + # whitening operator + D, V = linalg.eigh(A, overwrite_a=True) + I = np.argsort(D)[::-1] + D = D[I] + V = V[:, I] + D = 1.0 / D + if not pca: # No PCA case. + print 'Not doing PCA for %s.' % ch_type + W = np.sqrt(D)[:, None] * V.T + else: # Rey's approach. MNE has been changed to implement this. + print 'Setting small %s eigenvalues to zero.' % ch_type + D[rnk:] = 0.0 + W = np.sqrt(D)[:, None] * V.T + # This line will reduce the actual number of variables in data + # and leadfield to the true rank. + W = W[:rnk] + return W + + class Covariance(object): """Noise covariance matrix""" @@ -46,11 +71,125 @@ class Covariance(object): self._cov = cov self.data = cov['data'] + self.ch_names = cov['names'] def save(self, fname): """save covariance matrix in a FIF file""" write_cov_file(fname, self._cov) + def whitener(self, info, mag_reg=0.1, grad_reg=0.1, eeg_reg=0.1, pca=True): + """Compute whitener based on a list of channels + + Parameters + ---------- + info : dict + Measurement info of data to apply the whitener. + Defines data channels and which are the bad channels + to be ignored. + mag_reg : float + Regularization of the magnetometers. + Recommended between 0.05 and 0.2 + grad_reg : float + Regularization of the gradiometers. + Recommended between 0.05 and 0.2 + eeg_reg : float + Regularization of the EGG channels. + Recommended between 0.05 and 0.2 + pca : bool + If True, whitening is restricted to the space of + the data. It makes sense when data have a low rank + due to SSP or maxfilter. + + Returns + ------- + W : array + Whitening matrix + ch_names : list of strings + List of channel names on which to apply the whitener. + It corresponds to the columns of W. + """ + bads = info['bads'] + C_idx = [k for k, name in enumerate(self.ch_names) + if name in info['ch_names'] and name not in bads] + ch_names = [self.ch_names[k] for k in C_idx] + C_noise = self.data[np.ix_(C_idx, C_idx)] # take covariance submatrix + + # # Create the projection operator + # proj, ncomp, _ = make_projector(info['projs'], ch_names) + # if ncomp > 0: + # print '\tCreated an SSP operator (subspace dimension = %d)' % ncomp + # C_noise = np.dot(proj, np.dot(C_noise, proj.T)) + + # Regularize Noise Covariance Matrix. + variances = np.diag(C_noise) + ind_meg = pick_types(info, meg=True, eeg=False, exclude=bads) + names_meg = [info['ch_names'][k] for k in ind_meg] + C_ind_meg = [ch_names.index(name) for name in names_meg] + + ind_grad = pick_types(info, meg='grad', eeg=False, exclude=bads) + names_grad = [info['ch_names'][k] for k in ind_grad] + C_ind_grad = [ch_names.index(name) for name in names_grad] + + ind_mag = pick_types(info, meg='mag', eeg=False, exclude=bads) + names_mag = [info['ch_names'][k] for k in ind_mag] + C_ind_mag = [ch_names.index(name) for name in names_mag] + + ind_eeg = pick_types(info, meg=False, eeg=True, exclude=bads) + names_eeg = [info['ch_names'][k] for k in ind_eeg] + C_ind_eeg = [ch_names.index(name) for name in names_eeg] + + has_meg = len(ind_meg) > 0 + has_eeg = len(ind_eeg) > 0 + + if self.kind == 'diagonal': + C_noise = np.diag(variances) + rnkC_noise = len(variances) + print 'Rank of noise covariance is %d' % rnkC_noise + else: + # estimate noise covariance matrix rank + # Loop on all the required data types (MEG MAG, MEG GRAD, EEG) + + if has_meg: # Separate rank of MEG + rank_meg = rank(C_noise[C_ind_meg][:, C_ind_meg]) + print 'Rank of MEG part of noise covariance is %d' % rank_meg + if has_eeg: # Separate rank of EEG + rank_eeg = rank(C_noise[C_ind_eeg][:, C_ind_eeg]) + print 'Rank of EEG part of noise covariance is %d' % rank_eeg + + for ind, reg in zip([C_ind_grad, C_ind_mag, C_ind_eeg], + [grad_reg, mag_reg, eeg_reg]): + if len(ind) > 0: + # add constant on diagonal + C_noise[ind, ind] += reg * np.mean(variances[ind]) + + if has_meg and has_eeg: # Sets cross terms to zero + C_noise[np.ix_(C_ind_meg, C_ind_eeg)] = 0.0 + C_noise[np.ix_(C_ind_eeg, C_ind_meg)] = 0.0 + + # whitening operator + if has_meg: + W_meg = _get_whitener(C_noise[C_ind_meg][:, C_ind_meg], rank_meg, + pca, 'MEG') + + if has_eeg: + W_eeg = _get_whitener(C_noise[C_ind_eeg][:, C_ind_eeg], rank_eeg, + pca, 'EEG') + + if has_meg and not has_eeg: # Only MEG case. + W = W_meg + elif has_eeg and not has_meg: # Only EEG case. + W = W_eeg + elif has_eeg and has_meg: # Bimodal MEG and EEG case. + # Whitening of MEG and EEG separately, which assumes zero + # covariance between MEG and EEG (i.e., a block diagonal noise + # covariance). This was recommended by Matti as EEG does not + # measure all the signals from the same environmental noise sources + # as MEG. + W = np.r_[np.c_[W_meg, np.zeros((W_meg.shape[0], W_eeg.shape[1]))], + np.c_[np.zeros((W_eeg.shape[0], W_meg.shape[1])), W_eeg]] + + return W, names_meg + names_eeg + def estimate_from_raw(self, raw, picks=None, quantum_sec=10): """Estimate noise covariance matrix from a raw FIF file """ @@ -82,122 +221,6 @@ class Covariance(object): self.data = cov / n_samples # XXX : check print '[done]' - def _regularize(self, data, variances, ch_names, eps): - """Operates inplace in data - """ - if len(ch_names) > 0: - ind = [self._cov['names'].index(name) for name in ch_names] - reg = eps * np.mean(variances[ind]) - for ii in ind: - data[ind,ind] += reg - - def whiten_evoked(self, evoked, eps=0.2): - """Whiten an evoked data file - - The whitening matrix is estimated and then multiplied to data. - It makes the additive white noise assumption of MNE - realistic. - - Parameters - ---------- - evoked : Evoked object - A evoked data set - eps : float - The regularization factor used. - - Returns - ------- - evoked_whiten : Evoked object - Evoked data set after whitening. - W : array of shape [n_channels, n_channels] - The whitening matrix - """ - - data = self.data.copy() # will be the regularized covariance - variances = np.diag(data) - - # Add (eps x identity matrix) to magnetometers only. - # This is based on the mean magnetometer variance like MNE C-code does it. - mag_ind = pick_types(evoked.info, meg='mag', eeg=False, stim=False) - mag_names = [evoked.info['chs'][k]['ch_name'] for k in mag_ind] - self._regularize(data, variances, mag_names, eps) - - # Add (eps x identity matrix) to gradiometers only. - grad_ind = pick_types(evoked.info, meg='grad', eeg=False, stim=False) - grad_names = [evoked.info['chs'][k]['ch_name'] for k in grad_ind] - self._regularize(data, variances, grad_names, eps) - - # Add (eps x identity matrix) to eeg only. - eeg_ind = pick_types(evoked.info, meg=False, eeg=True, stim=False) - eeg_names = [evoked.info['chs'][k]['ch_name'] for k in eeg_ind] - self._regularize(data, variances, eeg_names, eps) - - d, V = linalg.eigh(data) # Compute eigen value decomposition. - - # Compute the unique square root inverse, which is a whitening matrix. - # This matrix can be multiplied with data and leadfield matrix to get - # whitened inverse solutions. - d = 1.0 / np.sqrt(d) - W = d[:,None] * V.T - - # Get all channel indices - n_channels = len(evoked.info['chs']) - ave_ch_names = [evoked.info['chs'][k]['ch_name'] - for k in range(n_channels)] - ind = [ave_ch_names.index(name) for name in self._cov['names']] - - evoked_whiten = copy.copy(evoked) - evoked_whiten.data[ind] = np.dot(W, evoked.data[ind]) - - return evoked_whiten, W - - def whiten_evoked_and_forward(self, evoked, fwd, eps=0.2): - """Whiten an evoked data set and a forward solution - - The whitening matrix is estimated and then multiplied to - forward solution a.k.a. the leadfield matrix. - It makes the additive white noise assumption of MNE - realistic. - - Parameters - ---------- - evoked : Evoked object - A evoked data set - fwd : forward data - A forward solution read with mne.read_forward - eps : float - The regularization factor used. - - Returns - ------- - ave : Evoked object - The whitened evoked data set - fwd : dict - The whitened forward solution. - W : array of shape [n_channels, n_channels] - The whitening matrix - """ - # handle evoked - evoked_whiten, W = self.whiten_evoked(evoked, eps=eps) - - evoked_ch_names = [ch['ch_name'] for ch in evoked_whiten.info['chs']] - - # handle forward (keep channels in covariance matrix) - fwd_whiten = copy.copy(fwd) - ind = [fwd_whiten['sol']['row_names'].index(name) - for name in self._cov['names']] - fwd_whiten['sol']['data'][ind] = np.dot(W, - fwd_whiten['sol']['data'][ind]) - fwd_whiten['sol']['row_names'] = [fwd_whiten['sol']['row_names'][k] - for k in ind] - fwd_whiten['chs'] = [fwd_whiten['chs'][k] for k in ind] - - # keep in forward the channels in the evoked dataset - fwd_whiten = pick_channels_forward(fwd, include=evoked_ch_names, - exclude=evoked.info['bads']) - - return evoked_whiten, fwd_whiten, W - def __repr__(self): s = "kind : %s" % self.kind s += ", size : %s x %s" % self.data.shape diff --git a/mne/fiff/__init__.py b/mne/fiff/__init__.py index 79c481e..60aef75 100644 --- a/mne/fiff/__init__.py +++ b/mne/fiff/__init__.py @@ -10,6 +10,6 @@ from .open import fiff_open from .evoked import Evoked, read_evoked, write_evoked from .raw import Raw, read_raw_segment, read_raw_segment_times, \ start_writing_raw, write_raw_buffer, finish_writing_raw -from .pick import pick_types +from .pick import pick_types, pick_channels from .compensator import get_current_comp diff --git a/mne/fiff/evoked.py b/mne/fiff/evoked.py index c85692b..9d2871e 100644 --- a/mne/fiff/evoked.py +++ b/mne/fiff/evoked.py @@ -396,6 +396,9 @@ class Evoked(object): s += ", n_channels x n_times : %s x %s" % self.data.shape return "Evoked (%s)" % s + @property + def ch_names(self): + return self.info['ch_names'] def read_evoked(fname, setno=0, baseline=None): """Read an evoked dataset diff --git a/mne/fiff/pick.py b/mne/fiff/pick.py index c7df02e..97edd61 100644 --- a/mne/fiff/pick.py +++ b/mne/fiff/pick.py @@ -36,7 +36,7 @@ def channel_type(info, idx): return 'stim' -def pick_channels(ch_names, include, exclude): +def pick_channels(ch_names, include, exclude=[]): """Pick channels by names Returns the indices of the good channels in ch_names. diff --git a/mne/fiff/proj.py b/mne/fiff/proj.py index b3b7685..5483346 100644 --- a/mne/fiff/proj.py +++ b/mne/fiff/proj.py @@ -35,7 +35,7 @@ def read_proj(fid, node): # Locate the projection data nodes = dir_tree_find(node, FIFF.FIFFB_PROJ) if len(nodes) == 0: - return projdata + return projdata tag = find_tag(fid, nodes[0], FIFF.FIFF_NCHAN) if tag is not None: @@ -64,19 +64,19 @@ def read_proj(fid, node): tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_CH_NAME_LIST) if tag is not None: - namelist = tag.data; + namelist = tag.data else: raise ValueError, 'Projection item channel list missing' - tag = find_tag(fid, item,FIFF.FIFF_PROJ_ITEM_KIND); + tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_KIND) if tag is not None: - kind = tag.data; + kind = tag.data else: raise ValueError, 'Projection item kind missing' tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_NVEC) if tag is not None: - nvec = tag.data + nvec = int(tag.data) else: raise ValueError, 'Number of projection vectors not specified' @@ -86,20 +86,21 @@ def read_proj(fid, node): else: raise ValueError, 'Projection item channel list missing' - tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_VECTORS); + tag = find_tag(fid, item, FIFF.FIFF_PROJ_ITEM_VECTORS) if tag is not None: - data = tag.data; + data = tag.data else: raise ValueError, 'Projection item data missing' - tag = find_tag(fid, item, FIFF.FIFF_MNE_PROJ_ITEM_ACTIVE); + tag = find_tag(fid, item, FIFF.FIFF_MNE_PROJ_ITEM_ACTIVE) if tag is not None: - active = tag.data; + active = True else: - active = False; + active = False if data.shape[1] != len(names): - raise ValueError, 'Number of channel names does not match the size of data matrix' + raise ValueError, ('Number of channel names does not match the ' + 'size of data matrix') # Use exactly the same fields in data as in a named matrix one = Bunch(kind=kind, active=active, desc=desc, @@ -128,6 +129,7 @@ def read_proj(fid, node): from .write import write_int, write_float, write_string, write_name_list, \ write_float_matrix, end_block, start_block + def write_proj(fid, projs): """Write a projection operator to a file. @@ -156,7 +158,7 @@ def write_proj(fid, projs): proj['data']['col_names']) write_float_matrix(fid, FIFF.FIFF_PROJ_ITEM_VECTORS, proj['data']['data']) - end_block(fid,FIFF.FIFFB_PROJ_ITEM) + end_block(fid, FIFF.FIFFB_PROJ_ITEM) end_block(fid, FIFF.FIFFB_PROJ) @@ -164,32 +166,37 @@ def write_proj(fid, projs): # Utils def make_projector(projs, ch_names, bads=[]): - """ - % - % [proj,nproj,U] = mne_make_projector(projs,ch_names,bads) - % - % proj - The projection operator to apply to the data - % nproj - How many items in the projector - % U - The orthogonal basis of the projection vectors (optional) - % - % Make an SSP operator - % - % projs - A set of projection vectors - % ch_names - A cell array of channel names - % bads - Bad channels to exclude - % + """Create an SSP operator from SSP projection vectors + + Parameters + ---------- + projs : list + List of projection vectors + ch_names : list of strings + List of channels to include in the projection matrix + bads : list of strings + Some bad channels to exclude + + Returns + ------- + proj : array of shape [n_channels, n_channels] + The projection operator to apply to the data + nproj : int + How many items in the projector + U : array + The orthogonal basis of the projection vectors (optional) """ nchan = len(ch_names) - if len(ch_names) == 0: + if nchan == 0: raise ValueError, 'No channel names specified' - proj = np.eye(nchan, nchan) - nproj = 0; - U = []; + proj = np.eye(nchan, nchan) + nproj = 0 + U = [] # Check trivial cases first if projs is None: - return proj, nproj, U + return proj, nproj, U nactive = 0 nvec = 0 @@ -225,11 +232,11 @@ def make_projector(projs, ch_names, bads=[]): # If there is something to pick, pickit if len(sel) > 0: for v in range(one['data']['nrow']): - vecs[sel, nvec+v] = one['data']['data'][v,vecsel].T + vecs[sel, nvec+v] = one['data']['data'][v, vecsel].T - # Rescale for more straightforward detection of small singular values + # Rescale for better detection of small singular values for v in range(one['data']['nrow']): - onesize = sqrt(np.sum(vecs[:,nvec+v] * vecs[:, nvec + v])) + onesize = sqrt(np.sum(vecs[:, nvec + v] * vecs[:, nvec + v])) if onesize > 0: vecs[:, nvec+v] /= onesize nonzero += 1 @@ -240,27 +247,36 @@ def make_projector(projs, ch_names, bads=[]): if nonzero == 0: return proj, nproj, U - # Reorthogonalize the vectors + # Reorthogonalize the vectors U, S, V = linalg.svd(vecs[:,:nvec], full_matrices=False) - # Throw away the linearly dependent guys - nvec = np.sum((S / S[0]) < 1e-2) - U = U[:,:nvec] - # Here is the celebrated result - proj -= np.dot(U, U.T) - nproj = nvec + # Throw away the linearly dependent guys + nproj = np.sum((S / S[0]) > 1e-2) + U = U[:,:nproj] + + # Here is the celebrated result + proj -= np.dot(U, U.T) return proj, nproj, U def make_projector_info(info): + """Make an SSP operator using the measurement info + + Calls make_projector on good channels. + + Parameters + ---------- + info : dict + Measurement info + + Returns + ------- + proj : array of shape [n_channels, n_channels] + The projection operator to apply to the data + nproj : int + How many items in the projector """ - % - % [proj,nproj] = mne_make_projector_info(info) - % - % Make an SSP operator using the meas info - % - """ - proj, nproj, _ = make_projector(info['projs'], info['ch_names'], info['bads']) + proj, nproj, _ = make_projector(info['projs'], info['ch_names'], + info['bads']) return proj, nproj - diff --git a/mne/inverse.py b/mne/inverse.py index 532fd51..a6681d6 100644 --- a/mne/inverse.py +++ b/mne/inverse.py @@ -1,10 +1,12 @@ # Authors: Alexandre Gramfort <[email protected]> # Matti Hamalainen <[email protected]> +# Rey Ramirez # # License: BSD (3-clause) from math import sqrt import numpy as np +from scipy import linalg from .fiff.constants import FIFF from .fiff.open import fiff_open @@ -485,3 +487,236 @@ def compute_inverse(evoked, inverse_operator, lambda2, dSPM=True): print '[done]' return res + + +def minimum_norm(evoked, forward, cov, picks=None, method='dspm', + orientation='fixed', snr=3, loose=0.2, depth=True, + weightexp=0.5, weightlimit=10, magreg=0.1, + gradreg=0.1, eegreg=0.1, fMRI=None, fMRIthresh=None, + fMRIoff=0.1, pca=True): + """Minimum norm estimate (MNE) + + Compute MNE, dSPM and sLORETA on evoked data starting from + a forward operator. + + Parameters + ---------- + evoked : Evoked + Evoked data to invert + forward : dict + Forward operator + cov : Covariance + Noise covariance matrix + picks : array-like + List of indices of channels to include + method : 'wmne' | 'dspm' | 'sloreta' + The method to use + orientation : 'fixed' | 'free' | 'loose' + Type of orientation constraints 'fixed'. + snr : float + Signal-to noise ratio defined as in MNE (default: 3). + loose : float in [0, 1] + Value that weights the source variances of the dipole components + defining the tangent space of the cortical surfaces. + depth : bool + Flag to do depth weighting (default: True). + weightexp : float + Order of the depth weighting. {0=no, 1=full normalization, default=0.8} + weightlimit : float + Maximal amount depth weighting (default: 10). + magreg : float + Amount of regularization of the magnetometer noise covariance matrix + gradreg : float + Amount of regularization of the gradiometer noise covariance matrix. + eegreg : float + Amount of regularization of the EEG noise covariance matrix. + fMRI : array of shape [n_sources] + Vector of fMRI values are the source points. + fMRIthresh : float + fMRI threshold. The source variances of source points with fMRI smaller + than fMRIthresh will be multiplied by OPTIONS.fMRIoff. + fMRIoff : float + Weight assigned to non-active source points according to fMRI and fMRIthresh. + + Returns + ------- + stc : dict + Source time courses + """ + + # %% ===== CHECK FOR INVALID VALUES ===== + # if OPTIONS.diagnoise + # OPTIONS.pca=0; % Rey added this. If OPTIONS.diagnoise is 1, then OPTIONS.pca=0; 3/23/11 + # display('wMNE> If using diagonal noise covariance, PCA option should be off. Setting PCA option off.') + # end + + assert method in ['wmne', 'dspm', 'sloreta'] + assert orientation in ['fixed', 'free', 'loose'] + + if not 0 <= loose <= 1: + raise ValueError('loose value should be smaller than 1 and bigger than ' + '0, or empty for no loose orientations.') + if not 0 <= weightexp <= 1: + raise ValueError('weightexp should be a scalar between 0 and 1') + if not 0 <= gradreg <= 1: + raise ValueError('gradreg should be a scalar between 0 and 1') + if not 0 <= magreg <= 1: + raise ValueError('magreg should be a scalar between 0 and 1') + if not 0 <= eegreg <= 1: + raise ValueError('eegreg should be a scalar between 0 and 1') + + # Set regularization parameter based on SNR + lambda2 = 1.0 / snr**2 + + normals = [] + for s in forward['src']: + normals.append(s['nn'][s['inuse'] != 0]) + normals = np.concatenate(normals) + + W, ch_names = cov.whitener(evoked.info, magreg, gradreg, eegreg, pca) + + gain = forward['sol']['data'] + fwd_ch_names = [forward['chs'][k]['ch_name'] for k in range(gain.shape[0])] + fwd_idx = [fwd_ch_names.index(name) for name in ch_names] + gain = gain[fwd_idx] + + print "Computing inverse solution with %d channels." % len(ch_names) + + rank_noise = len(W) + print 'Total rank is %d' % rank_noise + + # processing lead field matrices, weights, areas & orientations + # Initializing. + n_positions = gain.shape[1] / 3 + + if orientation is 'fixed': + n_dip_per_pos = 1 + elif orientation in ['free', 'loose']: + n_dip_per_pos = 3 + + n_dipoles = n_positions * n_dip_per_pos + + w = np.ones(n_dipoles) + + # compute power + if depth: + w = np.sum(gain**2, axis=0) + w = w.reshape(-1, 3).sum(axis=1) + w = w[:,None] * np.ones((1, n_dip_per_pos)) + w = w.ravel() + + if orientation is 'fixed': + print 'Appying fixed dipole orientations.' + gain = gain * _block_diag(normals.ravel()[None,:], 3).T + elif orientation is 'free': + print 'Using free dipole orientations. No constraints.' + elif orientation is 'loose': + print 'Transforming lead field matrix to cortical coordinate system.' + 1/0 + # gain, Q_Cortex = bst_xyz2lf(gain, normals) # XXX + # # Getting indices for tangential dipoles. + # itangentialtmp = start:endd; + # itangentialtmp(1:3:end) = []; + # itangential = [itangential itangentialtmp]; %#ok<AGROW> + + # Whiten lead field. + print 'Whitening lead field matrix.' + gain = np.dot(W, gain) + + # Computing reciprocal of power. + w = 1.0 / w + + # apply areas + # if ~isempty(areas) + # display('wMNE> Applying areas to compute current source density.') + # areas = areas.^2; + # w = w .* areas; + # end + # clear areas + + # apply depth weighthing + if depth: + # apply weight limit + # Applying weight limit. + print 'Applying weight limit.' + weightlimit2 = weightlimit**2 + # limit = min(w(w>min(w) * weightlimit2)); % This is the Matti way. + # we do the Rey way (robust to possible weight discontinuity). + limit = np.min(w) * weightlimit2 + w[w > limit] = limit + + # apply weight exponent + # Applying weight exponent. + print 'Applying weight exponent.' + w = w ** weightexp + + # apply loose orientations + if orientation is 'loose': + print 'Applying loose dipole orientations. Loose value of %d.' % loose + w[itangential] *= loose + + # Apply fMRI Priors + if fMRI is not None: + print 'Applying fMRI priors.' + w[fMRI < fMRIthresh] *= fMRIoff + + # Adjusting Source Covariance matrix to make trace of L*C_J*L' equal + # to number of sensors. + print 'Adjusting source covariance matrix.' + source_std = np.sqrt(w) # sqrt(C_J) + trclcl = linalg.norm(gain * source_std[None,:], ord='fro') + source_std *= sqrt(rank_noise) / trclcl # correct C_J + gain *= source_std[None,:] + + # Compute SVD. + print 'Computing SVD of whitened and weighted lead field matrix.' + U, s, Vh = linalg.svd(gain, full_matrices=False) + ss = s / (s**2 + lambda2) + + # Compute whitened MNE operator. + Kernel = source_std[:,None] * np.dot(Vh.T, ss[:,None] * U.T) + + # Compute dSPM operator. + if method is 'dspm': + print 'Computing dSPM inverse operator.' + dspm_diag = np.sum(Kernel**2, axis=1) + if n_dip_per_pos == 1: + dspm_diag = np.sqrt(dspm_diag) + elif n_dip_per_pos in [2, 3]: + dspm_diag = dspm_diag.reshape(-1, n_dip_per_pos) + dspm_diag = np.sqrt(np.sum(dspm_diag, axis=1)) + dspm_diag = (np.ones((1, n_dip_per_pos)) * dspm_diag[:,None]).ravel() + + Kernel /= dspm_diag[:,None] + + # whitened sLORETA imaging kernel + elif method is 'sloreta': + print 'Computing sLORETA inverse operator.' + if n_dip_per_pos == 1: + sloreta_diag = np.sqrt(np.sum(Kernel * gain.T, axis=1)) + Kernel /= sloreta_diag[:,None] + elif n_dip_per_pos in [2, 3]: + for k in n_positions: + start = k*n_dip_per_pos + stop = (k+1)*n_dip_per_pos + R = np.dot(Kernel[start:stop,:], gain[:,start:stop]) + SIR = linalg.matfuncs.sqrtm(R, linalg.pinv(R)) + Kernel[start:stop] = np.dot(SIR, Kernel[start:stop]) + + # Multiply inverse operator by whitening matrix, so no need to whiten data + Kernel = np.dot(Kernel, W) + sel = [evoked.ch_names.index(name) for name in ch_names] + sol = np.dot(Kernel, evoked.data[sel]) + + stc = dict() + stc['inv'] = dict() + stc['inv']['src'] = forward['src'] + # stc['vertices'] = np.concatenate(forward['src'][0]['vertno'], + # forward['src'][1]['vertno']) + stc['sol'] = sol + stc['tmin'] = float(evoked.first) / evoked.info['sfreq'] + stc['tstep'] = 1.0 / evoked.info['sfreq'] + print '[done]' + + return stc, Kernel, W + diff --git a/mne/tests/test_cov.py b/mne/tests/test_cov.py index b25ab50..08cf415 100644 --- a/mne/tests/test_cov.py +++ b/mne/tests/test_cov.py @@ -53,19 +53,16 @@ def test_whitening_cov(): """Whitening of evoked data and leadfields """ data_path = sample.data_path('.') - fwd_fname = op.join(data_path, 'MEG', 'sample', - 'sample_audvis-meg-eeg-oct-6-fwd.fif') ave_fname = op.join(data_path, 'MEG', 'sample', 'sample_audvis-ave.fif') cov_fname = op.join(data_path, 'MEG', 'sample', 'sample_audvis-cov.fif') # Reading - ave = read_evoked(ave_fname, setno=0, baseline=(None, 0)) - fwd = mne.read_forward_solution(fwd_fname) + evoked = read_evoked(ave_fname, setno=0, baseline=(None, 0)) cov = mne.Covariance() cov.load(cov_fname) + cov.whitener(evoked.info) - ave_whiten, fwd_whiten, W = cov.whiten_evoked_and_forward(ave, fwd) # XXX : test something -- Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/debian-med/python-mne.git _______________________________________________ debian-med-commit mailing list [email protected] http://lists.alioth.debian.org/cgi-bin/mailman/listinfo/debian-med-commit
