On Wed, 2006-07-05 at 09:39 -0700, Eric Blossom wrote:
> On Wed, Jul 05, 2006 at 11:25:55AM -0400, Chuck Swiger wrote:
> >
> > Because: One of my simple test apps works 600% faster using 4k buffer
> > instead of 32k, since it doesn't have to wait for stale data to clear.
>
> Chuck, can you point me at the code for the application?
> I wouldn't expect the size of buffer to matter as long as each block
> returned all the data asked for on each call to work.
>
It's not an issue in continuous flow. The testing app had a problem
after changing DUC/DDC and restarting everything: it took about 750 ms
between samples to get a clean transition. I just tried it with a
buffer of 4k and was able to use 100 ms between samples.
Just curious if there could be an easy way to specify the buffer size in
flow_graph.py - the comments seem to suggest there is a use for
'automatic optimization', but just another knob to tweak would be ok
with me. Other applications for a minimal working buffer would be
shortening the delay between speaking into a mic and the transmitted
audio, or adjusting an antenna and seeing/hearing the results.
Script attached - the 'problem' shows up in the long 'else' clause
of the OnUpdate() function. With too small a timer delay, the next few
data points are repeats of the one right before changing the cordic freq.
--Chuck
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# generated by wxGlade 0.3.5.1 on Wed Aug 17 11:24:48 2005
#
# Copyright 2005 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# GNU Radio is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2, or (at your option)
# any later version.
#
# GNU Radio is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with GNU Radio; see the file COPYING. If not, write to
# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
# Boston, MA 02111-1307, USA.
#
import wx
from gnuradio import gr, gru, usrp
import usrp_siggen
from gnuradio.eng_option import eng_option
from optparse import OptionParser
from gnuradio import eng_notation
import math, time
# One freshly allocated wx identifier per menu item, button and text field.
# The generator calls wx.NewId() ten times, in the order the names are listed.
(ID_ABOUT, ID_EXIT, ID_AMPLITUDE, ID_REPORT, ID_START,
 ID_STOP, ID_STARTF, ID_STEPF, ID_STOPF, ID_STEPT) = (
    wx.NewId() for _ in range(10))
