Yesterday I asked a question about a failing flowgraph when connecting the copy block to a message source and running the flowgraph multiple times.
I'm still trying to understand what's going on, so I decided to write a version of "copy" in Python to better understand things. Even though I've successfully written basic blocks in Python before, I am really struggling to make this work. As implemented, it could (and arguably should) be a sync_block, and it works if it is one, but copy needs to be a basic block to handle disabled copying.

The test hangs as if "py_copy" is never passing on the WORK_DONE from the vector_source. Does anyone see the error?

import numpy as np
from gnuradio import gr, gr_unittest, blocks


class py_copy(gr.basic_block):
    """A python version of the copy block for comparison testing."""

    def __init__(self, itemtype):
        gr.basic_block.__init__(
            self,
            name="py_copy",
            in_sig=[itemtype],
            out_sig=[itemtype]
        )

    def general_work(self, input_items, output_items):
        n = min(len(input_items[0]), len(output_items[0]))
        output_items[0][:n] = input_items[0][:n]
        self.consume_each(n)
        return n


class qa_copy(gr_unittest.TestCase):

    def setUp(self):
        self.tb = gr.top_block()

    def tearDown(self):
        self.tb = None

    def test_copy(self):
        src_data = np.arange(1000)
        src = blocks.vector_source_f(src_data)
        #copy_block = blocks.copy(gr.sizeof_float)
        copy_block = py_copy(np.float32)
        null_sink = blocks.null_sink(gr.sizeof_float)
        self.tb.connect(src, copy_block, null_sink)
        self.tb.run()
        self.assertEqual(1000, copy_block.nitems_written(0))


if __name__ == '__main__':
    gr_unittest.run(qa_copy)
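
To make the per-call arithmetic concrete, here is a minimal sketch of the copy step on its own, outside GNU Radio. `copy_step` is a hypothetical helper, not part of the gnuradio API, and plain lists stand in for the buffers the scheduler would pass. It only shows that a call with no input available copies nothing and returns 0; it says nothing about how the scheduler reacts to that.

```python
def copy_step(in_buf, out_buf):
    """Copy as many items as both buffers allow and return the count.

    Hypothetical helper: mirrors the body of general_work in the post,
    but takes plain sequences instead of scheduler-provided buffers.
    """
    n = min(len(in_buf), len(out_buf))
    # Same-length slice assignment, so out_buf keeps its size.
    out_buf[:n] = in_buf[:n]
    return n


# Plain lists stand in for the numpy buffers GNU Radio would pass.
out = [0.0] * 4
produced = copy_step([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], out)  # more input than output space

empty_out = [0.0] * 4
produced_empty = copy_step([], empty_out)  # no input available
```

In the first call the output buffer is the limiting side, so only four items are copied. In the second call the input is empty, so nothing is copied and the count is 0, which is what general_work would return in that situation.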