[hotfix][kafka] Move checkpointing-enabled check to initializeState

initializeState is called before open, and since both of those functions
rely on the chosen semantic, the check for whether checkpointing is
enabled should happen in initializeState.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/425ffe26
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/425ffe26
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/425ffe26

Branch: refs/heads/master
Commit: 425ffe268f0c5aceac084b522af04736d2298da7
Parents: 856b6ba
Author: Piotr Nowojski <[email protected]>
Authored: Wed Oct 25 18:08:46 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Thu Nov 2 12:43:20 2017 +0800

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaProducer011.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/425ffe26/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 6242a20..a69c730 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -524,11 +524,6 @@ public class FlinkKafkaProducer011<IN>
         */
        @Override
        public void open(Configuration configuration) throws Exception {
-               if (semantic != Semantic.NONE && !((StreamingRuntimeContext) 
this.getRuntimeContext()).isCheckpointingEnabled()) {
-                       LOG.warn("Using {} semantic, but checkpointing is not 
enabled. Switching to {} semantic.", semantic, Semantic.NONE);
-                       semantic = Semantic.NONE;
-               }
-
                if (logFailuresOnly) {
                        callback = new Callback() {
                                @Override
@@ -787,6 +782,11 @@ public class FlinkKafkaProducer011<IN>
 
        @Override
        public void initializeState(FunctionInitializationContext context) 
throws Exception {
+               if (semantic != Semantic.NONE && !((StreamingRuntimeContext) 
this.getRuntimeContext()).isCheckpointingEnabled()) {
+                       LOG.warn("Using {} semantic, but checkpointing is not 
enabled. Switching to {} semantic.", semantic, Semantic.NONE);
+                       semantic = Semantic.NONE;
+               }
+
                nextTransactionalIdHintState = 
context.getOperatorStateStore().getUnionListState(
                        NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
 

Reply via email to