Repository: flink Updated Branches: refs/heads/release-1.3 51818a283 -> 2708f6d9a
[FLINK-6520] [kafka] Overwrite auto commit props for ON_CHECKPOINTS / DISABLED commit mode Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2708f6d9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2708f6d9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2708f6d9 Branch: refs/heads/release-1.3 Commit: 2708f6d9a4b74c35c598146765d4a188452f323e Parents: 51818a2 Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Mon May 15 14:49:50 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Mon May 15 16:00:09 2017 +0800 ---------------------------------------------------------------------- .../connectors/kafka/FlinkKafkaConsumer010.java | 7 ++ .../connectors/kafka/FlinkKafkaConsumer09.java | 6 ++ .../kafka/FlinkKafkaConsumerBase.java | 5 + .../kafka/FlinkKafkaConsumerBaseTest.java | 74 ++++++++++++++- .../connectors/kafka/KafkaConsumerTestBase.java | 98 ++++++++++---------- 5 files changed, 140 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java index 9e06d6e..23fc84e 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.util.PropertiesUtil; import org.apache.flink.util.SerializedValue; +import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Collections; import java.util.Map; @@ -138,6 +139,12 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> { boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false); + // make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS or DISABLED; + // this overwrites whatever setting the user configured in the properties + if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) { + properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + } + return new Kafka010Fetcher<>( sourceContext, assignedPartitionsWithInitialOffsets, http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index d0284ce..e638348 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -181,6 +181,12 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> { boolean useMetrics = !PropertiesUtil.getBoolean(properties, KEY_DISABLE_METRICS, false); + // 
make sure that auto commit is disabled when our offset commit mode is ON_CHECKPOINTS or DISABLED; + // this overwrites whatever setting the user configured in the properties + if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) { + properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + } + return new Kafka09Fetcher<>( sourceContext, assignedPartitionsWithInitialOffsets, http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 4a05efa..87bedce 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -769,4 +769,9 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti HashMap<KafkaTopicPartition, Long> getRestoredState() { return restoredState; } + + @VisibleForTesting + OffsetCommitMode getOffsetCommitMode() { + return offsetCommitMode; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index 4f5b283..ccf2ed2 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -215,6 +215,72 @@ public class FlinkKafkaConsumerBaseTest { } @Test + public void testConfigureOnCheckpointsCommitMode() { + + DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer(); + consumer.setIsAutoCommitEnabled(true); // this should be ignored + + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + when(context.getIndexOfThisSubtask()).thenReturn(0); + when(context.getNumberOfParallelSubtasks()).thenReturn(1); + when(context.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing, auto commit should be ignored + consumer.setRuntimeContext(context); + + consumer.open(new Configuration()); + assertEquals(OffsetCommitMode.ON_CHECKPOINTS, consumer.getOffsetCommitMode()); + } + + @Test + public void testConfigureAutoCommitMode() { + + DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer(); + consumer.setIsAutoCommitEnabled(true); + + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + when(context.getIndexOfThisSubtask()).thenReturn(0); + when(context.getNumberOfParallelSubtasks()).thenReturn(1); + when(context.isCheckpointingEnabled()).thenReturn(false); // disable checkpointing, auto commit should be respected + consumer.setRuntimeContext(context); + + consumer.open(new Configuration()); + assertEquals(OffsetCommitMode.KAFKA_PERIODIC, consumer.getOffsetCommitMode()); + } + + @Test + public void testConfigureDisableOffsetCommitWithCheckpointing() { + + DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer(); + 
consumer.setIsAutoCommitEnabled(true); // this should be ignored + + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + when(context.getIndexOfThisSubtask()).thenReturn(0); + when(context.getNumberOfParallelSubtasks()).thenReturn(1); + when(context.isCheckpointingEnabled()).thenReturn(true); // enable checkpointing, auto commit should be ignored + consumer.setRuntimeContext(context); + + consumer.setCommitOffsetsOnCheckpoints(false); // disabling offset committing should override everything + + consumer.open(new Configuration()); + assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode()); + } + + @Test + public void testConfigureDisableOffsetCommitWithoutCheckpointing() { + + DummyFlinkKafkaConsumer consumer = new DummyFlinkKafkaConsumer(); + consumer.setIsAutoCommitEnabled(false); + + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + when(context.getIndexOfThisSubtask()).thenReturn(0); + when(context.getNumberOfParallelSubtasks()).thenReturn(1); + when(context.isCheckpointingEnabled()).thenReturn(false); // disable checkpointing, auto commit should be respected + consumer.setRuntimeContext(context); + + consumer.open(new Configuration()); + assertEquals(OffsetCommitMode.DISABLED, consumer.getOffsetCommitMode()); + } + + @Test @SuppressWarnings("unchecked") public void testSnapshotStateWithCommitOnCheckpointsEnabled() throws Exception { @@ -496,6 +562,8 @@ public class FlinkKafkaConsumerBaseTest { private static class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> { private static final long serialVersionUID = 1L; + boolean isAutoCommitEnabled = false; + @SuppressWarnings("unchecked") public DummyFlinkKafkaConsumer() { super(Arrays.asList("dummy-topic"), (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class)); @@ -520,7 +588,11 @@ public class FlinkKafkaConsumerBaseTest { @Override protected boolean getIsAutoCommitEnabled() { - return false; + return isAutoCommitEnabled; + 
} + + public void setIsAutoCommitEnabled(boolean isAutoCommitEnabled) { + this.isAutoCommitEnabled = isAutoCommitEnabled; } } http://git-wip-us.apache.org/repos/asf/flink/blob/2708f6d9/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index ba83460..2adb5ec 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } } - private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, - KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { - - private final TypeSerializer<Tuple2<Integer, Integer>> ts; - - public Tuple2WithTopicSchema(ExecutionConfig ec) { - ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec); - } - - @Override - public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); - Tuple2<Integer, Integer> t2 = ts.deserialize(in); - return new Tuple3<>(t2.f0, t2.f1, topic); - } - - @Override - public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { - return false; - } - - @Override - public TypeInformation<Tuple3<Integer, Integer, String>> 
getProducedType() { - return TypeInfoParser.parse("Tuple3<Integer, Integer, String>"); - } - - @Override - public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { - return null; - } - - @Override - public byte[] serializeValue(Tuple3<Integer, Integer, String> element) { - ByteArrayOutputStream by = new ByteArrayOutputStream(); - DataOutputView out = new DataOutputViewStreamWrapper(by); - try { - ts.serialize(new Tuple2<>(element.f0, element.f1), out); - } catch (IOException e) { - throw new RuntimeException("Error" ,e); - } - return by.toByteArray(); - } - - @Override - public String getTargetTopic(Tuple3<Integer, Integer, String> element) { - return element.f2; - } - } - /** * Test Flink's Kafka integration also with very big records (30MB) * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message @@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { this.numElementsTotal = state.get(0); } } + + private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, + KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { + + private final TypeSerializer<Tuple2<Integer, Integer>> ts; + + public Tuple2WithTopicSchema(ExecutionConfig ec) { + ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec); + } + + @Override + public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + Tuple2<Integer, Integer> t2 = ts.deserialize(in); + return new Tuple3<>(t2.f0, t2.f1, topic); + } + + @Override + public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { + return false; + } + + @Override + public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { + return TypeInfoParser.parse("Tuple3<Integer, 
Integer, String>"); + } + + @Override + public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { + return null; + } + + @Override + public byte[] serializeValue(Tuple3<Integer, Integer, String> element) { + ByteArrayOutputStream by = new ByteArrayOutputStream(); + DataOutputView out = new DataOutputViewStreamWrapper(by); + try { + ts.serialize(new Tuple2<>(element.f0, element.f1), out); + } catch (IOException e) { + throw new RuntimeException("Error" ,e); + } + return by.toByteArray(); + } + + @Override + public String getTargetTopic(Tuple3<Integer, Integer, String> element) { + return element.f2; + } + } }