class MyFrame(wx.Frame):
def __init__(self, *args, **kwds):
    """Build the sweep-test window, read the command line and start the radio.

    All arguments are forwarded to ``wx.Frame.__init__``; the ``style``
    keyword is always replaced with ``wx.DEFAULT_FRAME_STYLE``.
    """
    # begin wxGlade: MyFrame.__init__
    kwds["style"] = wx.DEFAULT_FRAME_STYLE
    wx.Frame.__init__(self, *args, **kwds)

    # Menu Bar
    self.frame_1_menubar = wx.MenuBar()
    self.SetMenuBar(self.frame_1_menubar)
    file_menu = wx.Menu()
    self.About = wx.MenuItem(file_menu, ID_ABOUT, "About", "About", wx.ITEM_NORMAL)
    file_menu.AppendItem(self.About)
    self.Exit = wx.MenuItem(file_menu, ID_EXIT, "Exit", "Exit", wx.ITEM_NORMAL)
    file_menu.AppendItem(self.Exit)
    self.frame_1_menubar.Append(file_menu, "File")
    # Menu Bar end

    # Style shared by every editable field that reacts to the Enter key.
    entry_style = wx.TE_PROCESS_ENTER | wx.TE_PROCESS_TAB

    self.label_5 = wx.StaticText(self, -1, " Start Freq: ")
    self.text_ctrl_4 = wx.TextCtrl(self, ID_STARTF, "", style=entry_style)
    self.label_8 = wx.StaticText(self, -1, " Step Freq:")
    self.text_ctrl_7 = wx.TextCtrl(self, ID_STEPF, "", style=entry_style)
    self.label_6 = wx.StaticText(self, -1, " Stop Freq: ")
    self.text_ctrl_5 = wx.TextCtrl(self, ID_STOPF, "", style=entry_style)
    self.label_9 = wx.StaticText(self, -1, " Step Time (ms): ")
    self.text_ctrl_8 = wx.TextCtrl(self, ID_STEPT, "", style=entry_style)
    self.label_7 = wx.StaticText(self, -1, " Amplitude: ")
    self.text_ctrl_6 = wx.TextCtrl(self, ID_AMPLITUDE, "", style=entry_style)
    self.label_11 = wx.StaticText(self, -1, " DDC/DUC:")
    self.text_ctrl_10 = wx.TextCtrl(self, -1, "", style=wx.TE_READONLY)
    self.label_4 = wx.StaticText(self, -1, " Report file: ")
    self.text_ctrl_3 = wx.TextCtrl(self, ID_REPORT, "", style=entry_style)
    self.button_1 = wx.Button(self, ID_START, "Start")
    self.button_2 = wx.Button(self, ID_STOP, "Stop")
    self.label_1 = wx.StaticText(self, -1, " Status: ")
    self.label_2 = wx.StaticText(self, -1, "\nFrequency Out:\n")
    self.text_ctrl_1 = wx.TextCtrl(self, -1, "", style=wx.TE_READONLY)
    self.label_3 = wx.StaticText(self, -1, "\nAmplitude In:\n")
    self.text_ctrl_2 = wx.TextCtrl(self, -1, "", style=wx.TE_READONLY)
    self.label_10 = wx.StaticText(self, -1, "\nPhase in:\n")
    self.text_ctrl_9 = wx.TextCtrl(self, -1, "")

    self.__set_properties()
    self.__do_layout()
    # end wxGlade

    # (binder, control id, handler) triples, bound in this order.
    bindings = (
        (wx.EVT_MENU, ID_EXIT, self.MenuExit),
        (wx.EVT_MENU, ID_ABOUT, self.MenuAbout),
        (wx.EVT_TEXT_ENTER, ID_AMPLITUDE, self.SetAmplitude),
        (wx.EVT_TEXT_ENTER, ID_REPORT, self.OpenReportFile),
        (wx.EVT_BUTTON, ID_START, self.Start),
        (wx.EVT_BUTTON, ID_STOP, self.Stop),
        (wx.EVT_TEXT_ENTER, ID_STARTF, self.SetStartFreq),
        (wx.EVT_TEXT_ENTER, ID_STOPF, self.SetStopFreq),
        (wx.EVT_TEXT_ENTER, ID_STEPF, self.SetStepFreq),
        (wx.EVT_TEXT_ENTER, ID_STEPT, self.SetStepTime),
    )
    for bind, ctrl_id, handler in bindings:
        bind(self, ctrl_id, handler)

    # Command-line defaults for the sweep range and the waveform amplitude.
    parser = OptionParser(option_class=eng_option)
    parser.add_option("-l", "--f_low", type="eng_float", default=1e6,
                      help="set low scan frequency")
    parser.add_option("-i", "--f_high", type="eng_float", default=30e6,
                      help="set high scan frequency")
    parser.add_option("-a", "--amplitude", type="eng_float", default=16e3,
                      help="set waveform amplitude to AMPLITUDE", metavar="AMPL")
    options, _leftover = parser.parse_args()

    # Neither sweeping nor logging until the user asks for it.
    self.run = False
    self.rpt_open = False

    # setup usrp
    decim = 64
    self.dut_out = usrp.source_c(0, decim, 2, gru.hexint(0xf0f0f1f0), 0)

    # reinit() derives the remaining sweep state and starts the flow graphs.
    self.amplitude = options.amplitude
    self.reinit(options.f_low, options.f_high, 50e3, 750)

    # Show the values now in effect in their entry fields.
    as_text = eng_notation.num_to_str
    shown_values = (
        (self.text_ctrl_6, self.amplitude),
        (self.text_ctrl_4, self.f_low),
        (self.text_ctrl_5, self.f_high),
        (self.text_ctrl_7, self.step_freq),
        (self.text_ctrl_8, self.step_time),
        (self.text_ctrl_10, self.f_ducddc),
    )
    for field, value in shown_values:
        field.SetValue(as_text(value))
