Abacn commented on code in PR #17565:
URL: https://github.com/apache/beam/pull/17565#discussion_r866312831


##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
##########
@@ -1615,6 +1646,44 @@ public void testExactlyOnceSink() {
     }
   }
 
+  @Test
+  public void testExactlyOnceSinkWithSendException() throws Throwable {

Review Comment:
   I ran the tests locally and `testExactlyOnceSinkWithSendException` failed:
   ```
   testExactlyOnceSinkWithSendException
   java.lang.AssertionError: 
   Expected: (an instance of org.apache.kafka.common.KafkaException and exception with message a string containing "fakeException")
        but: an instance of org.apache.kafka.common.KafkaException <java.lang.RuntimeException: java.lang.reflect.InvocationTargetException> is a java.lang.RuntimeException
   Stacktrace was: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
        at 
org.apache.beam.sdk.io.kafka.ProducerSpEL.invoke(ProducerSpEL.java:100)
        at 
org.apache.beam.sdk.io.kafka.ProducerSpEL.beginTransaction(ProducerSpEL.java:119)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$ShardWriter.beginTxn(KafkaExactlyOnceSink.java:473)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:322)
   Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.beam.sdk.io.kafka.ProducerSpEL.invoke(ProducerSpEL.java:98)
        at 
org.apache.beam.sdk.io.kafka.ProducerSpEL.beginTransaction(ProducerSpEL.java:119)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$ShardWriter.beginTxn(KafkaExactlyOnceSink.java:473)
        at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:322)
        at org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:228)
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:184)
        at 
org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.processElementUnordered(StatefulDoFnRunner.java:160)
        at 
org.apache.beam.repackaged.direct_java.runners.core.StatefulDoFnRunner.processElement(StatefulDoFnRunner.java:153)
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
        at 
org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
        at 
org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
        at 
org.apache.beam.runners.direct.StatefulParDoEvaluatorFactory$StatefulParDoEvaluator.processElement(StatefulParDoEvaluatorFactory.java:175)
        at 
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
        at 
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   Caused by: java.lang.IllegalStateException: MockProducer is already closed.
        at 
org.apache.kafka.clients.producer.MockProducer.verifyProducerState(MockProducer.java:211)
        at 
org.apache.kafka.clients.producer.MockProducer.beginTransaction(MockProducer.java:148)
        ... 24 more
   
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
        at org.junit.Assert.assertThat(Assert.java:964)
        at org.junit.Assert.assertThat(Assert.java:930)
        at 
org.junit.rules.ExpectedException.handleException(ExpectedException.java:271)
        at 
org.junit.rules.ExpectedException.access$000(ExpectedException.java:111)
        at 
org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:260)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
        at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
        at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
        at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
        at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
        at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
        at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
        at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
        at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
        at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
        at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
        at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
        at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
        at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
        at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
        at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
        at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133)
        at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
        at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
        at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to