Repository: flink
Updated Branches:
  refs/heads/release-1.3 51818a283 -> 2708f6d9a


[FLINK-6520] [kafka] Overwrite auto commit props for ON_CHECKPOINTS / DISABLED 
commit mode


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2708f6d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2708f6d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2708f6d9

Branch: refs/heads/release-1.3
Commit: 2708f6d9a4b74c35c598146765d4a188452f323e
Parents: 51818a2
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Mon May 15 14:49:50 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Mon May 15 16:00:09 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaConsumer010.java |  7 ++
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  6 ++
 .../kafka/FlinkKafkaConsumerBase.java           |  5 +
 .../kafka/FlinkKafkaConsumerBaseTest.java       | 74 ++++++++++++++-
 .../connectors/kafka/KafkaConsumerTestBase.java | 98 ++++++++++----------
 5 files changed, 140 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 9e06d6e..23fc84e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import java.util.Collections;
 import java.util.Map;
@@ -138,6 +139,12 @@ public class FlinkKafkaConsumer010<T> extends 
FlinkKafkaConsumer09<T> {
 
                boolean useMetrics = !PropertiesUtil.getBoolean(properties, 
KEY_DISABLE_METRICS, false);
 
+               // make sure that auto commit is disabled when our offset 
commit mode is ON_CHECKPOINTS or DISABLED;
+               // this overwrites whatever setting the user configured in the 
properties
+               if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || 
offsetCommitMode == OffsetCommitMode.DISABLED) {
+                       
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+               }
+
                return new Kafka010Fetcher<>(
                                sourceContext,
                                assignedPartitionsWithInitialOffsets,

http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index d0284ce..e638348 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -181,6 +181,12 @@ public class FlinkKafkaConsumer09<T> extends 
FlinkKafkaConsumerBase<T> {
 
                boolean useMetrics = !PropertiesUtil.getBoolean(properties, 
KEY_DISABLE_METRICS, false);
 
+               // make sure that auto commit is disabled when our offset 
commit mode is ON_CHECKPOINTS or DISABLED;
+               // this overwrites whatever setting the user configured in the 
properties
+               if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || 
offsetCommitMode == OffsetCommitMode.DISABLED) {
+                       
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+               }
+
                return new Kafka09Fetcher<>(
                                sourceContext,
                                assignedPartitionsWithInitialOffsets,

http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index 4a05efa..87bedce 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -769,4 +769,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
        HashMap<KafkaTopicPartition, Long> getRestoredState() {
                return restoredState;
        }
+
+       @VisibleForTesting
+       OffsetCommitMode getOffsetCommitMode() {
+               return offsetCommitMode;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
index 4f5b283..ccf2ed2 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
@@ -215,6 +215,72 @@ public class FlinkKafkaConsumerBaseTest {
        }
 
        @Test
+       public void testConfigureOnCheckpointsCommitMode() {
+
+               DummyFlinkKafkaConsumer consumer = new 
DummyFlinkKafkaConsumer();
+               consumer.setIsAutoCommitEnabled(true); // this should be ignored
+
+               StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
+               when(context.getIndexOfThisSubtask()).thenReturn(0);
+               when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+               when(context.isCheckpointingEnabled()).thenReturn(true); // 
enable checkpointing, auto commit should be ignored
+               consumer.setRuntimeContext(context);
+
+               consumer.open(new Configuration());
+               assertEquals(OffsetCommitMode.ON_CHECKPOINTS, 
consumer.getOffsetCommitMode());
+       }
+
+       @Test
+       public void testConfigureAutoCommitMode() {
+
+               DummyFlinkKafkaConsumer consumer = new 
DummyFlinkKafkaConsumer();
+               consumer.setIsAutoCommitEnabled(true);
+
+               StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
+               when(context.getIndexOfThisSubtask()).thenReturn(0);
+               when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+               when(context.isCheckpointingEnabled()).thenReturn(false); // 
disable checkpointing, auto commit should be respected
+               consumer.setRuntimeContext(context);
+
+               consumer.open(new Configuration());
+               assertEquals(OffsetCommitMode.KAFKA_PERIODIC, 
consumer.getOffsetCommitMode());
+       }
+
+       @Test
+       public void testConfigureDisableOffsetCommitWithCheckpointing() {
+
+               DummyFlinkKafkaConsumer consumer = new 
DummyFlinkKafkaConsumer();
+               consumer.setIsAutoCommitEnabled(true); // this should be ignored
+
+               StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
+               when(context.getIndexOfThisSubtask()).thenReturn(0);
+               when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+               when(context.isCheckpointingEnabled()).thenReturn(true); // 
enable checkpointing, auto commit should be ignored
+               consumer.setRuntimeContext(context);
+
+               consumer.setCommitOffsetsOnCheckpoints(false); // disabling 
offset committing should override everything
+
+               consumer.open(new Configuration());
+               assertEquals(OffsetCommitMode.DISABLED, 
consumer.getOffsetCommitMode());
+       }
+
+       @Test
+       public void testConfigureDisableOffsetCommitWithoutCheckpointing() {
+
+               DummyFlinkKafkaConsumer consumer = new 
DummyFlinkKafkaConsumer();
+               consumer.setIsAutoCommitEnabled(false);
+
+               StreamingRuntimeContext context = 
mock(StreamingRuntimeContext.class);
+               when(context.getIndexOfThisSubtask()).thenReturn(0);
+               when(context.getNumberOfParallelSubtasks()).thenReturn(1);
+               when(context.isCheckpointingEnabled()).thenReturn(false); // 
disable checkpointing, auto commit should be respected
+               consumer.setRuntimeContext(context);
+
+               consumer.open(new Configuration());
+               assertEquals(OffsetCommitMode.DISABLED, 
consumer.getOffsetCommitMode());
+       }
+
+       @Test
        @SuppressWarnings("unchecked")
        public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws 
Exception {
 
@@ -496,6 +562,8 @@ public class FlinkKafkaConsumerBaseTest {
        private static class DummyFlinkKafkaConsumer<T> extends 
FlinkKafkaConsumerBase<T> {
                private static final long serialVersionUID = 1L;
 
+               boolean isAutoCommitEnabled = false;
+
                @SuppressWarnings("unchecked")
                public DummyFlinkKafkaConsumer() {
                        super(Arrays.asList("dummy-topic"), 
(KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class));
@@ -520,7 +588,11 @@ public class FlinkKafkaConsumerBaseTest {
 
                @Override
                protected boolean getIsAutoCommitEnabled() {
-                       return false;
+                       return isAutoCommitEnabled;
+               }
+
+               public void setIsAutoCommitEnabled(boolean isAutoCommitEnabled) 
{
+                       this.isAutoCommitEnabled = isAutoCommitEnabled;
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index ba83460..2adb5ec 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                }
        }
 
-       private static class Tuple2WithTopicSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-                       KeyedSerializationSchema<Tuple3<Integer, Integer, 
String>> {
-
-               private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-               
-               public Tuple2WithTopicSchema(ExecutionConfig ec) {
-                       ts = TypeInfoParser.<Tuple2<Integer, 
Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-               }
-
-               @Override
-               public Tuple3<Integer, Integer, String> deserialize(byte[] 
messageKey, byte[] message, String topic, int partition, long offset) throws 
IOException {
-                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
-                       Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-                       return new Tuple3<>(t2.f0, t2.f1, topic);
-               }
-
-               @Override
-               public boolean isEndOfStream(Tuple3<Integer, Integer, String> 
nextElement) {
-                       return false;
-               }
-
-               @Override
-               public TypeInformation<Tuple3<Integer, Integer, String>> 
getProducedType() {
-                       return TypeInfoParser.parse("Tuple3<Integer, Integer, 
String>");
-               }
-
-               @Override
-               public byte[] serializeKey(Tuple3<Integer, Integer, String> 
element) {
-                       return null;
-               }
-
-               @Override
-               public byte[] serializeValue(Tuple3<Integer, Integer, String> 
element) {
-                       ByteArrayOutputStream by = new ByteArrayOutputStream();
-                       DataOutputView out = new 
DataOutputViewStreamWrapper(by);
-                       try {
-                               ts.serialize(new Tuple2<>(element.f0, 
element.f1), out);
-                       } catch (IOException e) {
-                               throw new RuntimeException("Error" ,e);
-                       }
-                       return by.toByteArray();
-               }
-
-               @Override
-               public String getTargetTopic(Tuple3<Integer, Integer, String> 
element) {
-                       return element.f2;
-               }
-       }
-
        /**
         * Test Flink's Kafka integration also with very big records (30MB)
         * see 
http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        this.numElementsTotal = state.get(0);
                }
        }
+
+       private static class Tuple2WithTopicSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+               KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+               private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+               public Tuple2WithTopicSchema(ExecutionConfig ec) {
+                       ts = TypeInfoParser.<Tuple2<Integer, 
Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+               }
+
+               @Override
+               public Tuple3<Integer, Integer, String> deserialize(byte[] 
messageKey, byte[] message, String topic, int partition, long offset) throws 
IOException {
+                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
+                       Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+                       return new Tuple3<>(t2.f0, t2.f1, topic);
+               }
+
+               @Override
+               public boolean isEndOfStream(Tuple3<Integer, Integer, String> 
nextElement) {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<Tuple3<Integer, Integer, String>> 
getProducedType() {
+                       return TypeInfoParser.parse("Tuple3<Integer, Integer, 
String>");
+               }
+
+               @Override
+               public byte[] serializeKey(Tuple3<Integer, Integer, String> 
element) {
+                       return null;
+               }
+
+               @Override
+               public byte[] serializeValue(Tuple3<Integer, Integer, String> 
element) {
+                       ByteArrayOutputStream by = new ByteArrayOutputStream();
+                       DataOutputView out = new 
DataOutputViewStreamWrapper(by);
+                       try {
+                               ts.serialize(new Tuple2<>(element.f0, 
element.f1), out);
+                       } catch (IOException e) {
+                               throw new RuntimeException("Error" ,e);
+                       }
+                       return by.toByteArray();
+               }
+
+               @Override
+               public String getTargetTopic(Tuple3<Integer, Integer, String> 
element) {
+                       return element.f2;
+               }
+       }
 }

Reply via email to