def reinit(self, opt_flow, opt_fhigh, opt_stepf, opt_stept):
    """(Re)build the sweep state, the signal generator and the receive graph.

    Called from ``__init__`` and again by the Enter-key handlers of the
    sweep fields whenever one of the sweep parameters changes.

    opt_flow  -- low scan frequency, stored as ``self.f_low``
    opt_fhigh -- high scan frequency, stored as ``self.f_high``
    opt_stepf -- frequency increment per sample, stored as ``self.step_freq``
    opt_stept -- interval handed to ``UpdateTimer``, stored as ``self.step_time``
    """
    # Shut down whatever an earlier call left running instead of relying on
    # garbage collection to do it once the attributes are overwritten below.
    # On the very first call neither attribute exists yet.
    previous_timer = getattr(self, "timer", None)
    if previous_timer is not None:
        previous_timer.Stop()
    previous_fg = getattr(self, "fg", None)
    if previous_fg is not None:
        previous_fg.stop()

    self.f_low = opt_flow
    self.f_high = opt_fhigh
    self.step_freq = opt_stepf
    self.step_time = opt_stept
    self.timer = UpdateTimer(self, self.step_time)

    # The DUC/DDC sits 275 kHz above the low edge; the waveform generator
    # starts at -275 kHz, so the first output frequency is f_low.
    self.f_ducddc = self.f_low + 275e3
    self.text_ctrl_10.SetValue(eng_notation.num_to_str(self.f_ducddc))

    self.fg = gr.flow_graph()
    self.sample_freq = 1e6

    # setup usrp
    self.dut_out.set_rx_freq(0, -self.f_ducddc)
    self.dut_out.set_rx_freq(1, -self.f_ducddc)
    self.dut_out.set_pga(0, 20)  # max gain to compensate for -19 coupler
    self.dut_out.set_pga(1, 0)

    # signal generator
    self.sg = usrp_siggen.siggen()
    self.interp = 128
    self.type = gr.GR_SIN_WAVE
    self.offset = 0
    self.nchannels = 1
    self.mux = 0x0098
    # Configure the generator while its graph is stopped, then start it.
    self.sg.fg.stop()
    self.sg.set_waveform_ampl(self.amplitude)
    self.sg.set_waveform_freq(-275e3)
    self.sg.set_interpolator(self.interp)
    self.sg.set_waveform_type(self.type)
    self.sg.set_waveform_offset(self.offset)
    self.sg.set_duc_freq(self.f_ducddc)
    self.sg.usrp.set_nchannels(self.nchannels)
    self.sg.usrp.set_mux(self.mux)
    self.sg.fg.start()

    # Receive-side blocks.
    head = gr.stream_to_streams(gr.sizeof_gr_complex, 2)
    mul1 = gr.multiply_ff()
    c2f1 = gr.complex_to_float()
    c2f4 = gr.complex_to_float()
    self.meter = gr.probe_signal_f()
    lp_taps = gr.firdes.low_pass(.001, self.sample_freq, 5e3, 10e3,
                                 gr.firdes.WIN_HAMMING)
    self.lp = gr.fir_filter_fff(1, lp_taps)
    # agc: the probes are read, and the pga gains adjusted, from OnUpdate()
    self.i_meter = gr.probe_avg_mag_sqrd_c(1, .0001)
    self.o_meter = gr.probe_avg_mag_sqrd_c(1, .0001)
    self.pga1 = gr.multiply_const_cc(complex(1, 0))
    self.pga2 = gr.multiply_const_cc(complex(1, 0))
    self.pshift = gr.multiply_const_cc(complex(1, 0))

    #
    #                       +--> (o_meter)
    #               ch 0    |
    #             +---------+--> (pga1) --> (c2f4) --> (mul1, 1)
    #  (dut_out)--(head)                                (mul1) --> (lp) --> (meter)
    #             +--> (pshift) --+--> (pga2) --> (c2f1) --> (mul1, 0)
    #               ch 1          |
    #                             +--> (i_meter)
    #
    self.fg.connect(self.dut_out, head)
    self.fg.connect((head, 1), self.pshift, self.pga2, c2f1, (mul1, 0))
    self.fg.connect(self.pshift, self.i_meter)
    self.fg.connect((head, 0), self.pga1, c2f4, (mul1, 1))
    self.fg.connect((head, 0), self.o_meter)
    self.fg.connect(mul1, self.lp, self.meter)
    self.fg.start()
