kamat66 opened a new issue, #35082:
URL: https://github.com/apache/beam/issues/35082

   ### What happened?
   
   I'm using RabbitMQIO to read messages using Direct Runner locally. I run 
into below exception. 
   
   Channel closes itself after below condition is met.
   
   class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
     // Occasionally close an existing reader and resume from checkpoint, to 
exercise close-and-resume
     private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
   
   if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) {
               UnboundedReader<OutputT> toClose = reader;
               // Prevent double-close. UnboundedReader is AutoCloseable, which 
does not require
               // idempotency of close. Nulling out the reader here prevents 
trying to re-close it
               // if the call to close throws an IOException.
               reader = null;
               toClose.close();
             }
   
   Below exception is caused as pipeline to fail and it does not resume.
   
   com.rabbitmq.client.AlreadyClosedException: channel is already closed due to 
clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, 
reply-text=OK, class-id=0, method-id=0)
        at 
com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:281) 
~[amqp-client-5.22.0.jar:5.22.0]
        at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:365) 
~[amqp-client-5.22.0.jar:5.22.0]
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:305) 
~[amqp-client-5.22.0.jar:5.22.0]
        at 
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:152) 
~[amqp-client-5.22.0.jar:5.22.0]
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1564) 
~[amqp-client-5.22.0.jar:5.22.0]
        at 
com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:671)
 ~[amqp-client-5.22.0.jar:5.22.0]
        at 
org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQCheckpointMark.finalizeCheckpoint(RabbitMqIO.java:445)
 ~[beam-sdks-java-io-rabbitmq-2.65.0.jar:na]
        at 
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishRead(UnboundedReadEvaluatorFactory.java:260)
 ~[beam-runners-direct-java-2.65.0.jar:na]
        at 
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:153)
 ~[beam-runners-direct-java-2.65.0.jar:na]
        at 
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
 ~[beam-runners-direct-java-2.65.0.jar:na]
        at 
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
 ~[beam-runners-direct-java-2.65.0.jar:na]
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
 ~[na:na]
        at 
java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) 
~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) 
~[na:na]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
 ~[na:na]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
 ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
   
   ### Issue Priority
   
   Priority: 1 (data loss / total loss of function)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [x] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to