Hello GnuRadio fans, I’m afraid I’m a complete failure at creating my own vector average with decimate block. This is my 4th week at this endeavor...
I’ve run through a number of c++ and python web examples and each time am getting hung up somewhere. I’ve been following the examples from many gnu radio web sites and trying to get them to complete Including: https://wiki.gnuradio.org/index.php/OutOfTreeModules I got the C++ example to work but could not get the python qa test to pass. In any case, the vector average with decimate must be an easy example but I can not get the qa part to work. Is there anyone out there who could create the very rudimentary vector average with decimate that they could share? Please find my code attached. The python part is simple. The QA part is not running yet. I’ve started with: gr_modtool create vave cd gr-vave gr_modtool add -t decimator -l python vave
#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright 2018 <+YOU OR YOUR COMPANY+>. # # This is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. # # This software is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this software; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # import numpy from gnuradio import gr class vave(gr.decim_block): """ docstring for block vave """ def __init__(self, vsize, vdecimate): gr.decim_block.__init__(self, name="vave", in_sig=[(numpy.float32, int(vsize))], out_sig=[(numpy.float32, int(vsize))], decim=int(vdecimate)) self.vsize = int(vsize) self.vdecimate = int(vdecimate) self.sum = numpy.zeros(self.vsize) self.count = 0 self.oneovern = 1./float(self.vdecimate) def forecast( self, noutput_items, ninput_items): return self.vdecimate def work(self, input_items, output_items): """ Work averages all input vectors and outputs one vector for all inputs """ out = output_items[0] # get the number of input vectors n = len( input_items) print 'Number of work input vectors: ',n for j in range(n): # get the lenght of one input in0 = input_items[j] print 'Lenght of input ', len(in0),' for vectors: ',j # indicate consumption of a vector from input self.consume(0,len(in0)) # now save this vector until all are received self.sum = self.sum + in0 self.count = self.count + 1 if self.count >= self.vdecimate: # normalize output average out[:] = self.oneovern * self.sum[:] # now reset the count and restart the sum self.count = 0 self.sum = numpy.zeros( self.vdecimate) return len(output_items[0]) # end for all input vectors # if here, then not enough vectors input to produce an output return 0 # end vave()
#!/usr/bin/env python # -*- coding: utf-8 -*- # # Copyright 2018 <+YOU OR YOUR COMPANY+>. # # This is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3, or (at your option) # any later version. # # This software is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this software; see the file COPYING. If not, write to # the Free Software Foundation, Inc., 51 Franklin Street, # Boston, MA 02110-1301, USA. # import numpy from gnuradio import gr, gr_unittest from gnuradio import blocks from vave import vave class qa_vave (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () def tearDown (self): self.tb = None def test_001_t (self): vsize = 1024 vdecimate = 4 vin = numpy.zeros(vsize*vdecimate) # create a set of vectors src = blocks.vector_source_f( vin.tolist()) s2v = blocks.stream_to_vector(gr.sizeof_float, vsize) # block we're testing vblock = vave( vsize, vdecimate) v2s = blocks.vector_to_stream( gr.sizeof_float, vsize) snk = blocks.vector_sink_f(vsize) self.tb.connect (src, s2v) self.tb.connect (s2v, vblock) self.tb.connect (vblock, snk) # self.tb.connect (v2s, snk) expected = vin[0:vsize] outdata = None waittime = 0.01 self.tb.run () outdata = snk.data() # check data self.assertFloatTuplesAlmostEqual (expected, outdata, 6) if __name__ == '__main__': gr_unittest.run(qa_vave, "qa_vave.xml")
_______________________________________________ Discuss-gnuradio mailing list Discuss-gnuradio@gnu.org https://lists.gnu.org/mailman/listinfo/discuss-gnuradio