def __set_properties(self):
    """Apply the window title and the fixed size of the report-file field."""
    # begin wxGlade: MyFrame.__set_properties
    report_field_size = (200, 25)
    self.SetTitle("Automatic Testing - Sweep")
    self.text_ctrl_3.SetSize(report_field_size)
    # end wxGlade
def __do_layout(self):
    """Place the widgets created in ``__init__`` into nested sizers."""
    # begin wxGlade: MyFrame.__do_layout
    centered_v = wx.ALIGN_CENTER_VERTICAL | wx.FIXED_MINSIZE
    centered_h = wx.ALIGN_CENTER_HORIZONTAL | wx.FIXED_MINSIZE
    centered_hv = (wx.ALIGN_CENTER_HORIZONTAL | wx.ALIGN_CENTER_VERTICAL
                   | wx.FIXED_MINSIZE)

    # Sweep settings: label/field pairs, added to the grid in this order.
    settings_grid = wx.FlexGridSizer(3, 4, 0, 0)
    grid_widgets = (
        self.label_5, self.text_ctrl_4, self.label_8, self.text_ctrl_7,
        self.label_6, self.text_ctrl_5, self.label_9, self.text_ctrl_8,
        self.label_7, self.text_ctrl_6, self.label_11, self.text_ctrl_10,
    )
    for widget in grid_widgets:
        settings_grid.Add(widget, 0, wx.FIXED_MINSIZE, 0)

    # Report-file label and field side by side.
    report_row = wx.BoxSizer(wx.HORIZONTAL)
    report_row.Add(self.label_4, 0, centered_v, 0)
    report_row.Add(self.text_ctrl_3, 0, centered_v, 0)

    settings_column = wx.BoxSizer(wx.VERTICAL)
    settings_column.Add(settings_grid, 1, wx.EXPAND, 0)
    settings_column.Add(report_row, 1, wx.EXPAND, 0)

    # Start / Stop buttons.
    button_row = wx.BoxSizer(wx.HORIZONTAL)
    button_row.Add(self.button_1, 0, centered_v, 0)
    button_row.Add(self.button_2, 0, centered_v, 0)

    # Status area: a caption followed by two columns of read-outs.
    freq_column = wx.BoxSizer(wx.VERTICAL)
    freq_column.Add(self.label_2, 0, centered_h, 0)
    freq_column.Add(self.text_ctrl_1, 0, centered_h, 0)

    level_column = wx.BoxSizer(wx.VERTICAL)
    level_column.Add(self.label_3, 0, centered_h, 0)
    level_column.Add(self.text_ctrl_2, 0, centered_h, 0)
    level_column.Add(self.label_10, 0, centered_hv, 0)
    level_column.Add(self.text_ctrl_9, 0, centered_hv, 0)

    status_row = wx.BoxSizer(wx.HORIZONTAL)
    status_row.Add(self.label_1, 0, centered_v, 0)
    status_row.Add(freq_column, 1, wx.EXPAND, 0)
    status_row.Add(level_column, 1, wx.EXPAND, 0)

    # Stack the three sections and hand the result to the frame.
    body = wx.BoxSizer(wx.VERTICAL)
    body.Add(settings_column, 1, wx.EXPAND, 0)
    body.Add(button_row, 1, wx.EXPAND, 0)
    body.Add(status_row, 1, wx.EXPAND, 0)

    root = wx.BoxSizer(wx.VERTICAL)
    root.Add(body, 1, wx.EXPAND, 0)

    self.SetAutoLayout(True)
    self.SetSizer(root)
    root.Fit(self)
    root.SetSizeHints(self)
    self.Layout()
    # end wxGlade
def MenuExit(self, event):
    """Handle File > Exit: stop the radio, close the report, close the frame.

    event -- the wx menu event (unused).
    """
    # reinit() starts the timer and both flow graphs whether or not a sweep
    # is running, so they have to be stopped regardless of self.run.
    self.run = False
    self.timer.Stop()
    self.fg.stop()
    self.sg.fg.stop()
    if self.rpt_open:
        self.rpt.close()
        self.rpt_open = False
    self.Close()
