Abacn commented on code in PR #17565:
URL: https://github.com/apache/beam/pull/17565#discussion_r866312831


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -1615,6 +1646,44 @@ public void testExactlyOnceSink() {
     }
   }
 
+  @Test
+  public void testExactlyOnceSinkWithSendException() throws Throwable {

Review Comment:
   Edit: when all tests in KafkaIOTest are run together, this one fails; when 
only this test method is run, it succeeds. This is likely due to a race condition.
   
   Ran tests locally and this failed:
   ```
   testExactlyOnceSinkWithSendException
   java.lang.AssertionError: 
   Expected: (an instance of org.apache.kafka.common.KafkaException and 
exception with message a string containing "fakeException")
        but: an instance of org.apache.kafka.common.KafkaException 
<java.lang.RuntimeException: java.lang.reflect.InvocationTargetException> is a 
java.lang.RuntimeException
   Stacktrace was: java.lang.RuntimeException: 
java.lang.reflect.InvocationTargetException
        at 
org.apache.beam.sdk.io.kafka.ProducerSpEL.invoke(ProducerSpEL.java:100)
        at 
org.apache.beam.sdk.io.kafka.ProducerSpEL.beginTransaction(ProducerSpEL.java:119)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$ShardWriter.beginTxn(KafkaExactlyOnceSink.java:473)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:322)
   Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.beam.sdk.io.kafka.ProducerSpEL.invoke(ProducerSpEL.java:98)
        at 
org.apache.beam.sdk.io.kafka.ProducerSpEL.beginTransaction(ProducerSpEL.java:119)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$ShardWriter.beginTxn(KafkaExactlyOnceSink.java:473)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:322)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
 Source)
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
        at 
org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.processElementUnordered(StatefulDoFnRunner.java:160)
        at 
org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.processElement(StatefulDoFnRunner.java:153)
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
        at 
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
        at 
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
        at 
org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:175)
        at 
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
        at 
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.IllegalStateException: MockProducer is already closed.
        at 
org.apache.kafka.clients.producer.MockProducer.verifyProducerState(MockProducer.java:211)
        at 
org.apache.kafka.clients.producer.MockProducer.beginTransaction(MockProducer.java:148)
        ... 24 more
   
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
        at org.junit.Assert.assertThat(Assert.java:964)
        at org.junit.Assert.assertThat(Assert.java:930)
        at 
org.junit.rules.ExpectedException.handleException(ExpectedException.java:271)
        at 
org.junit.rules.ExpectedException.access$000(ExpectedException.java:111)
        at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:260)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
        at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
        at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
        at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
        at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
        at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
        at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
        at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
        at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
        at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
        at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
        at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
        at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
        at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to