I have the attached scripts, which were working: uhd_cc2420_rxtest.py was
decoding packets, albeit with a high error rate. I upgraded to GNU Radio 3.6 and
had to change a few names, such as changing packet_utils to digital and
clock_recovery_mm_ff to digital_clock_recovery_mm_ff.
Both scripts execute without errors, and I can see on an independent spectrum
analyser that packets are being sent. Yet I am not receiving any packets
with uhd_cc2420_rxtest.py.
Is there any change that I need to make to at least get a few packets,
irrespective of the error rate?
H.E. Baidoo-Williams
GRA, University of Iowa
#!/usr/bin/env python
#
# Transmitter of IEEE 802.15.4 RADIO Packets.
#
# Modified by: Thomas Schmid, Sanna Leidelof, Kresimir Dabcevic
#
from gnuradio import gr, eng_notation, uhd
from gnuradio import ucla
from gnuradio.ucla_blks import ieee802_15_4_pkt
from gnuradio.eng_option import eng_option
from optparse import OptionParser
import math, struct, time
class transmit_path(gr.top_block):
    """Flowgraph that sends IEEE 802.15.4 packets to a UHD USRP sink.

    Signal chain: ieee802_15_4_mod_pkts -> multiply_const_cc -> uhd.usrp_sink.
    The scaled baseband is additionally written to 'tx_test.dat'.
    """

    def __init__(self, options):
        """Build and connect the transmit flowgraph.

        Args:
            options: parsed command-line options; ``options.address`` and
                ``options.cordic_freq`` are read here.
        """
        gr.top_block.__init__(self)
        # Constant the modulator output is multiplied by before the sink.
        self.normal_gain = 8000
        # Last value passed to set_gain(); None until it is called.
        self.tx_gain = None

        self.u = uhd.usrp_sink(device_addr=options.address,
                               io_type=uhd.io_type.COMPLEX_FLOAT32,
                               num_channels=1)
        self.u.set_clock_config(uhd.clock_config.internal(), uhd.ALL_MBOARDS)

        self._data_rate = 2000000
        self._spb = 2

        # Set and print sampling rate.
        # NOTE(review): self._data_rate * self._spb would be 4 MS/s; the
        # hard-coded 2 MS/s is kept unchanged here -- confirm which rate the
        # modulator (spb=2) actually expects.
        self._samp_rate = 2000000
        self.u.set_samp_rate(self._samp_rate)
        input_rate = self.u.get_samp_rate()
        # Single-argument print(...) behaves the same as the Python 2 print
        # statement and also parses under Python 3.
        print("Sampling rate: %d" % (input_rate))

        # Set and print center frequency.
        self.u.set_center_freq(options.cordic_freq)
        frekva = self.u.get_center_freq()
        print("Center frequency: %d" % (frekva))

        # Transmitter chain.
        self.packet_transmitter = ieee802_15_4_pkt.ieee802_15_4_mod_pkts(
            self, spb=self._spb, msgq_limit=2)
        self.gain = gr.multiply_const_cc(self.normal_gain)
        self.connect(self.packet_transmitter, self.gain, self.u)

        # Tap the scaled samples into a file as well.
        self.filesink = gr.file_sink(gr.sizeof_gr_complex, 'tx_test.dat')
        self.connect(self.gain, self.filesink)

    def set_gain(self, gain):
        """Set the USRP gain on channel 0.

        The requested value is remembered in ``self.tx_gain``. ``self.gain``
        is deliberately left alone: it holds the multiply_const_cc block, and
        overwriting it with a number would lose the reference to that block.

        Args:
            gain: gain value forwarded to ``self.u.set_gain``.
        """
        self.tx_gain = gain
        self.u.set_gain(gain, 0)

    def send_pkt(self, payload='', eof=False):
        """Queue one packet on the packet transmitter.

        The packet is sent with the fixed first argument 0xe5 and a fixed
        address-info field (four native-order unsigned shorts: 0xFFFF,
        0xFFFF, 0x10, 0x10).

        Args:
            payload: payload string handed to the modulator.
            eof: forwarded unchanged to the modulator's ``send_pkt``.

        Returns:
            Whatever the modulator's ``send_pkt`` returns.
        """
        return self.packet_transmitter.send_pkt(
            0xe5, struct.pack("HHHH", 0xFFFF, 0xFFFF, 0x10, 0x10), payload, eof)
