[ https://issues.apache.org/jira/browse/FLINK-10353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16676510#comment-16676510 ]

ASF GitHub Bot commented on FLINK-10353:
----------------------------------------

StefanRRichter commented on a change in pull request #7010: [FLINK-10353][kafka] Support change of transactional semantics in Kaf…
URL: https://github.com/apache/flink/pull/7010#discussion_r231067562
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java
 ##########
 @@ -566,6 +568,76 @@ public void testRunOutOfProducersInThePool() throws Exception {
                deleteTestTopic(topic);
        }
 
+       @Test
+       public void testMigrateFromAtLeastOnceToExactlyOnce() throws Exception {
+               String topic = "testMigrateFromAtLeastOnceToExactlyOnce";
+
+               OperatorSubtaskState producerSnapshot;
+               try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic, AT_LEAST_ONCE)) {
+                       testHarness.setup();
+                       testHarness.open();
+                       testHarness.processElement(42, 0);
+                       testHarness.snapshot(0, 1);
+                       testHarness.processElement(43, 2);
+                       testHarness.notifyOfCompletedCheckpoint(0);
+                       producerSnapshot = testHarness.snapshot(1, 3);
+                       testHarness.processElement(44, 4);
+               }
+
+               try (OneInputStreamOperatorTestHarness<Integer, Object> testHarness = createTestHarness(topic, EXACTLY_ONCE)) {
+                       testHarness.setup();
+                       // restore from snapshot, all records until here should be persisted
+                       testHarness.initializeState(producerSnapshot);
+                       testHarness.open();
+
+                       // write and commit more records
+                       testHarness.processElement(44, 7);
 
 Review comment:
   can do

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Restoring a KafkaProducer with Semantic.EXACTLY_ONCE from a savepoint written with Semantic.AT_LEAST_ONCE fails with NPE
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-10353
>                 URL: https://issues.apache.org/jira/browse/FLINK-10353
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.5.3, 1.6.0
>            Reporter: Konstantin Knauf
>            Assignee: Stefan Richter
>            Priority: Critical
>              Labels: pull-request-available
>
> If a KafkaProducer with {{Semantic.EXACTLY_ONCE}} is restored from a 
> savepoint written with {{Semantic.AT_LEAST_ONCE}}, the job fails on restore 
> with the NPE below. This makes it impossible to upgrade an AT_LEAST_ONCE 
> pipeline to an EXACTLY_ONCE pipeline while keeping its state.
> {quote}
> java.lang.NullPointerException
>       at java.util.Hashtable.put(Hashtable.java:460)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:955)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:733)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.recoverAndCommit(FlinkKafkaProducer011.java:93)
>       at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:373)
>       at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:333)
>       at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:867)
>       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>       at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>       at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:254)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:748)
> {quote}
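> The reason is that, for {{Semantic.AT_LEAST_ONCE}}, the snapshotted state of 
> the {{TwoPhaseCommitSinkFunction}} is of the form 
> "TransactionHolder\{handle=KafkaTransactionState [transactionalId=null, 
> producerId=-1, epoch=-1], transactionStartTime=1537175471175}". 
> When this state is restored with {{Semantic.EXACTLY_ONCE}}, {{recoverAndCommit}} 
> passes the {{null}} {{transactionalId}} on to {{initTransactionalProducer}}, which 
> ends up in {{Hashtable.put}}; {{Hashtable}} rejects {{null}} values, hence the NPE.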
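The failure mode described above can be reproduced in isolation with plain JDK classes. The following is a minimal, hypothetical Java sketch; it is not Flink code and not necessarily how PR #7010 resolves the issue. RecoveredState is an invented stand-in for the snapshotted KafkaTransactionState, and shouldRecoverAndCommit is an invented guard that skips recovery when no transactional id was recorded. The only library behavior it relies on is that java.util.Properties (a Hashtable subclass) throws a NullPointerException when given a null value.

```java
import java.util.Properties;

// Hypothetical illustration, not Flink code: RecoveredState and
// shouldRecoverAndCommit are invented names used only for this sketch.
public class NullTransactionalIdDemo {

    // Stand-in for the snapshotted transaction handle quoted in the issue.
    // For AT_LEAST_ONCE snapshots the transactional id is null.
    static final class RecoveredState {
        final String transactionalId;
        final long producerId;
        final short epoch;

        RecoveredState(String transactionalId, long producerId, short epoch) {
            this.transactionalId = transactionalId;
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Properties extends Hashtable, and putting a null value throws an NPE,
    // which matches the top frame of the stack trace (Hashtable.put).
    static boolean putRejectsNullValue() {
        Properties producerConfig = new Properties();
        try {
            producerConfig.put("transactional.id", null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    // Assumed guard: only try to recover and commit a restored transaction
    // if the snapshot actually recorded a transactional id.
    static boolean shouldRecoverAndCommit(RecoveredState state) {
        return state.transactionalId != null;
    }

    public static void main(String[] args) {
        RecoveredState fromAtLeastOnce = new RecoveredState(null, -1L, (short) -1);
        RecoveredState fromExactlyOnce = new RecoveredState("hypothetical-tx-id", 0L, (short) 0);

        System.out.println("null value rejected: " + putRejectsNullValue());
        System.out.println("recover AT_LEAST_ONCE state: " + shouldRecoverAndCommit(fromAtLeastOnce));
        System.out.println("recover EXACTLY_ONCE state: " + shouldRecoverAndCommit(fromExactlyOnce));
    }
}
```

The guard only illustrates one way to avoid handing a null id to the producer config; what a connector should actually do with such a handle on restore is a design decision this sketch does not settle.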



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
