GitHub user ppluck closed a discussion: Rocketmq Rust clients: I have an error 
for "..." while consuming a message.

I need your help. Thanks!

---

- RocketMQ cluster: 5.0

- the log: 

```text
INFO create session success, peer: ...
DEBG start session success, peer: ...
DEBG query route for topic=test-topic, component:  ...
DEBG receive telemetry command: TelemetryCommand { status: Some(Status { code: 
Ok, ...
DEBG send heartbeat to server success, peer=...
ERRO \client.rs:196:33] handle telemetry command failed: Failed to parse config 
at client.handle_telemetry_command =>
 receive telemetry command but there is no handler

Context:
    command: Settings(Settings { client_type: Some(SimpleConsumer), 
access_point: None,  ...

DEBG query route for topic=test-topic success: route=Route { index: 0, queue: 
[MessageQueue ...
INFO  update route for topic=test-topic, component: client
INFO  simple_consumer.rs:99:9] start simple consumer success, client_id: ...
DEBG client.rs:201:25] receive shutdown signal, stop heartbeat task and 
telemetry command handler,...
INFO client.rs:206:13] heartbeat task and telemetry command handler are 
stopped,...
```

- code:

```rust
    let mut consumer_option = SimpleConsumerOption::default();
    consumer_option.set_topics(vec!["test-topic"]);
    consumer_option.set_consumer_group("CG-test");

    // set which rocketmq proxy to connect
    let mut client_option = ClientOption::default(); // producer type
    client_option.set_access_url("localhost:8081");
    client_option.set_access_key("CG-test");  // This is a company agreement
    client_option.set_secret_key("CT-xxxx"); // This is a company agreement

    // build and start simple consumer
    let mut consumer = SimpleConsumer::new(consumer_option, client_option).unwrap();
    consumer.start().await.unwrap();


    // pop message from rocketmq proxy
    let receive_result = consumer
        .receive(
            "test-topic".to_string(),
            &FilterExpression::new(FilterType::Tag, "*"),
        )
        .await;

    let messages = receive_result.unwrap();
    for message in messages {
        println!("receive message: {:?}", message);
        // ack message to rocketmq proxy
        let ack_result = consumer.ack(&message).await;
    }

```

---

Supplement:

I found this error while I was debugging.

At client.rs line 285, the method only matches `RecoverOrphanedTransactionCommand`, but the command enum actually has other variants:
```rust
Settings(super::Settings),
ThreadStackTrace(super::ThreadStackTrace),
VerifyMessageResult(super::VerifyMessageResult),
RecoverOrphanedTransactionCommand(super::RecoverOrphanedTransactionCommand),
PrintThreadStackTraceCommand(super::PrintThreadStackTraceCommand),
VerifyMessageCommand(super::VerifyMessageCommand),
```

```rust
async fn handle_telemetry_command<T: RPCClient + 'static>(
        mut rpc_client: T,
        transaction_checker: &Option<Box<TransactionChecker>>,
        endpoints: Endpoints,
        command: pb::telemetry_command::Command,
    ) -> Result<(), ClientError> {
        return match command {
            RecoverOrphanedTransactionCommand(command) => {
                let transaction_id = command.transaction_id;
                let message = command.message.unwrap();
                let message_id = message
                    .system_properties
                    .as_ref()
                    .unwrap()
                    .message_id
                    .clone();
                let topic = message.topic.as_ref().unwrap().clone();
                if let Some(transaction_checker) = transaction_checker {
                    let resolution = transaction_checker(
                        transaction_id.clone(),
                        MessageView::from_pb_message(message, endpoints),
                    );

                    let response = rpc_client
                        .end_transaction(EndTransactionRequest {
                            topic: Some(topic),
                            message_id: message_id.to_string(),
                            transaction_id,
                            resolution: resolution as i32,
                            source: TransactionSource::SourceServerCheck as i32,
                            trace_context: "".to_string(),
                        })
                        .await?;
                    Self::handle_response_status(response.status, OPERATION_END_TRANSACTION)
                } else {
                    Err(ClientError::new(
                        ErrorKind::Config,
                        "failed to get transaction checker",
                        OPERATION_END_TRANSACTION,
                    ))
                }
            }
            _ => Err(ClientError::new(
                ErrorKind::Config,
                "receive telemetry command but there is no handler",
                OPERATION_HANDLE_TELEMETRY_COMMAND,
            )
            .with_context("command", format!("{:?}", command))),
        };
    }

```
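To illustrate the point, here is a minimal, self-contained sketch of a handler whose `match` names every variant instead of falling through to a catch-all error. It is not the client's actual code or its fix: the `Command` enum is a hypothetical stand-in for `pb::telemetry_command::Command` with string payloads, and `Outcome` and `HandlerError` are invented for this example. Treating `Settings` as safe to ignore is an assumption about the desired behavior.

```rust
/// Hypothetical stand-in for `pb::telemetry_command::Command`.
/// Payloads are reduced to strings for illustration only.
#[derive(Debug)]
enum Command {
    Settings(String),
    ThreadStackTrace(String),
    VerifyMessageResult(String),
    RecoverOrphanedTransactionCommand(String),
    PrintThreadStackTraceCommand(String),
    VerifyMessageCommand(String),
}

/// Hypothetical result of handling a command (not part of the real client).
#[derive(Debug, PartialEq)]
enum Outcome {
    /// The command was processed.
    Handled,
    /// The command was recognized and deliberately skipped.
    Ignored,
}

/// Hypothetical error type standing in for `ClientError`.
#[derive(Debug, PartialEq)]
struct HandlerError(String);

fn handle_telemetry_command(command: Command) -> Result<Outcome, HandlerError> {
    match command {
        // The only variant the original handler deals with.
        Command::RecoverOrphanedTransactionCommand(transaction_id) => {
            if transaction_id.is_empty() {
                Err(HandlerError("missing transaction id".to_string()))
            } else {
                Ok(Outcome::Handled)
            }
        }
        // Assumption: a Settings push from the server is not an error for a
        // simple consumer, so it is acknowledged instead of rejected.
        Command::Settings(_) => Ok(Outcome::Ignored),
        // The remaining variants are listed explicitly, so adding a new
        // variant to the enum becomes a compile error here rather than a
        // silent runtime failure.
        other @ (Command::ThreadStackTrace(_)
        | Command::VerifyMessageResult(_)
        | Command::PrintThreadStackTraceCommand(_)
        | Command::VerifyMessageCommand(_)) => {
            Err(HandlerError(format!("no handler for {:?}", other)))
        }
    }
}
```

With this shape, calling `handle_telemetry_command(Command::Settings(...))` returns `Ok(Outcome::Ignored)`, so a `Settings` command like the one in the log above would no longer surface as an `ERRO` line.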





GitHub link: https://github.com/apache/rocketmq-clients/discussions/737

