Abacn commented on code in PR #17565:
URL: https://github.com/apache/beam/pull/17565#discussion_r866312831
##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -1615,6 +1646,44 @@ public void testExactlyOnceSink() {
}
}
+ @Test
+ public void testExactlyOnceSinkWithSendException() throws Throwable {
Review Comment:
Edit: run all tests in KafkaIOTest, this one fails; but if only run this
test method, it is succeeded. Likely due to racing condition.
Ran tests locally and this failed:
```
testExactlyOnceSinkWithSendException
java.lang.AssertionError:
Expected: (an instance of org.apache.kafka.common.KafkaException and
exception with message a string containing "fakeException")
but: an instance of org.apache.kafka.common.KafkaException
<java.lang.RuntimeException: java.lang.reflect.InvocationTargetException> is a
java.lang.RuntimeException
Stacktrace was: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.beam.sdk.io.kafka.ProducerSpEL.invoke(ProducerSpEL.java:100)
at
org.apache.beam.sdk.io.kafka.ProducerSpEL.beginTransaction(ProducerSpEL.java:119)
at
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$ShardWriter.beginTxn(KafkaExactlyOnceSink.java:473)
at
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:322)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.beam.sdk.io.kafka.ProducerSpEL.invoke(ProducerSpEL.java:98)
at
org.apache.beam.sdk.io.kafka.ProducerSpEL.beginTransaction(ProducerSpEL.java:119)
at
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$ShardWriter.beginTxn(KafkaExactlyOnceSink.java:473)
at
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:322)
at
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
at
org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.processElementUnordered(StatefulDoFnRunner.java:160)
at
org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.processElement(StatefulDoFnRunner.java:153)
at
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
at
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
at
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
at
org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:175)
at
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
at
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.IllegalStateException: MockProducer is already closed.
at
org.apache.kafka.clients.producer.MockProducer.verifyProducerState(MockProducer.java:211)
at
org.apache.kafka.clients.producer.MockProducer.beginTransaction(MockProducer.java:148)
... 24 more
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
at org.junit.Assert.assertThat(Assert.java:964)
at org.junit.Assert.assertThat(Assert.java:930)
at
org.junit.rules.ExpectedException.handleException(ExpectedException.java:271)
at
org.junit.rules.ExpectedException.access$000(ExpectedException.java:111)
at
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:260)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
at
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
at
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
at
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
at
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]