Hi Pedro,
"detecing drones" feels like an overall complicated problem, and we can't know what you
mean by "nothing is working", I'm afraid!
I see you have multiple out-of-tree blocks (some or all of which you have written
yourself?) in your flow graph – cool!
So, go ahead and make sure they work individually. Especially, build something that allows
you to test your "Energy Detector" block without an SDR, in pure software!
I haven't done an intense reading of your Python code, but there's definitely a syntax
error in there – so you really need to make sure all things work in isolation before
plugging them together.
Best,
Marcus
On 10/16/25 4:13 PM, Pedro Tapia wrote:
hi everyone, i am trying to use gnuradio for drone detection but nothing is working, if
anyone can help import numpy as np
from gnuradio import gr
import pmt
from threading import Timer, Lock
class freq_scanner(gr.sync_block):
def __init__(self, start_freq=2.4e9, stop_freq=2.5e9, step_freq=5e6,
dwell_time=0.5):
gr.sync_block.__init__(self,
name="Frequency Scanner",
in_sig=None,
out_sig=None) # Não precisamos mais da saída de sinal
self.message_port_register_out(pmt.intern("cmd"))
self._start_freq = float(start_freq)
self._stop_freq = float(stop_freq)
self._step_freq = float(step_freq)
self._dwell_time = float(dwell_time)
self.current_freq = self._start_freq
# --- A "Caixa de Correio" agora guarda apenas um número (float) ---
self.freq_to_send = None
self.lock = Lock()
# Iniciamos o timer
self.timer = Timer(self._dwell_time, self.prepare_next_freq)
self.timer.start()
def prepare_next_freq(self):
"""Esta função é chamada pelo timer e APENAS CALCULA a frequência."""
next_freq = self.current_freq + self._step_freq
if next_freq > self._stop_freq:
next_freq = self._start_freq
self.current_freq = next_freq
print(f"Timer preparou! Próxima Freq: {self.current_freq / 1e6:.2f}
MHz", flush=True)
# Coloca APENAS o número float na caixa de correio
with self.lock:
self.freq_to_send = self.current_freq
# Reinicia o timer
self.timer = Timer(self._dwell_time, self.prepare_next_freq)
self.timer.start()
def stop(self):
self.timer.cancel()
return True
def work(self, input_items, output_items):
"""A função work agora faz TODO o trabalho de PMT e publicação."""
local_freq_to_send = None
# Pega o número da caixa de correio
with self.lock:
if self.freq_to_send is not None:
local_freq_to_send = self.freq_to_send
self.freq_to_send = None # Esvazia a caixa
# Se havia um número para enviar...
if local_freq_to_send is not None:
# ...Cria o dicionário PMT AQUI, no thread principal...
msg_dict = pmt.make_dict()
msg_dict = pmt.dict_add(msg_dict, pmt.intern("freq"),
pmt.from_double(local_freq_to_send))
# ...e publica AQUI, no thread principal.
self.message_port_pub(pmt.intern("cmd"), msg_dict)
return 0 import numpy as np
from gnuradio import gr
import pmt
import scipy.special as scs
from threading import Lock
class Energy_Detector(gr.basic_block):
def __init__(self, samples=1024, Pfa=0.01):
self.samples = int(samples)
self.Pfa = float(Pfa)
gr.basic_block.__init__(self,
name="Energy_Detector",
in_sig=[(np.float32, self.samples), (np.float32, self.samples)],
out_sig=None)
self.message_port_register_out(pmt.intern("detection_in"))
self.message_port_register_in(pmt.intern("freq_in"))
# CORREÇÃO 1: Nome da função corrigido
self.set_msg_handler(pmt.intern("freq_in"), self.handle_freq_msg)
self.current_center_freq = 0.0 # Inicializa a frequência
self.lock = Lock()
def handle_freq_msg(self, msg):
if pmt.is_dict(msg):
freq_val = pmt.dict_ref(msg, pmt.intern("freq"), pmt.PMT_NIL)
if not pmt.is_nil(freq_val):
with self.lock:
self.current_center_freq = pmt.to_double(freq_val)
def general_work(self, input_items, output_items):
signal_energy_vector = input_items[0][0]
noise_vector = input_items[1][0]
signalAvg = np.mean(signal_energy_vector)
NoisePower = noise_vector ** 2
NoiseAvg = np.mean(NoisePower)
var = np.var(NoisePower)
stdev = np.sqrt(var)
Qinv = np.sqrt(2) * scs.erfinv(1 - 2 * self.Pfa)
Threshold = NoiseAvg + Qinv * stdev
# CORREÇÃO 3: Lógica de detecção corrigida
if signalAvg > Threshold:
local_center_freq = 0.0
with self.lock:
local_center_freq = self.current_center_freq
# Só envia a mensagem se a frequência for válida (maior que zero)
if local_center_freq > 0:
msg_tuple = pmt.make_tuple(
pmt.from_float(float(signalAvg)),
pmt.from_float(1.0),
pmt.from_double(local_center_freq)
)
self.message_port_pub(pmt.intern("detection_in"), msg_tuple)
# CORREÇÃO 2: Consumo explícito das portas
self.consume(0, 1)
self.consume(1, 1)
return 0 freq_hz = pmt.to_double(pmt.tuple_ref(msg, 2))
freq_mhz = freq_hz / 1e6
# Imprime o alerta formatado no terminal
print("="*40)
print("!!! ALERTA: SINAL DE DRONE DETECTADO !!!")
print(f" Frequência: {freq_mhz:.2f} MHz")
print(f" Energia do Sinal: {energy:.4f}")
print("="*40, flush=True) # flush=True para garantir a
impressão imediata
except Exception as e:
print(f"Alert Formatter - Erro ao processar mensagem: {e}")
print(f"Mensagem recebida: {msg}")
def work(self, input_items, output_items):
# Este bloco não processa sinais de streaming, apenas mensagens.
# Portanto, a função work não faz nada.
return 0