kamat66 opened a new issue, #35082: URL: https://github.com/apache/beam/issues/35082
### What happened?

I'm using RabbitMqIO to read messages locally with the Direct Runner, and I run into the exception below. The channel is closed once the following condition in `UnboundedReadEvaluatorFactory` is met:

    class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
      // Occasionally close an existing reader and resume from checkpoint, to exercise close-and-resume
      private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;

      // ...

      if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) {
        UnboundedReader<OutputT> toClose = reader;
        // Prevent double-close. UnboundedReader is AutoCloseable, which does not require
        // idempotency of close. Nulling out the reader here prevents trying to re-close it
        // if the call to close throws an IOException.
        reader = null;
        toClose.close();
      }

The exception below then causes the pipeline to fail, and the pipeline does not resume:

    com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:281) ~[amqp-client-5.22.0.jar:5.22.0]
        at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:365) ~[amqp-client-5.22.0.jar:5.22.0]
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:305) ~[amqp-client-5.22.0.jar:5.22.0]
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:152) ~[amqp-client-5.22.0.jar:5.22.0]
        at com.rabbitmq.client.impl.ChannelN.txCommit(ChannelN.java:1564) ~[amqp-client-5.22.0.jar:5.22.0]
        at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.txCommit(AutorecoveringChannel.java:671) ~[amqp-client-5.22.0.jar:5.22.0]
        at org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQCheckpointMark.finalizeCheckpoint(RabbitMqIO.java:445) ~[beam-sdks-java-io-rabbitmq-2.65.0.jar:na]
        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishRead(UnboundedReadEvaluatorFactory.java:260) ~[beam-runners-direct-java-2.65.0.jar:na]
        at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:153) ~[beam-runners-direct-java-2.65.0.jar:na]
        at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165) ~[beam-runners-direct-java-2.65.0.jar:na]
        at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129) ~[beam-runners-direct-java-2.65.0.jar:na]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]

### Issue Priority

Priority: 1 (data loss / total loss of function)

### Issue Components

- [ ] Component: Python SDK
- [x] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Infrastructure
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
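
### Minimal illustration (hypothetical)

To make the suspected sequence easier to discuss, here is a small, dependency-free Java sketch of it. It assumes, without having confirmed this in the RabbitMqIO source, that the checkpoint mark keeps a reference to the reader's channel and that closing the reader also closes that channel. `FakeChannel`, `FakeReader`, `FakeCheckpointMark`, and `finalizeCheckpointIfOpen` are made-up names for illustration only; they are not Beam or RabbitMQ APIs, and `IllegalStateException` merely stands in for `AlreadyClosedException`.

```java
// Hypothetical model of the failure; none of these classes are Beam or RabbitMQ APIs.
final class CloseAndResumeSketch {

  /** Stand-in for an AMQP channel: committing after close fails, as in the reported trace. */
  static final class FakeChannel {
    private boolean open = true;
    private int commits = 0;

    boolean isOpen() { return open; }
    int commits() { return commits; }
    void close() { open = false; }

    void txCommit() {
      if (!open) {
        throw new IllegalStateException("channel is already closed");
      }
      commits++;
    }
  }

  /** Stand-in for a checkpoint mark that holds on to its reader's channel (an assumption). */
  static final class FakeCheckpointMark {
    private final FakeChannel channel;

    FakeCheckpointMark(FakeChannel channel) { this.channel = channel; }

    /** Unguarded variant: commits unconditionally. */
    void finalizeCheckpoint() { channel.txCommit(); }

    /** Guarded variant: skips the commit when the channel is closed; returns whether it committed. */
    boolean finalizeCheckpointIfOpen() {
      if (!channel.isOpen()) {
        return false;
      }
      channel.txCommit();
      return true;
    }
  }

  /** Stand-in for a reader whose close() also closes the channel (an assumption). */
  static final class FakeReader implements AutoCloseable {
    final FakeChannel channel = new FakeChannel();

    FakeCheckpointMark getCheckpointMark() { return new FakeCheckpointMark(channel); }

    @Override
    public void close() { channel.close(); }
  }

  /** Takes a checkpoint, closes the reader, then finalizes the checkpoint; describes the outcome. */
  static String closeThenFinalize(boolean guarded) {
    FakeReader reader = new FakeReader();
    FakeCheckpointMark mark = reader.getCheckpointMark();
    reader.close();
    try {
      if (guarded) {
        return mark.finalizeCheckpointIfOpen() ? "committed" : "skipped";
      }
      mark.finalizeCheckpoint();
      return "committed";
    } catch (IllegalStateException e) {
      return "failed: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(closeThenFinalize(false)); // failed: channel is already closed
    System.out.println(closeThenFinalize(true));  // skipped
  }
}
```

The guarded variant is only one conceivable mitigation, not a proposed patch: whether silently skipping the commit is acceptable depends on how unacknowledged messages are handled after the channel closes, which this sketch does not model.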