def main():
    """Parse options, run the transmit flowgraph and send 1000 test packets.

    One packet is sent every 0.1 s. The flowgraph is always stopped and
    waited on before returning, including when the loop is interrupted.
    """
    parser = OptionParser(option_class=eng_option)
    parser.add_option("-a", "--address", type="string", default="addr=192.168.10.2",
                      help="Address of UHD device, [default=%default]")
    parser.add_option("-c", "--cordic-freq", type="eng_float", default=2475000000,
                      help="set Tx cordic frequency to FREQ", metavar="FREQ")
    parser.add_option("-r", "--data-rate", type="eng_float", default=2000000)
    parser.add_option("-f", "--filename", type="string",
                      default="rx.dat", help="write data to FILENAME")
    # This is the transmitter script, so the help text says Tx (it said Rx).
    parser.add_option("-g", "--gain", type="eng_float", default=0,
                      help="set Tx PGA gain in dB [0,20]")
    parser.add_option("-N", "--no-gui", action="store_true", default=False)
    (options, args) = parser.parse_args()

    tb = transmit_path(options)
    # --gain used to be parsed and then silently ignored; apply it.
    tb.set_gain(options.gain)
    tb.start()
    try:
        for i in range(1000):
            print("send message %d:" % (i + 1,))
            tb.send_pkt(payload=struct.pack('9B', 0x1, 0x80, 0x80, 0xff, 0xff, 0x10, 0x0, 0x20, 0x0))
            # Another example packet that could be sent instead:
            #tb.send_pkt(struct.pack('BBBBBBBBBBBBBBBBBBBBBBBBBBB', 0x1, 0x8d, 0x8d, 0xff, 0xff, 0xbd, 0x0, 0x22, 0x12, 0xbd, 0x0, 0x1, 0x0, 0xff, 0xff, 0x8e, 0xff, 0xff, 0x0, 0x3, 0x3, 0xbd, 0x0, 0x1, 0x0, 0x0, 0x0))
            time.sleep(0.1)
    finally:
        # Shut the flowgraph down explicitly instead of letting the
        # interpreter exit while it is still running.
        tb.stop()
        tb.wait()
if __name__ == '__main__':
    # Debugging aid: to attach GDB before the flowgraph starts, print
    # os.getpid() at this point and block on raw_input() until Enter is hit.
    main()
#!/usr/bin/env python
#
# Decoder of IEEE 802.15.4 RADIO Packets.
#
# Modified by: Thomas Schmid, Leslie Choong, Mikhail Tadjikov, Kresimir Dabcevic
#
from gnuradio import gr, eng_notation, uhd
from gnuradio.ucla_blks import ieee802_15_4_pkt
from gnuradio.eng_option import eng_option
from optparse import OptionParser
import struct, sys, time, math
# Short alias for eng_notation.num_to_str. It is not referenced by the code
# visible in this script, which calls eng_notation.num_to_str directly.
n2s = eng_notation.num_to_str
class stats(object):
    """Running packet counters, both starting at zero.

    npkts counts every packet reported; nright counts those flagged ok.
    """

    def __init__(self):
        self.npkts = self.nright = 0