def MenuAbout(self, event):
    """Handle File > About by showing a modal information box.

    event -- the wx menu event (unused).
    """
    message = ("Automatic Testing\n"
               "with GnuRadio / USRP\n")
    box_style = wx.OK | wx.ICON_INFORMATION
    about_box = wx.MessageDialog(self, message, "Automatic Testing", box_style)
    about_box.ShowModal()
    about_box.Destroy()
def SetAmplitude(self, event):
    """Handle Enter in the amplitude field: store and apply the new value.

    event -- the wx text event (unused).
    """
    self.amplitude = eng_notation.str_to_num(self.text_ctrl_6.GetValue())
    # Push the value to the live generator as well; storing it alone would
    # only take effect the next time the generator is rebuilt.
    self.sg.set_waveform_ampl(self.amplitude)
def OpenReportFile(self, event):
    """Handle Enter in the report field: open that path for writing.

    A report that is already open is closed first so its handle is not
    leaked.  ``self.rpt_open`` is only True while ``self.rpt`` is usable,
    including when ``open`` raises.

    event -- the wx text event (unused).
    """
    if self.rpt_open:
        self.rpt.close()
        self.rpt_open = False
    self.rpt = open(self.text_ctrl_3.GetValue(), "w")
    self.rpt_open = True
def Start(self, event):
    """Handle the Start button: rewind the sweep variables and enable it.

    event -- the wx button event (unused).
    """
    # NOTE(review): only the bookkeeping values are reset here; the DUC/DDC
    # hardware is not retuned and the DDC/DUC read-out is not refreshed.
    # Confirm that is intended when Start follows a completed sweep.
    self.f_losc = -275e3
    self.f_ducddc = self.f_low + 275e3
    self.run = True
def Stop(self, event):
    """Handle the Stop button: pause the sweep (OnUpdate then does nothing).

    event -- the wx button event (unused).
    """
    self.run = False
def SetStepFreq(self, event):
    """Handle Enter in the step-frequency field and rebuild the setup.

    event -- the wx text event (unused).
    """
    new_step = eng_notation.str_to_num(self.text_ctrl_7.GetValue())
    # Drop the current generator; reinit() creates a fresh one.
    self.sg.fg.stop()
    del self.sg
    self.reinit(self.f_low, self.f_high, new_step, self.step_time)
def SetStartFreq(self, event):
    """Handle Enter in the start-frequency field and rebuild the setup.

    event -- the wx text event (unused).
    """
    new_low = eng_notation.str_to_num(self.text_ctrl_4.GetValue())
    # Drop the current generator; reinit() creates a fresh one.
    self.sg.fg.stop()
    del self.sg
    self.reinit(new_low, self.f_high, self.step_freq, self.step_time)
def SetStopFreq(self, event):
    """Handle Enter in the stop-frequency field and rebuild the setup.

    event -- the wx text event (unused).
    """
    new_high = eng_notation.str_to_num(self.text_ctrl_5.GetValue())
    # Drop the current generator; reinit() creates a fresh one.
    self.sg.fg.stop()
    del self.sg
    self.reinit(self.f_low, new_high, self.step_freq, self.step_time)
def SetStepTime(self, event):
    """Handle Enter in the step-time field and rebuild the setup.

    event -- the wx text event (unused).
    """
    # Named so that the local does not hide the ``time`` module.
    new_interval = eng_notation.str_to_num(self.text_ctrl_8.GetValue())
    # Drop the current generator; reinit() creates a fresh one.
    self.sg.fg.stop()
    del self.sg
    self.reinit(self.f_low, self.f_high, self.step_freq, new_interval)
