Hi!
thanks for the answers!
Here is the runtime error message:
python read_sweep_C.py
LstFileReader_FAST('/home/jfmoulin/Lab/Experiments/refsans/07_11_PMB/BPSi-JFM-B1-087.lst')
Traceback (most recent call last):
File "read_sweep_C.py", line 278, in <module>
ui = exple2()
File "read_sweep_C.py", line 225, in exple2
tof_resol=.05
File "delay_line.pyx", line 103, in delay_line_C.UI.__init__
(delay_line.c:1212)
self.det_reader = DetectorReader(fname=self.fname,reader=reader,
File "delay_line.pyx", line 328, in
delay_line_C.DetectorReader.__init__ (delay_line.c:3445)
self.list_mode_file = [ListModeFile(fname=fname,reader=reader)
File "delay_line.pyx", line 719, in
delay_line_C.ListModeFile.__init__ (delay_line.c:7367)
self.reader = eval('%s(\'%s\')'%(reader,self.fname))
File "<string>", line 1, in <module>
NameError: name 'LstFileReader_FAST' is not defined
So it seems not to be a type error for an object which would have
a LstFileReader_FAST property or method, right ?
Okay, you asked for it..., here is the full code (yes it is still very
messy and full of FIXMEs but the python version does run...albeit
sooooo slow, hence Cython)
Cheers, JF
from __future__ import division
import numpy as np
cimport numpy as np
import scipy # FIXME??? cimport?
import exceptions
import types
import struct
import string
import sys
import subprocess
import cPickle
import time
import exceptions
from os.path import getsize
import struct
DTYPE_int = np.int
DTYPE_long = np.long
DTYPE_double = np.double
ctypedef np.int_t DTYPE_int_t
ctypedef np.long_t DTYPE_long_t
ctypedef np.double_t DTYPE_double_t
# Forward class declaration
cdef class UI
cdef class DetectorReader
cdef class TofSlabber
cdef class LstFileReader_FAST
cdef class DelayLine
cdef class UI:
"""
A class dealing with the setup of a list mode file reader :
fname: a filename or a list of filenames (lst files)
reader: the name of the low-level reader class to use for this lst file
delay_line_pars: is a dictonary or list of dicts describing the
delay line, keys are:
'name': 'X' or 'Y' is the name of the delay line
'anode': [devicenum,channelnum] is a list of 2 values telling where
to look for the anode signal (the files are numbered in
the same order as in fname)
'cathodes': [[devnum1,channum1],[devnum2,channum2]] is a list of
one or two signal adresses in the format as above.
Each of these one/two addresses corresponds to one
end of the cathode
'length': 342.7 is a value in ns for the length of the delay line
'tol': 20 a tolerance in ns for the discrimination of events
'npix': 256 the number of pixels for the full delay line length
tof_resol : .1 the tof resolution used for histogramming
tof_lims : the tof limits used for the histogram
(None goes to a 0-Tmax default)
timing_accuracy : maximum possible mesuered missmatch in ns between two
events which are to be considered simultaneous
(mismatch could be due to cabling or timing of the
different cards, 3 ns should be safe)
"""
cdef list fname # FIXME
cdef list delay_line_pars
cdef int timing_accuracy
cdef DetectorReader det_reader
cdef TofSlabber slabber
cdef dict strobe_seq
cdef str tof_histo
cdef public np.ndarray slab, histo
cdef public nsweep
def __init__(self,\
fname,\
reader='LstFileReader_FAST',\
delay_line_pars=[{'name':'X',
'anode':[0,0],
'cathodes':[[0,1],[0,2]],
'length':342.7,
'tol':20,
'npix':256
},
{'name':'Y',
'anode':[0,0],
'cathodes':[0,3],
'length':342.7,
'tol':20,
'npix':256
}
],\
tof_resol = .1,\
tof_lims = None,\
timing_accuracy = 3
):
self.fname = fname
self.delay_line_pars = delay_line_pars
self.timing_accuracy = timing_accuracy
self.det_reader = DetectorReader(fname=self.fname,reader=reader,
delay_line_pars=self.delay_line_pars,
timing_accuracy = self.timing_accuracy
)
self.slabber = TofSlabber(resol=tof_resol,tof_lims=tof_lims,
npix=(delay_line_pars[0]['npix'],
delay_line_pars[1]['npix']
)
)
#self.reporter = Lst_reporter(self.slabber)
self.strobe_seq = {}
tof_histo = 'tof_histo'
def old_read(self,nsweep=1):
"""reads nsweep of data and does the tof analysis."""
for i in range(nsweep):
sweep = self.det_reader.next_sweep()
cleaned = self.det_reader.clean_sweep(sweep)
self.slabber._incr(cleaned)
cdef bint _is_ok_sweep(self, tuple sweep):
"""
Tests if a sweep is valid (full time between two starts) and increments
the slabber.
Returns False if sweep not valid, True if valid
"""
cdef bint sweep_empty
cdef int count, swc
cdef str fi
cdef tuple sw
cdef tuple cleaned
sweep_empty = False
# For each subfile we test if we got a complete sweep (i.e not None)
# and if it contains events or not
for count,fi in enumerate(self.fname):
# print 'uuuu',sweep[count]
for swc,sw in enumerate(sweep):
if sw is None:
print 'reached EOF for %s'%count
return False
# A valid sweep maybe empty...
if sw == ():
print 'No counts in last read() for line %s'\
%swc
sweep_empty = True
# A valid non empty sweep needs cleaning and in either case
# we should increment the slabber with the adequate values
if not sweep_empty:
cleaned = self.det_reader.clean_sweep(sweep)
self.slabber._incr(cleaned)
# let us make the slabs and histo directly
# accessible from the UI (...in cython seems to be too deep)
self.slab = self.slabber.slab.view()
self.histo = self.slabber.tof_histo.view()
self.nsweep = self.slabber.nsweep
else:
self.slabber._incr(())
# let us make the slabs and histo directly
# accessible from the UI (...in cython seems to be too
deep) FIXME property
self.slab = self.slabber.slab.view()
self.histo = self.slabber.tof_histo.view()
self.nsweep = self.slabber.nsweep
return True
cpdef int read(self,int nsweep=0):
"""
read nsweep sweeps.
nsweep > 0 reads from present position in file.
nsweep == 0 or None reads all remaining sweeps till EOF.
nsweep <0 reads last nsweep sweeps in file.
Returns the number of successfuly read sweeps
"""
cdef int nsweeps_count
cdef int answ
cdef tuple sweep
nsweeps_count = 0
if nsweep == 0:
answ = 1
while answ :#is not None:
answ = self.read(1)# FAILS HERE!
return nsweeps_count
else:
if nsweep > 0:
for i in range(nsweep):
sweep = self.det_reader.next_sweep()
nsweeps_count += 1
if not self._is_ok_sweep(sweep):
return nsweeps_count - 1
else:
# To read the last sweeps we first need to go to EOF
self.det_reader.goto_end()
for i in range(-nsweep):
sweep = self.det_reader.prev_sweep()
nsweeps_count += 1
if not self._is_ok_sweep(sweep):
return nsweeps_count - 1
return nsweeps_count
def strobe(self,nrepeat=1,nsweep=100):
"""
Stroboscopic measurement over nrepeat blocks of nsweep sweeps
Updates self.strobe_seq into {'slab','tof_histo','intensity'}
where each key leads to a list of corresponding values
"""
li_intensity = []
li_slab = []
li_histo = []
n_slabs = len(self.slabber.li_tof)+1
npix = self.slabber.npix
for repeat in range(nrepeat):
self.slabber.slab = scipy.zeros((n_slabs,npix[0],npix[1]))
self.slabber.tof_histo = scipy.zeros((n_slabs))
for i in range(nsweep):
sweep = self.det_reader.next_sweep()
if sweep is not None:
cleaned = self.det_reader.clean_sweep(sweep)
self.slabber._incr(cleaned)
try:
li_slab.append(self.slabber.slab)
li_histo.append(self.slabber.tof_histo)
li_intensity.append(self.slabber.slab[0].sum())
except:
pass
self.strobe_seq = {'slab':li_slab,
'tof_histo':li_histo,
'intensity':li_intensity
}
def follow(self, intervall=5, begin_at_EOF=False):
"""
reads up to EOF every intervall seconds.
if begin_at_EOF is True waits for one intervall and then reads
the data acquired since follow() was called (not since startMeas())
"""
if begin_at_EOF:
self.det_reader.goto_end()
time.sleep(intervall)
while True:
print 'I start reading again!'
self.read()
print 'Done! I\'ll have a nap for %s seconds.'%intervall
time.sleep(intervall)
def analyze(self, nsweep=0, out='full'):
"""
A convenience function which reads nsweep and returns data only
(unlike read() which just updates the UI properties)
out == 'full' returns {'intensity','histo','nsweep'}
out == 'intensity' returns a Numpy array of intensities
out == 'histo' returns a list of histogramms
"""
# input sanity check
#FIXME
self.read(nsweep=nsweep)
if out == 'full':
out = {'intensity':self.slabber.slab,
'histo':self.slabber.tof_histo,
'nsweep':self.slabber.nsweep
}
elif out == 'intensity':
out = self.slabber.slab
elif out == 'histo':
out = self.slabber.tof_histo,
else:
raise exceptions.InputError
return out
def save_data(self,filename,gzip=True):
"""
save the slab array to a file in [gziped] cPickle format
"""
fiout = open(filename,'w')
cPickle.dump(self.slabber.slab,fiout)
fiout.close()
if gzip:
subprocess.call(['gzip',filename])
# p.save(filename+'.gz',self.slabber.slab[0])
#def display(self,slicenum=0):
# """
# Minimalistic display function
# """
#
p.imshow(p.log10(self.slabber.slab[slicenum]+1),interpolation='nearest')
# p.colorbar()
# p.figure()
# p.semilogy(self.slabber.tof_histo + 1)
cdef class DetectorReader:
"""
Reads a list mode file and returns the 2D slices using
delay line parameters
"""
cdef list fname # FIXME enforce it
cdef list list_mode_file
cdef int n_devices
cdef list delay_line_pars # FIXME
cdef list delay_line
cdef int timing_accuracy
def __init__(self,fname=None,reader='LstFileReader_FAST',\
delay_line_pars={'name':'X',\
'anode':[0,0],\
'cathodes':[[0,1],[0,2]] ,\
'length':300,\
'tol':4.00,\
'npix':256\
},\
timing_accuracy =3\
):
self.fname = list(fname)
# create a list of ListModeFile readers (one per input file)
self.list_mode_file = [ListModeFile(fname=fname,reader=reader)
for fname in self.fname
]
self.n_devices = len(self.list_mode_file)
if type(delay_line_pars) != types.ListType:
delay_line_pars = [delay_line_pars]
self.delay_line_pars = delay_line_pars
# create a list of DelayLine instances
# (one per dictionary of parameters given as input to __init__)
self.delay_line = [DelayLine(dlp) for dlp in self.delay_line_pars]
self.timing_accuracy = timing_accuracy
def goto_end(self):
"""
Goes to the last full sweep in lst
"""
for fi in self.list_mode_file:
fi.reader.goto_end()
cdef tuple next_sweep(self):
"""
Returns a raw sweep in the form [[signals dev0][signal dev1]...].
Useful for sweep analysis (counter behaviour)
"""
cdef tuple sweep
cdef ListModeFile fi
sweep = tuple([fi.next_sweep() for fi in self.list_mode_file])
return sweep
def prev_sweep(self):
"""
Returns a raw sweep in the form [[signals dev0][signal dev1]...].
Useful for sweep analysis (counter behaviour)
"""
sweep = tuple([fi.prev_sweep() for fi in self.list_mode_file])
return sweep
cdef dict format_sweep(self,tuple raw_sweep,tuple
li_signals=('anode','cathodes')):
"""
Returns a dictionary of signals according to the definitions
given in self.delay_line_pars.
This output is more human-readable
"""
cdef dict out_sweep
cdef dict dlp
cdef str dl_name
cdef str signal
out_sweep = {}
for dlp in self.delay_line_pars:
dl_name = dlp['name']
out_sweep[dl_name] = {}
for signal in li_signals:
#print 'tttttt',type(dlp[signal][0])
#print 'dlp[sig]',type(dlp[signal][0])
if type(dlp[signal][0]) != types.IntType:
try:
out_sweep[dl_name][signal] =
tuple([raw_sweep[dlp[signal]
[terminal]
[0]
]
[dlp[signal]
[terminal]
[1]
]
for terminal in [0,1]
])
except:
print 'Signal not available'
return None # FIXME dangerous for a tuple???
#except:
else:
try:
out_sweep[dl_name][signal] = raw_sweep[dlp[signal][0]]\
[dlp[signal][1]]
except:
print 'Signal not available'
return None
#except: #exceptions.TypeError:
## we got an empty raw_sweep!
#print 'we got an empty raw sweep',raw_sweep
#return None
return out_sweep
cpdef tuple clean_sweep(self, tuple sweep):
"""
Performs the discri on the sweep and tries to reconstruct missing
events as much as possible
"""
cdef dict formated_sweep
cdef np.ndarray discriminatedX, discriminatedY,evx # FIXME USE
buffer nottation !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!11
cdef list ev#FIXME!!!!!!!!!!!!evx to tuple??
cdef np.ndarray index_evy
formated_sweep = self.format_sweep(sweep)
if formated_sweep is None:
return ()
discriminatedX = self.delay_line[0].do_discri(formated_sweep)
discriminatedY = self.delay_line[1].do_discri(formated_sweep)
#print 'discriX',discriminatedX
#print 'discriY',discriminatedY
# !!!! FIXME should avoid the search by being cleverer
ev = []
for evx in discriminatedX:
try:
index_evy = scipy.nonzero(abs(discriminatedY[:,0]-evx[0]) <
self.timing_accuracy
)[0]
if len(index_evy)!= 0:
ev.append(tuple([evx[0],self.delay_line[0]._locate(evx),
self.delay_line[1]._locate(
discriminatedY[index_evy][0]
)
])
)# FIXME !!!be sure that there coulld be
only one evy
except:
pass
# print 'fixme with user defined error npixrange 11'
#except:
#print 'ahaaaaa!'
#pass
return tuple(ev)
cdef class DelayLine:
"""
A class describing a delay line (addresses and length) as well
as providing the discrimination functions
"""
cdef public dict delay_line_pars # FIXME is this optimal
cdef bint has_anode #bool fails withdelay_line.c:443: warning:
implicit declaration of function ‘PyBool_CheckExact’ at compile and
<type 'exceptions.ImportError'>:
/home/jfmoulin/My_Progs/refsans/refsans_cython/read_sweep/C/delay_line_C.so:
undefined symbol: PyBool_CheckExact at runtime
cdef int has_cathodes
cdef double length
cdef double tol
cdef public tuple window
cdef list li_sig #FIXME
cdef int npix
cdef double npix_2, npix_length, npix_length_2
cdef int (*locate)(DelayLine, np.ndarray) #FIXME
http://www.mail-archive.com/[email protected]/msg02700.html)
cdef bint (*discri)(DelayLine, tuple)
#------------------------
def __init__(self,dict delay_line_pars):
self.delay_line_pars = delay_line_pars
self.has_anode = self.delay_line_pars.has_key('anode')
self.has_cathodes = 0
self.length = self.delay_line_pars['length']
self.tol = self.delay_line_pars['tol']
self.window = (self.length - self.tol , self.length + self.tol)
if self.delay_line_pars.has_key('cathodes'):
try:
self.delay_line_pars['cathodes'][1][0]
self.has_cathodes = 2
except:
self.has_cathodes = 1
self.locate = self.locate_single_terminal
if self.has_anode and self.has_cathodes == 2:
self.li_sig = ['anode','cathodes']
self.locate = self.locate_two_terminals
self.npix = self.delay_line_pars['npix']
self.npix_2 = self.npix * .5
self.npix_length = self.npix / self.length
self.npix_length_2 = self.npix_2 / self.length
cpdef int _locate(self, np.ndarray signals):
return self.locate(self, signals) # FIXME cython strangeness cdef fails
#even if call is from cython code!
cdef bint discri_complete(self, tuple signals):
"""
The easy case where we have exactly one anode and two cathodes signals
"""
cdef int calc_length
cdef bint discri_sum
calc_length = signals[1] + signals[2] - 2 * signals[0]
discri_sum = calc_length >= self.window[0] and \
calc_length <= self.window[1]
return discri_sum
cdef bint discri_single_cath(self, tuple signals):
#FIXME is tuplethe best here???
cdef int calc_pos
cdef bint discri_pos
calc_pos = signals[1] - signals[0]
discri_pos = calc_pos <= self.window[1]
return discri_pos
cpdef np.ndarray do_discri(self,dict sweep):
cdef list li_discriminated
cdef int count_anode
cdef int anode
cdef tuple li_anode,sig # FIXME NAME??
cdef bint only_one_ev
cdef tuple li_cathodes
self.discri = self.discri_complete
# extract what is relevant to this delay line only
sweep = sweep[self.delay_line_pars['name']]
li_discriminated = []
for count_anode , anode in enumerate(sweep['anode']):
li_anode = self.within_delay(sweep['anode'],anode)
only_one_ev = len(li_anode) == 1
if self.has_cathodes == 2:
li_cathodes =
tuple([self.within_delay(sweep['cathodes'][cathcount],
anode
)
for cathcount in [0,1]
])
only_one_ev = only_one_ev and \
len(li_cathodes[0]) == 1 and \
len(li_cathodes[1]) == 1
else:
li_cathodes = self.within_delay(sweep['cathodes'],anode)
only_one_ev = only_one_ev and len(li_cathodes) == 1
self.discri = self.discri_single_cath
if only_one_ev:
if self.has_cathodes==2:
sig = (li_anode[0],li_cathodes[0][0],li_cathodes[1][0])
else:
sig = (li_anode[0],li_cathodes[0])
if self.discri(self,sig):# FIXME why must I put self
in the args!?
li_discriminated.append(sig)
#else:
# print 'rej',sig
discriminated = scipy.array(li_discriminated)
return discriminated
cdef tuple within_delay(self,tuple li_signals,int anode):
cdef int tmax
cdef int ev
tmax = self.window[1] + anode
return tuple([ev for ev in li_signals if ev>=anode and ev<=tmax])
cdef int locate_single_terminal(self,np.ndarray signals):
"""
Position the event on a self.pix scale (if self.npix is None,
the return is continuous over [0,1]
Input should be an array [anode, c1]
"""
cdef int pos
pos = int(round((signals[1] - signals[0]) * self.npix_length))
return pos
cdef int locate_two_terminals(self,np.ndarray signals):
"""
Position the event on a self.pix scale (if self.npix is None,
the return is continuous over [0,1]
Input should be an array [anode, c1, c2]
"""
cdef int pos
try:
pos = int(round(self.npix_2 + \
(signals[:,2] - signals[:,1]) * self.npix_length_2)
)
except:
pos = int(round(self.npix_2 + \
(signals[2] - signals[1]) * self.npix_length_2)
)
return pos
cdef class TofSlabber:
"""A class implementing the TOF histogramming """
# NOTE: for performance reasons defined directly only for 2D images
# do it for 1D counters...
# FIXME add a check to avoid incrementing severall times for
# the same sweep?????
cdef tuple tof_lims
cdef double resol
cdef tuple npix
cdef public int nsweep
cdef np.ndarray li_tof, li_tof_low, li_tof_high
cdef public np.ndarray slab, tof_histo
def __init__(self,tof_lims=None,resol=.1,npix=(256,256)):
self.tof_lims = tof_lims
self.resol = resol
self.npix = npix
self.nsweep = 0
if self.tof_lims is not None:
self.init_tof_array()
def init_tof_array(self):
"""
Initializes the tof histogram
"""
n_slabs =
round(scipy.log(self.tof_lims[1]/self.tof_lims[0])/scipy.log(1 +
self.resol)) + 1
#print 'tof_lims/resol (init_tof_array)',self.tof_lims,self.resol
#print 'n_slabs (init_tof_array)',n_slabs
self.li_tof = scipy.append(scipy.array([0]),
self.tof_lims[0] *
scipy.logspace(0,
n_slabs,
num=n_slabs,
base=1 + self.resol
)
)
#print 'li_tof (init_tof_array)',self.li_tof
self.li_tof_low = (1 - .5 * self.resol) * self.li_tof
self.li_tof_high = (1 + .5 * self.resol) * self.li_tof
self.slab = scipy.zeros((n_slabs,self.npix[0],self.npix[1]))
self.tof_histo = scipy.zeros((n_slabs))
def _incr(self,sweep):
self.incr(sweep)
cdef void incr(self,tuple sweep) except *:
"""
Increments the tof histogramm and the different channels with the
events found in a sweep.
sweep should be a tuple.
"""
cdef tuple ev
cdef int slab_num
#print 'sweep (input to incr)',sweep
self.nsweep += 1
if sweep == () :
return
if self.tof_lims is None:
# print (s.array(sweep[0]).min() *
.9,s.array(sweep[:0]).max() * 1.15)
self.tof_lims = (int(scipy.array(sweep)[:,0].min() * .9),\
1 + int(scipy.array(sweep)[:,0].max() * 1.5)
)
self.init_tof_array()
for ev in sweep:
slab_num = scipy.nonzero(ev[0]>=self.li_tof_low)[0][-1]
# try to increment the correct tof slice
# try:
#print 'slab_num (incr)', slab_num
if slab_num > len(self.li_tof)-2:
print 'Event rejected, tof>tmax'
else:
try:
self.slab[slab_num][ev[1],ev[2]] += 1
# increment the integrated signal
self.slab[0][ev[1],ev[2]] += 1
# increment the tof histogram
self.tof_histo[slab_num] += 1
except exceptions.IndexError:
print 'fixme with user error about range'#,sweep
# pass
cdef class ListModeFile:
"""
Implementation of all operations on list mode files.
This class provides a level of abstraction to separate from the
details of the file structure.
fname: name of the file to be read
reader: the name of a class defining a reader for this file.
See LstFileReader_FAST for details of necessary methods.
"""
cdef str fname
cdef LstFileReader_FAST reader# FIXME!!!!
def __init__(self,fname,reader='LstFileReader_FAST'):
self.fname = fname
print '%s(\'%s\')'%(reader,self.fname)
self.reader = eval('%s(\'%s\')'%(reader,self.fname))
# self.reader = eval('low_level_reader.%s(\'%s\')'\
# %(reader,self.fname)
# )#FIXME into something less harmful
cpdef tuple next_sweep(self):
"""
Returns the next available full sweep starting from the present
position in the file
"""
return self.reader.next_sweep()
def prev_sweep(self):
"""
Reads the previous full sweep starting from present pos in file
"""
return self.reader.prev_sweep()
def goto_start(self):
"""
Goes to first sweep in file.
"""
self.reader.goto_start()
def goto_end(self):
"""
Goes to last full sweep in file.
"""
self.reader.goto_end()
def nevents(self):
"""
Returns the number of events in the file (incl start signals)
"""
return self.reader.nevents()
cdef class LstFileReader_FAST:
"""
Implement operations on a FAST list mode file (.lst).
This is the low level reader
"""
cdef str fname
cdef tuple (*do_read_ev)(LstFileReader_FAST)
cdef file lstfile # FIXME
cdef list header
def __init__(self,fname):
self.fname = fname
self.goto_start()
# we define a function pointer to the reader.next in order to be
# able to change the reading direction easily
self.do_read_ev = self._next_ev
def __iter__(self):
return self
def goto_start(self):
"""
Read the header. Goto first event in file.
"""
try:
self.lstfile.close()
except exceptions.AttributeError:
pass
self.lstfile = open(self.fname,'rb')
self.header = []
while True:
line = self.lstfile.readline()
#print line
if '[DATA]' in line:
break
else:
self.header.append(line)
def goto_end(self):
"""
Goto the position of the last fully written event in file.
"""
# Count the events in file, there are 4 bytes per event...
nev = self.nevents()
end_pos = nev * 4
last_ev_pos = (nev - 1) * 4
self.lstfile.seek(last_ev_pos,1)
def nevents(self):
"""
return the number of events in file (incl starts)
"""
self.goto_start()
start_offset = self.lstfile.tell()
nev = round((getsize(self.fname) - start_offset)/4)
return nev
cdef tuple prev(self):
return self._prev()
cdef tuple _prev(self):
"""
read previous event in file, jump back before its position and
return [channel,value].
Use to read a file backwards.
"""
cdef tuple ev
try:
self.lstfile.seek(-4,1)
except exceptions.IOError:
return None
ev = self.next_ev()
self.lstfile.seek(-4,1)
return ev
cdef tuple next_ev(self):
return self._next_ev()
cdef tuple _next_ev(self):
"""
Read the next event in file and decode it. Return [chan_number, value]
"""
cdef int input_chan
cdef int value
cdef str st_data
cdef int data
cdef int bit30
cdef int bit31
input_chan = -1#None
value = -1#None
# cython trick for strings from python (see manual p15))!
# s = self.lstfile.read(4)
#print self.lstfile.tell()
#print 's',type(s),s
st_data = self.lstfile.read(4)
#print 'st_data',st_data
#print data
try:
data = struct.unpack('i',st_data)[0]
# print 'data', data
except struct.error:
# print 'struct_error!return None'
return (None,)
#Reading bits 30 and 31 for input channel:
#struct.unpack('>i','\x40\x00\x00\x00')[0] != 0)
bit30 = int(data & 1073741824!=0)
#struct.unpack('>i','\x80\x00\x00\x00')[0] !=0)
bit31 = int(data & -2147483648!=0)
#print 'bits', bit30, bit31
input_chan = 1 * bit30 + 2 * bit31
# key = str(bit30) + str(bit31)
#print 'key',key
#print bit30, bit31, 'event @',dict_input[key]
#value of int(bits0_29): turn the bits 30 and 31
# off and convert the value
#struct.unpack('>i','\x3f\xff\xff\xff')[0]
value = 32 + data & 1073741823
#print value
#vals[dict_input[key]-1].append(value)
#input_chan = LstFileReader_FAST.dict_input[key]
#print 'chan,val',input_chan, value
return (input_chan, value)
cdef tuple next_sweep(self):
return self._next_sweep()
cdef tuple _next_sweep(self):
"""
Returns the next available full sweep starting from the present
position in the file (implement premature eof!!!) FIXME
"""
cdef list channels#, last_ev
cdef tuple last_ev
cdef list chan
channels = []
last_ev = (None, None)
# last_ev = [[],[]]
# if necessary look for start event
try:
while last_ev[1] != 32:
last_ev = self.do_read_ev(self)# FIXME
# print 'last_ev',last_ev
except exceptions.TypeError:
return None
last_ev = (None,None)
# then accumulate data up to next start
while last_ev[1] != 32:
last_ev = self.do_read_ev(self) # FIXME
# in case of EOF before START signal,
# we return None (uncomplete sweep)
if last_ev[0] is None : return None
# if we got a start signal we are done
if last_ev[1] == 32:
return tuple([tuple(chan) for chan in channels])
else:
# auto adjusting the number of channels
try:
channels[last_ev[0]].append(last_ev[1])
except:
channels.extend([[] for count in
range(last_ev[0]-len(channels)+1)
]
)
channels[last_ev[0]].append(last_ev[1])
return tuple([tuple(chan) for chan in channels])
cdef tuple prev_sweep(self):
"""
Reads previous full sweep and goes back to its starting pos.
Use to read backwards.
"""
cdef tuple channels
self.do_read_ev = self.prev
channels = self.next_sweep()
if channels is not None:
# we read backwards... so we have to reorder the events
# in increasing time
channels = [chan[::-1] for chan in channels]
# restore the normal reading direction of the file
self.do_read_ev = self.next_ev
return channels
def progress(self):
"""
progress() returns (Mbytes_read,file_progress(%))
"""
return(self.lstfile.tell()/(1024*1024))#,fprogress(self.lstfile))
On Tue, Apr 21, 2009 at 10:23 AM, Robert Bradshaw
<[email protected]> wrote:
> On Apr 21, 2009, at 12:45 AM, Jean-Francois Moulin wrote:
>
>> Hi,
>>
>> first of all, thanks for the replies.
>>
>> As for the first answer by Lisandro, I do not understand then why my
>> first code fragment at all works...
>> The only difference is that the function called has two params, one
>> instance of one of my classes and a numpy array, while the faulty
>> piece of code should be called on an instance
>> of one of my classes only...
>>
>> As for the second, from Robert, now, very well... how do I specify a
>> value for this implicit parameter then!? It is a bint I guess... I
>> tried to set it to 1 and got new error messages:
>> delay_line.pyx:961:30: Cannot assign type 'tuple (LstFileReader_FAST,
>> int __pyx_skip_dispatch, struct
>> __pyx_opt_args_12delay_line_C_18LstFileReader_FAST_next_ev
>> *__pyx_optional_args)' to 'tuple (*)(LstFileReader_FAST)'
>
> You can't. I agree the errors are more cryptic than they should be,
> but C functions have no concept of runtime dispatching or default
> arguments, so it doesn't work to cast such a function to a C function
> (well, maybe one could hack it, but it'd be ugly and fragile).
>
>> I also tried to use cdef in all cases, I get no compile time error but
>> at runtime my object LstFileReader_FAST appears no longer to be
>> defined!? I do not understand because all the calls are made from
>> within Cython code... (so that I expect the cpdef is not really
>> needed)
>
> I am guessing you're doing something like
>
> a. LstFileReader_FAST(...)
>
> and it's complaining that "a has not attribute 'LstFileReader_FAST'".
> Usually this indicates that a is not typed correctly (e.g. it thinks
> a is an ordinary object). You can run cython with the -a option to
> easily see this kind of thing.
>
> - Robert
>
> _______________________________________________
> Cython-dev mailing list
> [email protected]
> http://codespeak.net/mailman/listinfo/cython-dev
>
_______________________________________________
Cython-dev mailing list
[email protected]
http://codespeak.net/mailman/listinfo/cython-dev