class oqpsk_rx_graph(gr.top_block):
    """Flowgraph that feeds a UHD USRP source into the 802.15.4 demodulator.

    Signal chain: uhd.usrp_source -> ieee802_15_4_demod_pkts, which reports
    packets through the supplied callback.
    """

    def __init__(self, options, rx_callback):
        """Build and connect the receive flowgraph.

        Args:
            options: parsed command-line options; ``address``, ``antenna``,
                ``cordic_freq`` and ``data_rate`` are read, plus ``gain``
                when present.
            rx_callback: callable passed to the demodulator as ``callback``.
        """
        gr.top_block.__init__(self)
        # Single-argument print(...) behaves the same as the Python 2 print
        # statement and also parses under Python 3.
        print("cordic_freq = %s" % (eng_notation.num_to_str(options.cordic_freq)))

        # ----------------------------------------------------------------

        self.data_rate = options.data_rate
        self.samples_per_symbol = 2
        self.fs = self.data_rate * self.samples_per_symbol
        # The demodulator below is configured with sps and symbol_rate, i.e.
        # for an input rate of self.fs. The USRP used to be set to a
        # hard-coded 200 kS/s, which does not match that configuration.
        self._samp_rate = self.fs

        # Two spaces after '=' reproduce the output of the original
        # two-item print statements.
        print("data_rate =  %s" % eng_notation.num_to_str(self.data_rate))
        print("samples_per_symbol =  %s" % self.samples_per_symbol)

        self.u = uhd.usrp_source(device_addr=options.address,
                                 io_type=uhd.io_type.COMPLEX_FLOAT32,
                                 num_channels=1)
        self.u.set_clock_config(uhd.clock_config.internal(), uhd.ALL_MBOARDS)

        # Set the antenna.
        self.u.set_antenna(options.antenna, 0)

        # Set the sampling rate, then read back what the device reports
        # (get_samp_rate was previously referenced without being called).
        self.u.set_samp_rate(self._samp_rate)
        input_rate = self.u.get_samp_rate()
        print("Sampling rate: %d" % (input_rate))

        # Apply the receive gain when the options carry one; it used to be
        # parsed on the command line and then ignored.
        rx_gain = getattr(options, 'gain', None)
        if rx_gain is not None:
            self.u.set_gain(rx_gain, 0)

        # Set and then read center frequency.
        self.u.set_center_freq(options.cordic_freq)
        frekva = self.u.get_center_freq()
        print("Center frequency: %d " % (frekva))

        self.packet_receiver = ieee802_15_4_pkt.ieee802_15_4_demod_pkts(
            self,
            callback=rx_callback,
            sps=self.samples_per_symbol,
            symbol_rate=self.data_rate,
            threshold=-1)

        # Optional power squelch in front of the receiver (disabled):
        #self.squelch = gr.pwr_squelch_cc(50, 1, 0, True)
        #self.connect(self.u, self.squelch, self.packet_receiver)
        self.connect(self.u, self.packet_receiver)
def main():
    """Parse options, run the receive flowgraph for 10 s and print statistics.

    The flowgraph is always stopped and waited on before the final
    statistics line is printed.
    """

    def rx_callback(ok, payload):
        """Count one reported packet and print it.

        Args:
            ok: truthy when the packet is flagged good; counted in nright.
            payload: packet bytes; the first two are printed as a
                big-endian 16-bit packet number.
        """
        print("hello there")
        st.npkts += 1
        if ok:
            st.nright += 1

        if len(payload) < 2:
            # Too short to hold the 16-bit packet number; report it rather
            # than letting struct.unpack raise inside the callback.
            print("ok = %5r short payload, len(payload) = %4d %d/%d" % (
                ok, len(payload), st.nright, st.npkts))
            sys.stdout.flush()
            return

        (pktno,) = struct.unpack('!H', payload[0:2])
        print("ok = %5r pktno = %4d len(payload) = %4d %d/%d" % (ok, pktno, len(payload),
                                                                    st.nright, st.npkts))
        print(" payload: " + str(map(hex, map(ord, payload))))
        print(" ------------------------")
        sys.stdout.flush()

    parser = OptionParser(option_class=eng_option)
    parser.add_option("-a", "--address", type="string", default="addr=192.168.10.2",
                      help="Address of UHD device, [default=%default]")
    parser.add_option("-A", "--antenna", type="string", default="TX/RX",
                      help="select Rx Antenna where appropriate")
    parser.add_option("-c", "--cordic-freq", type="eng_float", default=2475000000,
                      help="set rx cordic frequency to FREQ", metavar="FREQ")
    parser.add_option("-r", "--data_rate", type="eng_float", default=2000000)
    parser.add_option("-f", "--filename", type="string",
                      default="rx.dat", help="write data to FILENAME")
    parser.add_option("-g", "--gain", type="eng_float", default=20,
                      help="set Rx PGA gain in dB [0,20]")
    (options, args) = parser.parse_args()

    st = stats()

    tb = oqpsk_rx_graph(options, rx_callback)
    tb.start()
    try:
        time.sleep(10)
    finally:
        # Shut the flowgraph down explicitly instead of letting the
        # interpreter exit while it is still running.
        tb.stop()
        tb.wait()

    print(" Statistics: good %d received %d" % (st.nright, st.npkts))
if __name__ == '__main__':
    # Debugging aid: to attach GDB before the flowgraph starts, print
    # os.getpid() at this point and block on raw_input() until Enter is hit.
    main()
_______________________________________________
Discuss-gnuradio mailing list
[email protected]
https://lists.gnu.org/mailman/listinfo/discuss-gnuradio