The work() function is meant for stream handling, not for scheduling
messages. You may have already read

https://wiki.gnuradio.org/index.php/Message_Passing

The approach here should be to create a separate thread in your __init__ and
have that thread periodically publish a message. There is a Message Strobe
block that does this (in C++), and you could take a look at that code as an
example.
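
A minimal Python sketch of that idea, in case it helps. This is not the
Message Strobe code, only an illustration of a background thread that
publishes on a timer. The class name PeriodicPublisher and the attribute
self.ampl are made up for this example. Also note that the QA file you quote
calls message_port_pub, while the failing code calls msg_port_pub, which
would explain the AttributeError.

```python
import threading


class PeriodicPublisher:
    """Call publish() every `interval` seconds on a background thread.

    Hypothetical helper for illustration; not part of GNU Radio.
    """

    def __init__(self, publish, interval=1.0):
        self._publish = publish          # callable taking no arguments
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()

    def _run(self):
        # Event.wait() returns False on timeout and True once stop() has
        # been called, so this publishes once per interval until stopped.
        while not self._stop.wait(self._interval):
            self._publish()


# Assumed hookup inside the block's __init__ (not executed here;
# self.ampl is a hypothetical attribute holding the latest amplitude):
#
#   self.message_port_register_out(pmt.intern('out_port'))
#   self._pub = PeriodicPublisher(
#       lambda: self.message_port_pub(
#           pmt.intern('out_port'),
#           pmt.cons(pmt.intern('ampl'), pmt.from_double(self.ampl))),
#       interval=1.0)
#   self._pub.start()
```

With this arrangement work() only needs to record the latest input value and
return, without sleeping, so the scheduler is never blocked. The thread should
also be stopped when the flowgraph shuts down; where to do that depends on
your GNU Radio version.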

On Mon, Nov 29, 2021 at 10:06 AM Michelle <[email protected]> wrote:

> Good morning,
>
> I want to generate a source controlled by voltage. Basically, I send
> messages to a source in order to change its amplitude. I have built a block
> to generate messages and pass them to the source. To do so, I created an
> "embedded python block " that generates and transmits a message every
> second, here is the python code of my block:
>
> from gnuradio import gr, blocks
> import pmt
> import numpy
> import time
>
> # The block receives input data (the amplitude of a signal), then generates
> # a message to be passed to a constant source so that the latter modifies
> # its amplitude.
>
> class message_generator(gr.sync_block):
>     def __init__(self):
>         gr.sync_block.__init__(
>             self,
>             name="message generator",
>             in_sig=[numpy.float32], #amplitude that the signal must have
>             out_sig=None
>         )
>         self.message_port_register_out(pmt.intern('out_port'))
>
>
>     def work(self, input_items, output_items):
>        self.msg_port_pub(pmt.intern('out_port'),
> pmt.cons(pmt.intern("ampl"), pmt.from_double(input_items[0][0])))
>        time.sleep(1)
>        return 1
>
>
> When I execute my flowgraph in GNU Radio I get the error:
> self.msg_port_pub(pmt.intern('out_port'), pmt.cons(pmt.intern("ampl"),
> pmt.from_double(input_items[0][0])))
> AttributeError: 'message_generator' object has no attribute 'msg_port_pub'
> I can't understand why, since in qa_python_message_passing.py
> <https://github.com/gnuradio/gnuradio/blob/master/gr-blocks/python/blocks/qa_python_message_passing.py>
> (below) the function message_port_pub is called the same way as in my
> code.
>
> class message_generator(gr.sync_block):
>     def __init__(self, msg_interval = 10):
>         gr.sync_block.__init__(
>             self,
>             name="message generator",
>             in_sig=[numpy.float32], #amplitude that the signal must have
>             out_sig=None
>         )
>         self.msg_list = []
>         #self.message_port_register_in(pmt.intern('in_port'))
>         self.message_port_register_out(pmt.intern('out_port'))
>         self.msg_interval = msg_interval
>         self.msg_ctr = 0
>
>
>     def work(self, input_items, output_items):
>
>         inLen = len(input_items[0])
>
>         # Create a new PMT for each input value and put it in the
>         # message list
>         self.msg_list.append(pmt.from_double(input_items[0][0]))
>
>         while self.msg_ctr < len(self.msg_list) and \
>                 (self.msg_ctr * self.msg_interval) < \
>                 (self.nitems_read(0) + inLen):
>             self.message_port_pub(pmt.intern('out_port'),
>                 pmt.cons(pmt.intern("ampl"), self.msg_list[self.msg_ctr]))
>             self.msg_ctr += 1
>         return inLen
>
>
> Please can you explain to me what is wrong in my code? I even tried to add
> an attribute "msg_port_pub" when declaring the class, but I still get the
> same error.
>
> from gnuradio import gr, blocks
> import pmt
> import numpy
> import time
>
> class message_generator(gr.sync_block):
>     def __init__(self):
>         gr.sync_block.__init__(
>             self,
>             name="message generator",
>             in_sig=[numpy.float32], #amplitude that the signal must have
>             out_sig=None
>         )
>         d_port= self.message_port_register_out(pmt.intern('out_port'))
>         self.msg_port_pub(d_port, self.buildMessage)
>
>     def buildMessage(self, data):
>        msg = pmt.cons(pmt.intern("ampl"), pmt.from_double(data))
>        return msg
>
>
>     def work(self, input_items, output_items):
>        messageToSend = builMessage(input_items=[0][0])
>        self.msg_port_pub(d_port, messageToSend)
>        time.sleep(1)
>        return 1
>
>
> You can find attached my flowgraph and the resulting python code.
>
> Thank you in advance and have a good day.
>
>
>