Abacn commented on code in PR #35299:
URL: https://github.com/apache/beam/pull/35299#discussion_r2154938488
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -1311,30 +1303,35 @@ public void startBundle() throws JMSException {
@ProcessElement
public void processElement(@Element T input, ProcessContext context) {
- try {
- publishMessage(input);
- } catch (JMSException | JmsIOException | IOException | InterruptedException exception) {
- LOG.error("Error while publishing the message", exception);
- context.output(this.failedMessagesTags, input);
- if (exception instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private void publishMessage(T input)
- throws JMSException, JmsIOException, IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = checkStateNotNull(retryBackOff).backoff();
while (true) {
try {
this.jmsConnection.publishMessage(input);
break;
} catch (JMSException | JmsIOException exception) {
- if (!BackOffUtils.next(sleeper, backoff)) {
- throw exception;
- } else {
- publicationRetries.inc();
+ LOG.warn("Error while publishing the message, retrying", exception);
Review Comment:
This can spam the logs, since it fires on every retry. Can we remove this
log, or at least log only once per while loop?
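A rough sketch of the "log once per while loop" idea, in case it helps. This is not the JmsIO code: `RetryLogSketch`, `publishWithRetry`, `maxAttempts`, and the `LOG_LINES` list (a stand-in for the real logger) are all hypothetical names, and the fixed attempt count replaces the `BackOff`/`Sleeper` pair only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the retry loop in processElement; not the JmsIO code. */
final class RetryLogSketch {
  /** Collects log lines so the sketch runs without a logging framework. */
  static final List<String> LOG_LINES = new ArrayList<>();

  /**
   * Runs the action until it succeeds or maxAttempts is reached.
   * Only the first failure is logged; later failures just bump the retry count.
   * Returns the number of retries that were needed.
   */
  static int publishWithRetry(Runnable action, int maxAttempts) {
    int retries = 0;
    boolean alreadyLogged = false; // guards the warning so it fires once per loop
    while (true) {
      try {
        action.run();
        break;
      } catch (RuntimeException e) {
        if (retries + 1 >= maxAttempts) {
          throw e; // attempts exhausted: let the caller handle the failure
        }
        if (!alreadyLogged) {
          LOG_LINES.add("WARN publish failed, retrying: " + e.getMessage());
          alreadyLogged = true;
        }
        retries++;
      }
    }
    return retries;
  }
}
```

With a flag like this, an element that fails three times before succeeding produces one warning instead of three, while the retry counter still reflects every attempt.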
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -1179,8 +1174,10 @@ public WriteJmsResult<T> expand(PCollection<T> input) {
PUBLISH_TO_JMS_STEP_NAME,
ParDo.of(new JmsIOProducerFn<>(spec, failedMessagesTag))
.withOutputTags(messagesTag, TupleTagList.of(failedMessagesTag)));
- PCollection<T> failedPublishedMessages =
- failedPublishedMessagesTuple.get(failedMessagesTag).setCoder(input.getCoder());
+ PCollection<JmsError<T>> failedPublishedMessages =
+ failedPublishedMessagesTuple
Review Comment:
Technically this is a breaking change, and we try to avoid maintaining two
implementations.
This JmsIO DLQ predates Beam's standard DLQ library, which came with
Exception.
So I would prefer not to branch the code. Either
- keep the old behavior or make the change, or
- update the code to fit Beam's DLQ pattern and use a BadRecord object
here:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecord.java
. Otherwise we may need another breaking change later if we migrate
JmsIO to Beam's errorhandling module.