The work() functions are for stream handling. You may have already read https://wiki.gnuradio.org/index.php/Message_Passing
The approach here should be to create a separate thread in your init, and have that thread periodically public a message. There is a Message Strobe block that does this (in C++) and you could take a look at that code as an example. On Mon, Nov 29, 2021 at 10:06 AM Michelle <[email protected]> wrote: > Good morning, > > I want to generate a source controlled by voltage. Basically, I send > messages to a source in order to change its amplitude. I have built a block > to generate messages and pass them to the source. To do so, I created an > "embedded python block " that generates and transmits a message every > second, here is the python code of my block: > > from gnuradio import gr, blocks > import pmt > import numpy > import time > > #The block receives input data (the amplitude of a signal) then generates > a message to be passed to a constant source so that the latter modifies its > amplitude" > > class message_generator(gr.sync_block): > def __init__(self): > gr.sync_block.__init__( > self, > name="message generator", > in_sig=[numpy.float32], #amplitude that the signal must have > out_sig=None > ) > self.message_port_register_out(pmt.intern('out_port')) > > > def work(self, input_items, output_items): > self.msg_port_pub(pmt.intern('out_port'), > pmt.cons(pmt.intern("ampl"), pmt.from_double(input_items[0][0]))) > time.sleep(1) > return 1 > > > when I execute my flowgraph in gnuradio I have the error : > self.msg_port_pub(pmt.intern('out_port'), pmt.cons(pmt.intern("ampl"), > pmt.from_double(input_items[0][0]))) > AttributeError: 'message_generator' object has no attribute 'msg_port_pub' > . And I can't understand why if in the qa_python_message_passing.py > <https://github.com/gnuradio/gnuradio/blob/master/gr-blocks/python/blocks/qa_python_message_passing.py> > (below ) the function message_port_pub is called the same way as I do in > mine. > > class message_generator(gr.sync_block): > def __init__(self, msg_interval = 10): > gr.sync_block.__init__( > self, > name="message generator", > in_sig=[numpy.float32], #amplitude that the signal must have > out_sig=None > ) > self.msg_list = [] > #self.message_port_register_in(pmt.intern('in_port')) > self.message_port_register_out(pmt.intern('out_port')) > self.msg_interval = msg_interval > self.msg_ctr = 0 > > > def work(self, input_items, output_items): > > inLen = len(input_items[0]) > > # Create a new PMT for each input value and put it in the message > list > self.msg_list.append(pmt.from_double(input_items[0][0])) > > while self.msg_ctr < len(self.msg_list) and \ > (self.msg_ctr * self.msg_interval) < \ > (self.nitems_read(0) + inLen): > self.message_port_pub(pmt.intern('out_port'), > > pmt.cons(pmt.intern("ampl"),self.msg_list[self.msg_ctr])) > self.msg_ctr += 1 > return inLen > > > PLease can you explain to me what is wrong on my code? I even tried to put > an attribute "msg_port_pub" when declaring the class but I still have the > same error. > > from gnuradio import gr, blocks > import pmt > import numpy > import time > > class message_generator(gr.sync_block): > def __init__(self): > gr.sync_block.__init__( > self, > name="message generator", > in_sig=[numpy.float32], #amplitude that the signal must have > out_sig=None > ) > d_port= self.message_port_register_out(pmt.intern('out_port')) > self.msg_port_pub(d_port, self.buildMessage) > > def buildMessage(self, data): > msg = pmt.cons(pmt.intern("ampl"), pmt.from_double(data)) > return msg > > > def work(self, input_items, output_items): > messageToSend = builMessage(input_items=[0][0]) > self.msg_port_pub(d_port, messageToSend) > time.sleep(1) > return 1 > > > You can find attached my flowgraph and the resulting python code. > > Thank you in advance and have a good day. > > >
