Amraneze commented on PR #24973:
URL: https://github.com/apache/beam/pull/24973#issuecomment-1386663454

   > 
   
   The issue is that JmsIO doesn't retry at all when publishing a message, any 
message that is failed to publish is pushed to ouput as you can see in [JmsIO 
code](https://github.com/apache/beam/blob/master/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java#L940):
   
   ````java
   @ProcessElement
   public void processElement(ProcessContext ctx) {
     Destination destinationToSendTo = destination;
     try {
       Message message = spec.getValueMapper().apply(ctx.element(), session);
       if (spec.getTopicNameMapper() != null) {
         destinationToSendTo =
             
session.createTopic(spec.getTopicNameMapper().apply(ctx.element()));
       }
       producer.send(destinationToSendTo, message);
     } catch (Exception ex) {
       LOG.error("Error sending message on topic {}", destinationToSendTo);
       ctx.output(failedMessageTag, ctx.element());
     }
   }
   ````
   If there are any exceptions, they will be catched and the message is pushed 
to failed message tag's output. Which means we will need to get the failed 
messages and retry by ourselves N times like this:
   
   ````java
   public PDone expand(PCollection<Message> messages) {
       // Retry 3 times when the session is closed
       messages.apply(getJmsWriter(sinkOptions))
             .getFailedMessages()
            .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions))
              .getFailedMessages()
            .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions))
              .getFailedMessages()
            .apply(REPUBLISH_FAILED_MESSAGE_NAME, getJmsWriter(sinkOptions));
   
       return PDone.in(messages.getPipeline());
     }
   
     private static JmsIO.Write<Message> getJmsWriter(SinkOptions sinkOptions) {
       return JmsIO.<Message>write()
           .withConnectionFactory(SinkOptions.getConnectionFactory())
           .withValueMapper(getValueMapper())
           .withTopicNameMapper(getTopicNameMapper());
     }
   ````
   When the session is closed and the first PTransform that is publishing the 
message is displaying error logs `Error sending message on topic`, it will not 
reconnect and just send all messages to the second step as you can see in the 
screenshot
   
   <img width="1280" alt="image" 
src="https://user-images.githubusercontent.com/28459763/213120382-941c1769-805d-4685-aff0-94a508b94f3d.png";>
   
   >Note: The graph of first PTransform is the same as the second PTransform 
after the session is closed, which means the first step doesn't publish at all 
and it just send all messages to the second step.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to