def OnUpdate(self):
    """Advance the sweep by one step; called by ``UpdateTimer`` on each tick.

    Does nothing while ``self.run`` is False.  Otherwise one of three things
    happens:

    * the output frequency has reached ``self.f_high``: the sweep ends;
    * the waveform offset is still below +275 kHz: one sample is taken,
      displayed, optionally written to the report, and the waveform
      frequency is stepped by ``self.step_freq``;
    * the offset has reached +275 kHz: the DUC/DDC is moved up by 550 kHz,
      the signal generator is rebuilt and the offset restarts at -275 kHz.

    If the input level probe reads zero, the tick is skipped without
    stepping, so the same point is tried again on the next tick instead of
    failing on a division by zero.
    """
    if not self.run:
        return

    if not (self.f_ducddc + self.f_losc < self.f_high):
        # Upper edge of the scan reached.
        self.run = False
        return

    if self.f_losc < 275e3:
        in_level = self.i_meter.level()
        out_level = self.o_meter.level()
        # Rescale each channel from its measured level; a level too small
        # to be meaningful leaves that channel's gain untouched.
        if out_level > .000001:
            self.pga1.set_k(complex(141 / math.sqrt(out_level), 0))
        if in_level > .000001:
            self.pga2.set_k(complex(141 / math.sqrt(in_level), 0))
        if in_level <= 0:
            # No usable input reading yet: retry this point next tick.
            return
        io_ratio = 10 * math.sqrt(out_level / in_level)  # scaled to match phase plot
        self.text_ctrl_2.SetValue(str(io_ratio))

        # First reading with channel 1 as is, second with it multiplied by
        # j; the short sleeps give the graph time to produce fresh output.
        time.sleep(.01)
        phase = self.meter.level()
        self.text_ctrl_9.SetValue(str(phase))
        self.pshift.set_k(complex(0, 1))
        time.sleep(.01)
        sphase = self.meter.level()
        self.pshift.set_k(complex(1, 0))

        if self.rpt_open:
            freq = self.f_losc + self.f_ducddc
            fields = (freq, io_ratio, phase, sphase)
            self.rpt.write(" ".join([str(value) for value in fields]) + "\n")

        # Step the generator and show the frequency now being produced.
        self.f_losc += self.step_freq
        self.sg.set_waveform_freq(self.f_losc)
        self.text_ctrl_1.SetValue(
            eng_notation.num_to_str(self.f_losc + self.f_ducddc))
        return

    # End of the current band: move the DUC/DDC up by 550 kHz.
    self.f_ducddc += 550e3
    self.text_ctrl_10.SetValue(eng_notation.num_to_str(self.f_ducddc))

    # Quiesce the timer, both graphs and the receiver before retuning.
    self.timer.Stop()
    self.fg.stop()
    self.sg.fg.stop()
    self.dut_out.stop()
    self.dut_out.set_rx_freq(0, -self.f_ducddc)
    self.dut_out.set_rx_freq(1, -self.f_ducddc)
    self.dut_out.start()

    # Replace the signal generator with one tuned to the new band.
    del self.sg
    self.sg = usrp_siggen.siggen()
    self.sg.set_waveform_ampl(self.amplitude)
    self.sg.set_waveform_freq(-275e3)
    self.sg.set_interpolator(self.interp)
    self.sg.set_waveform_type(self.type)
    self.sg.set_waveform_offset(self.offset)
    self.sg.set_duc_freq(self.f_ducddc)
    self.sg.usrp.set_nchannels(self.nchannels)
    self.sg.usrp.set_mux(self.mux)
    self.sg.fg.start()
    self.fg.start()

    # Pause before sampling resumes, then restart the timer with the
    # interval it was last started with.
    time.sleep(.5)
    self.timer.Start()
    self.f_losc = -275e3
# end of class MyFrame
class UpdateTimer(wx.Timer):
    """Timer that calls ``target.OnUpdate()`` on every tick."""

    def __init__(self, target, dur=1000):
        """Remember ``target`` and start ticking right away.

        target -- object whose ``OnUpdate()`` method is called on each tick
        dur    -- value passed to ``wx.Timer.Start`` as the interval
        """
        wx.Timer.__init__(self)
        self.target = target
        self.Start(dur)

    def Notify(self):
        """Forward the tick to the target, unless the target tests false."""
        if not self.target:
            return
        self.target.OnUpdate()
class MyApp(wx.App):
    """Application object that shows a single ``MyFrame`` window."""

    def OnInit(self):
        """Create and show the main window; returning True lets wx continue."""
        main_window = MyFrame(None, -1, "Automatic Testing")
        main_window.Show(True)
        self.SetTopWindow(main_window)
        return True
# Script entry point: create the application object, then run the wx event
# loop; MainLoop() is the last statement of the script.
app = MyApp(0)
app.MainLoop()
_______________________________________________
Discuss-gnuradio mailing list
[email protected]
http://lists.gnu.org/mailman/listinfo/discuss-gnuradio