GitHub user ppluck closed a discussion: Rocketmq Rust clients: I have an error for "..." while consuming a message.
I need your help. Thanks!

---

- rmq cluster: 5.0
- the log:

```text
INFO create session success, peer: ...
DEBG start session success, peer: ...
DEBG query route for topic=test-topic, component: ...
DEBG receive telemetry command: TelemetryCommand { status: Some(Status { code: Ok, ...
DEBG send heartbeat to server success, peer=...
ERRO [client.rs:196:33] handle telemetry command failed: Failed to parse config at client.handle_telemetry_command => receive telemetry command but there is no handler Context: command: Settings(Settings { client_type: Some(SimpleConsumer), access_point: None, ...
DEBG query route for topic=test-topic success: route=Route { index: 0, queue: [MessageQueue ...
INFO update route for topic=test-topic, component: client
INFO [simple_consumer.rs:99:9] start simple consumer success, client_id: ...
DEBG [client.rs:201:25] receive shutdown signal, stop heartbeat task and telemetry command handler,...
INFO [client.rs:206:13] heartbeat task and telemetry command handler are stopped,...
```

- code:

```rust
let mut consumer_option = SimpleConsumerOption::default();
consumer_option.set_topics(vec!["test-topic"]);
consumer_option.set_consumer_group("CG-test");

// set which rocketmq proxy to connect
let mut client_option = ClientOption::default();
// producer type
client_option.set_access_url("localhost:8081");
client_option.set_access_key("CG-test"); // This is a company agreement
client_option.set_secret_key("CT-xxxx"); // This is a company agreement

// build and start simple consumer
let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
consumer.start().await.unwrap();

// pop message from rocketmq proxy
let receive_result = consumer
    .receive(
        "test-topic".to_string(),
        &FilterExpression::new(FilterType::Tag, "*"),
    )
    .await;

let messages = receive_result.unwrap();
for message in messages {
    println!("receive message: {:?}", message);
    // ack message to rocketmq proxy
    let ack_result = consumer.ack(&message).await;
}
```

---

supplement: I found this error while I was debugging.
client.rs 285: the method here only matches RecoverOrphanedTransactionCommand, but the command enum actually has other variants:

```rust
Settings(super::Settings),
ThreadStackTrace(super::ThreadStackTrace),
VerifyMessageResult(super::VerifyMessageResult),
RecoverOrphanedTransactionCommand(super::RecoverOrphanedTransactionCommand),
PrintThreadStackTraceCommand(super::PrintThreadStackTraceCommand),
VerifyMessageCommand(super::VerifyMessageCommand),
```

```rust
async fn handle_telemetry_command<T: RPCClient + 'static>(
    mut rpc_client: T,
    transaction_checker: &Option<Box<TransactionChecker>>,
    endpoints: Endpoints,
    command: pb::telemetry_command::Command,
) -> Result<(), ClientError> {
    return match command {
        RecoverOrphanedTransactionCommand(command) => {
            let transaction_id = command.transaction_id;
            let message = command.message.unwrap();
            let message_id = message
                .system_properties
                .as_ref()
                .unwrap()
                .message_id
                .clone();
            let topic = message.topic.as_ref().unwrap().clone();
            if let Some(transaction_checker) = transaction_checker {
                let resolution = transaction_checker(
                    transaction_id.clone(),
                    MessageView::from_pb_message(message, endpoints),
                );

                let response = rpc_client
                    .end_transaction(EndTransactionRequest {
                        topic: Some(topic),
                        message_id: message_id.to_string(),
                        transaction_id,
                        resolution: resolution as i32,
                        source: TransactionSource::SourceServerCheck as i32,
                        trace_context: "".to_string(),
                    })
                    .await?;
                Self::handle_response_status(response.status, OPERATION_END_TRANSACTION)
            } else {
                Err(ClientError::new(
                    ErrorKind::Config,
                    "failed to get transaction checker",
                    OPERATION_END_TRANSACTION,
                ))
            }
        }
        // Every other variant, including Settings, ends up here.
        _ => Err(ClientError::new(
            ErrorKind::Config,
            "receive telemetry command but there is no handler",
            OPERATION_HANDLE_TELEMETRY_COMMAND,
        )
        .with_context("command", format!("{:?}", command))),
    };
}
```

Because Settings is not matched, the Settings command shown in the log falls through to the `_` arm, which produces the "receive telemetry command but there is no handler" error.

GitHub link: https://github.com/apache/rocketmq-clients/discussions/737

----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org
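
For illustration only, here is a minimal, self-contained sketch of how the match could give Settings its own arm instead of letting it reach the catch-all error. Everything in it is a hypothetical stand-in: the unit structs replace the generated protobuf messages, and `Handled` and `handle_telemetry_command_sketch` are made-up names, not part of the rocketmq crate. Only the variant names come from the enum quoted above. Whether the real client should ignore Settings or apply it is an assumption I have not verified.

```rust
// Hypothetical stand-ins for the generated protobuf messages.
#[derive(Debug)]
struct Settings;
#[derive(Debug)]
struct ThreadStackTrace;
#[derive(Debug)]
struct VerifyMessageResult;
#[derive(Debug)]
struct RecoverOrphanedTransactionCommand;
#[derive(Debug)]
struct PrintThreadStackTraceCommand;
#[derive(Debug)]
struct VerifyMessageCommand;

// Same variant names as the command enum quoted in the discussion.
#[derive(Debug)]
enum Command {
    Settings(Settings),
    ThreadStackTrace(ThreadStackTrace),
    VerifyMessageResult(VerifyMessageResult),
    RecoverOrphanedTransactionCommand(RecoverOrphanedTransactionCommand),
    PrintThreadStackTraceCommand(PrintThreadStackTraceCommand),
    VerifyMessageCommand(VerifyMessageCommand),
}

// Hypothetical result type so the outcome of each arm is observable.
#[derive(Debug, PartialEq)]
enum Handled {
    SettingsAccepted,
    TransactionChecked,
}

// Sketch: Settings gets an explicit arm, so it no longer reaches the
// "no handler" error. Assumption: accepting it without action is enough.
fn handle_telemetry_command_sketch(command: Command) -> Result<Handled, String> {
    match command {
        Command::Settings(_) => Ok(Handled::SettingsAccepted),
        Command::RecoverOrphanedTransactionCommand(_) => Ok(Handled::TransactionChecked),
        // Variants without a handler still report an error with context.
        other => Err(format!(
            "receive telemetry command but there is no handler, command: {:?}",
            other
        )),
    }
}
```

With this shape, `handle_telemetry_command_sketch(Command::Settings(Settings))` returns `Ok`, while a variant such as `VerifyMessageCommand` still returns the "no handler" error.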