fengcharly commented on issue #430:
URL: https://github.com/apache/rocketmq-clients/issues/430#issuecomment-1489550691

   > > > The client I am currently using is `org.apache.rocketmq:rocketmq-client-java:5.0.5`, and I access MQ through proxy:8081.
   > > 
   > > 
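   > > I worked around it by creating the topic manually, but I run into a problem when consuming messages. Have you tried consuming messages with rocketmq-clients? If consumption works for you, I would like to compare notes.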
   > 
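   > I am also working around it by creating the topic manually for now. Consuming with the new client-java currently works without problems for me.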
   
   I used this code:
   
   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.List;
   import org.apache.rocketmq.client.apis.ClientConfiguration;
   import org.apache.rocketmq.client.apis.ClientException;
   import org.apache.rocketmq.client.apis.ClientServiceProvider;
   import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
   import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
   import org.apache.rocketmq.client.apis.consumer.FilterExpression;
   import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
   import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
   import org.apache.rocketmq.client.apis.message.MessageId;
   import org.apache.rocketmq.client.apis.message.MessageView;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class SimpleConsumerExample {
       private static final Logger log = LoggerFactory.getLogger(SimpleConsumerExample.class);

       private SimpleConsumerExample() {
       }

       @SuppressWarnings({"resource", "InfiniteLoopStatement"})
       public static void main(String[] args) throws ClientException {
           final ClientServiceProvider provider = ClientServiceProvider.loadService();

           // Credential provider is optional for client configuration.
           String accessKey = "rocketmq2";
           String secretKey = "12345678";
           SessionCredentialsProvider sessionCredentialsProvider =
               new StaticSessionCredentialsProvider(accessKey, secretKey);

           String endpoints = "10.15.25.4:30402";
           ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
               .setRequestTimeout(Duration.ofSeconds(30))
               .setEndpoints(endpoints)
               .setCredentialProvider(sessionCredentialsProvider)
               .build();
           String consumerGroup = "yourConsumerGroup";
           Duration awaitDuration = Duration.ofSeconds(30);
           String tag = "yourMessageTagA";
           String topic = "testTopic";
           FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
           SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
               .setClientConfiguration(clientConfiguration)
               // Set the consumer group name.
               .setConsumerGroup(consumerGroup)
               // Set the await duration for long polling.
               .setAwaitDuration(awaitDuration)
               // Set the subscription for the consumer.
               .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
               .build();
           // Maximum number of messages for each long-polling request.
           int maxMessageNum = 16;
           // Set how long a message stays invisible after it is received.
           Duration invisibleDuration = Duration.ofSeconds(15);
           // Receive messages; multi-threading is recommended.
           do {
               final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
               log.info("Received {} message(s)", messages.size());
               for (MessageView message : messages) {
                   final MessageId messageId = message.getMessageId();
                   try {
                       consumer.ack(message);
                       log.info("Message is acknowledged successfully, messageId={}", messageId);
                   } catch (Throwable t) {
                       log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                   }
               }
           } while (true);
           // Close the simple consumer when you don't need it anymore.
           // consumer.close();
       }
   }
   ```
   
   The error I then get is:
   io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 29.999577300s. [remote_addr=/10.15.25.4:30402]
   at io.grpc.Status.asRuntimeException(Status.java:539)
   at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:544)
   at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
   at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
   at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
   at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
   at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
   at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
   at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
   at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
   at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   at java.base/java.lang.Thread.run(Thread.java:834)
   
   

