http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
new file mode 100644
index 0000000..99c5d69
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a periodic watermark generator (and timestamp extractor) per partition.
+ *
+ * @param <T> The type of records handled by the watermark generator
+ * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions.
+ */
+public final class KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> {
+
+    /** The timestamp assigner and watermark generator for the partition */
+    private final AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks;
+
+    /** The last watermark timestamp generated by this partition */
+    private long partitionWatermark;
+
+    // ------------------------------------------------------------------------
+
+    public KafkaTopicPartitionStateWithPeriodicWatermarks(
+            KafkaTopicPartition partition, KPH kafkaPartitionHandle,
+            AssignerWithPeriodicWatermarks<T> timestampsAndWatermarks)
+    {
+        super(partition, kafkaPartitionHandle);
+
+        this.timestampsAndWatermarks = timestampsAndWatermarks;
+        this.partitionWatermark = Long.MIN_VALUE;
+    }
+
+    // ------------------------------------------------------------------------
+
+    public long getTimestampForRecord(T record) {
+        return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE);
+    }
+
+    public long getCurrentWatermarkTimestamp() {
+        Watermark wm = timestampsAndWatermarks.getCurrentWatermark();
+        if (wm != null) {
+            partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp());
+        }
+        return partitionWatermark;
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Override
+    public String toString() {
+        return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition()
+                + ", offset=" + getOffset() + ", watermark=" + partitionWatermark;
+    }
+}
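For orientation before the punctuated twin below: this state object wraps a user-supplied AssignerWithPeriodicWatermarks, whose contract appears near the end of this commit. A minimal sketch of such an assigner, assuming an illustrative SensorReading event type with a getEventTime() accessor and a made-up 3500 ms lateness bound (neither appears in this commit):

    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    /** Emits periodic watermarks trailing the highest timestamp seen by a fixed bound. */
    public class BoundedLatenessAssigner implements AssignerWithPeriodicWatermarks<SensorReading> {

        private static final long MAX_LATENESS_MS = 3500L; // illustrative bound

        private long currentMaxTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(SensorReading element, long previousElementTimestamp) {
            long timestamp = element.getEventTime(); // assumed accessor
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // null tells the caller that no watermark should be emitted yet
            return currentMaxTimestamp == Long.MIN_VALUE
                    ? null
                    : new Watermark(currentMaxTimestamp - MAX_LATENESS_MS);
        }
    }

Note that getCurrentWatermarkTimestamp() in the class above applies Math.max against the last emitted value, so even an assigner whose watermark regresses cannot move a partition's watermark backwards.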
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
new file mode 100644
index 0000000..b265990
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+
+/**
+ * A special version of the per-kafka-partition-state that additionally holds
+ * a punctuated watermark generator (and timestamp extractor) per partition.
+ *
+ * <p>This class is not thread safe, but it gives volatile access to the current
+ * partition watermark ({@link #getCurrentPartitionWatermark()}).
+ * + * @param <T> The type of records handled by the watermark generator + * @param <KPH> The type of the Kafka partition descriptor, which varies across Kafka versions + */ +public final class KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH> extends KafkaTopicPartitionState<KPH> { + + /** The timestamp assigner and watermark generator for the partition */ + private final AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks; + + /** The last watermark timestamp generated by this partition */ + private volatile long partitionWatermark; + + // ------------------------------------------------------------------------ + + public KafkaTopicPartitionStateWithPunctuatedWatermarks( + KafkaTopicPartition partition, KPH kafkaPartitionHandle, + AssignerWithPunctuatedWatermarks<T> timestampsAndWatermarks) + { + super(partition, kafkaPartitionHandle); + + this.timestampsAndWatermarks = timestampsAndWatermarks; + this.partitionWatermark = Long.MIN_VALUE; + } + + // ------------------------------------------------------------------------ + + public long getTimestampForRecord(T record) { + return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE); + } + + @Nullable + public Watermark checkAndGetNewWatermark(T record, long timestamp) { + Watermark mark = timestampsAndWatermarks.checkAndGetNextWatermark(record, timestamp); + if (mark != null && mark.getTimestamp() > partitionWatermark) { + partitionWatermark = mark.getTimestamp(); + return mark; + } + else { + return null; + } + } + + public long getCurrentPartitionWatermark() { + return partitionWatermark; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return "KafkaTopicPartitionStateWithPunctuatedWatermarks: partition=" + getKafkaTopicPartition() + + ", offset=" + getOffset() + ", watermark=" + partitionWatermark; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java index 038f414..37e2ef6 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.kafka.partitioner; +package org.apache.flink.streaming.connectors.kafka.partitioner; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java index 1be6b00..bda90bd 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java @@ -14,8 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.streaming.connectors.kafka.util; +package org.apache.flink.streaming.connectors.kafka.util; import java.util.Properties; @@ -38,10 +38,9 @@ public class KafkaUtils { "Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'"); } } - - public static void checkArgument(boolean arg) { - if(!arg) { - throw new IllegalArgumentException(); - } - } + + // ------------------------------------------------------------------------ + + /** Private default constructor to prevent instantiation */ + private KafkaUtils() {} } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java new file mode 100644 index 0000000..f4ef995 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.commons.collections.map.LinkedMap;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class FlinkKafkaConsumerBaseTest {
+
+    /**
+     * Tests that not both types of timestamp extractors / watermark generators can be used.
+     */
+    @Test
+    public void testEitherWatermarkExtractor() {
+        try {
+            new DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null);
+            fail();
+        } catch (NullPointerException ignored) {}
+
+        try {
+            new DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null);
+            fail();
+        } catch (NullPointerException ignored) {}
+
+        @SuppressWarnings("unchecked")
+        final AssignerWithPeriodicWatermarks<String> periodicAssigner = mock(AssignerWithPeriodicWatermarks.class);
+        @SuppressWarnings("unchecked")
+        final AssignerWithPunctuatedWatermarks<String> punctuatedAssigner = mock(AssignerWithPunctuatedWatermarks.class);
+
+        DummyFlinkKafkaConsumer<String> c1 = new DummyFlinkKafkaConsumer<>();
+        c1.setPeriodicWatermarkEmitter(periodicAssigner);
+        try {
+            c1.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+            fail();
+        } catch (IllegalStateException ignored) {}
+
+        DummyFlinkKafkaConsumer<String> c2 = new DummyFlinkKafkaConsumer<>();
+        c2.setPunctuatedWatermarkEmitter(punctuatedAssigner);
+        try {
+            c2.setPeriodicWatermarkEmitter(periodicAssigner);
+            fail();
+        } catch (IllegalStateException ignored) {}
+    }
+
+    /**
+     * Tests that no checkpoints happen when the fetcher is not running.
+     */
+    @Test
+    public void ignoreCheckpointWhenNotRunning() throws Exception {
+        @SuppressWarnings("unchecked")
+        final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+
+        FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, new LinkedMap(), false);
+        assertNull(consumer.snapshotState(17L, 23L));
+        consumer.notifyCheckpointComplete(66L);
+    }
+
+    /**
+     * Tests that the restored checkpoint state is returned as the snapshot
+     * as long as the fetcher is not yet running.
+     */
+    @Test
+    public void checkRestoredCheckpointWhenFetcherNotReady() throws Exception {
+        HashMap<KafkaTopicPartition, Long> restoreState = new HashMap<>();
+        restoreState.put(new KafkaTopicPartition("abc", 13), 16768L);
+        restoreState.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+        FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+        consumer.restoreState(restoreState);
+
+        assertEquals(restoreState, consumer.snapshotState(17L, 23L));
+    }
+
+    /**
+     * Tests that the snapshot is null when no state was restored and the
+     * fetcher is not yet running.
+     */
+    @Test
+    public void checkRestoredNullCheckpointWhenFetcherNotReady() throws Exception {
+        FlinkKafkaConsumerBase<String> consumer = getConsumer(null, new LinkedMap(), true);
+        assertNull(consumer.snapshotState(17L, 23L));
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testSnapshotState() throws Exception {
+        final HashMap<KafkaTopicPartition, Long> state1 = new HashMap<>();
+        state1.put(new KafkaTopicPartition("abc", 13), 16768L);
+        state1.put(new KafkaTopicPartition("def", 7), 987654321L);
+
+        final HashMap<KafkaTopicPartition, Long> state2 = new HashMap<>();
+        state2.put(new KafkaTopicPartition("abc", 13), 16770L);
+        state2.put(new KafkaTopicPartition("def", 7), 987654329L);
+
+        final HashMap<KafkaTopicPartition, Long> state3 = new HashMap<>();
+        state3.put(new KafkaTopicPartition("abc", 13), 16780L);
+        state3.put(new KafkaTopicPartition("def", 7), 987654377L);
+
+        final AbstractFetcher<String, ?> fetcher = mock(AbstractFetcher.class);
+        when(fetcher.snapshotCurrentState()).thenReturn(state1, state2, state3);
+
+        final LinkedMap pendingCheckpoints = new LinkedMap();
+
+        FlinkKafkaConsumerBase<String> consumer = getConsumer(fetcher, pendingCheckpoints, true);
+        assertEquals(0, pendingCheckpoints.size());
+
+        // checkpoint 1
+        HashMap<KafkaTopicPartition, Long> snapshot1 = consumer.snapshotState(138L, 19L);
+        assertEquals(state1, snapshot1);
+        assertEquals(1, pendingCheckpoints.size());
+        assertEquals(state1, pendingCheckpoints.get(138L));
+
+        // checkpoint 2
+        HashMap<KafkaTopicPartition, Long> snapshot2 = consumer.snapshotState(140L, 1578L);
+        assertEquals(state2, snapshot2);
+        assertEquals(2, pendingCheckpoints.size());
+        assertEquals(state2, pendingCheckpoints.get(140L));
+
+        // ack checkpoint 1
+        consumer.notifyCheckpointComplete(138L);
+        assertEquals(1, pendingCheckpoints.size());
+        assertTrue(pendingCheckpoints.containsKey(140L));
+
+        // checkpoint 3
+        HashMap<KafkaTopicPartition, Long> snapshot3 = consumer.snapshotState(141L, 1578L);
+        assertEquals(state3, snapshot3);
+        assertEquals(2, pendingCheckpoints.size());
+        assertEquals(state3, pendingCheckpoints.get(141L));
+
+        // ack checkpoint 3, subsumes number 2
+        consumer.notifyCheckpointComplete(141L);
+        assertEquals(0, pendingCheckpoints.size());
+
+        consumer.notifyCheckpointComplete(666); // invalid checkpoint
+        assertEquals(0, pendingCheckpoints.size());
+
+        // create 500 snapshots
+        for (int i = 100; i < 600; i++) {
+            consumer.snapshotState(i, 15 * i);
+        }
+        assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size());
+
+        // commit only the second last
+        consumer.notifyCheckpointComplete(598);
+        assertEquals(1, pendingCheckpoints.size());
+
+        // access invalid checkpoint
+        consumer.notifyCheckpointComplete(590);
+
+        // and the last
+        consumer.notifyCheckpointComplete(599);
+        assertEquals(0, pendingCheckpoints.size());
+    }
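The snapshot/ack protocol exercised by testSnapshotState above works like this: every snapshot parks its offsets in the pendingCheckpoints map under its checkpoint id, the map is capped so that lost acknowledgements cannot leak memory, and acknowledging a checkpoint subsumes every older pending one. A compact, hypothetical mirror of that bookkeeping, for orientation only (the class and method names are invented; the real consumer additionally commits the acknowledged offsets back to Kafka or ZooKeeper):

    import java.util.HashMap;

    import org.apache.commons.collections.map.LinkedMap;

    // Simplified, illustrative stand-in for the bookkeeping inside FlinkKafkaConsumerBase.
    class PendingCheckpointTracker {

        // a cap of 100 is what the 500-snapshot assertion above implies
        static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

        private final LinkedMap pendingCheckpoints = new LinkedMap();

        /** What snapshotState() does: park the current offsets under the checkpoint id. */
        void onSnapshot(long checkpointId, HashMap<?, Long> offsets) {
            pendingCheckpoints.put(checkpointId, offsets);

            // bound the map; if acks never arrive, drop the oldest entries
            while (pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                pendingCheckpoints.remove(0);
            }
        }

        /** What notifyCheckpointComplete() does: an ack subsumes all older checkpoints. */
        void onCheckpointComplete(long checkpointId) {
            int pos = pendingCheckpoints.indexOf(checkpointId);
            if (pos == -1) {
                return; // unknown or already-subsumed id, like the '666' in the test above
            }
            for (int i = 0; i <= pos; i++) {
                pendingCheckpoints.remove(0); // index 0 is always the oldest entry
            }
        }
    }

This is why acknowledging checkpoint 141 above empties the map: completing it also retires the still-pending checkpoint 140.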
+
+    // ------------------------------------------------------------------------
+
+    private static <T> FlinkKafkaConsumerBase<T> getConsumer(
+            AbstractFetcher<T, ?> fetcher, LinkedMap pendingCheckpoints, boolean running) throws Exception
+    {
+        FlinkKafkaConsumerBase<T> consumer = new DummyFlinkKafkaConsumer<>();
+
+        Field fetcherField = FlinkKafkaConsumerBase.class.getDeclaredField("kafkaFetcher");
+        fetcherField.setAccessible(true);
+        fetcherField.set(consumer, fetcher);
+
+        Field mapField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints");
+        mapField.setAccessible(true);
+        mapField.set(consumer, pendingCheckpoints);
+
+        Field
runningField = FlinkKafkaConsumerBase.class.getDeclaredField("running"); + runningField.setAccessible(true); + runningField.set(consumer, running); + + return consumer; + } + + // ------------------------------------------------------------------------ + + private static final class DummyFlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> { + private static final long serialVersionUID = 1L; + + @SuppressWarnings("unchecked") + public DummyFlinkKafkaConsumer() { + super((KeyedDeserializationSchema<T>) mock(KeyedDeserializationSchema.class)); + } + + @Override + protected AbstractFetcher<T, ?> createFetcher(SourceContext<T> sourceContext, List<KafkaTopicPartition> thisSubtaskPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext) throws Exception { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java index e86d51a..9beed22 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java @@ -18,10 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; - import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader; -import org.apache.kafka.common.Node; + import org.junit.Test; import java.util.ArrayList; @@ -32,30 +30,27 @@ import java.util.Set; import static org.junit.Assert.*; - /** * Tests that the partition assignment is deterministic and stable. 
*/ public class KafkaConsumerPartitionAssignmentTest { - private final Node fake = new Node(1337, "localhost", 1337); - @Test public void testPartitionsEqualConsumers() { try { - List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>(); - inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake)); - inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake)); - inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake)); - inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake)); + List<KafkaTopicPartition> inPartitions = Arrays.asList( + new KafkaTopicPartition("test-topic", 4), + new KafkaTopicPartition("test-topic", 52), + new KafkaTopicPartition("test-topic", 17), + new KafkaTopicPartition("test-topic", 1)); for (int i = 0; i < inPartitions.size(); i++) { - List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions( - inPartitions, inPartitions.size(), i); + List<KafkaTopicPartition> parts = + FlinkKafkaConsumerBase.assignPartitions(inPartitions, inPartitions.size(), i); assertNotNull(parts); assertEquals(1, parts.size()); - assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition())); + assertTrue(contains(inPartitions, parts.get(0).getPartition())); } } catch (Exception e) { @@ -64,9 +59,9 @@ public class KafkaConsumerPartitionAssignmentTest { } } - private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) { - for (KafkaTopicPartitionLeader ktp: inPartitions) { - if (ktp.getTopicPartition().getPartition() == partition) { + private boolean contains(List<KafkaTopicPartition> inPartitions, int partition) { + for (KafkaTopicPartition ktp : inPartitions) { + if (ktp.getPartition() == partition) { return true; } } @@ -78,11 +73,11 @@ public class KafkaConsumerPartitionAssignmentTest { try { final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; - final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>(); - final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>(); + final List<KafkaTopicPartition> partitions = new ArrayList<>(); + final Set<KafkaTopicPartition> allPartitions = new HashSet<>(); for (int p : partitionIDs) { - KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake); + KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p); partitions.add(part); allPartitions.add(part); } @@ -92,13 +87,14 @@ public class KafkaConsumerPartitionAssignmentTest { final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1; for (int i = 0; i < numConsumers; i++) { - List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i); + List<KafkaTopicPartition> parts = + FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i); assertNotNull(parts); assertTrue(parts.size() >= minPartitionsPerConsumer); assertTrue(parts.size() <= maxPartitionsPerConsumer); - for (KafkaTopicPartitionLeader p : parts) { + for (KafkaTopicPartition p : parts) { // check that the element was actually contained assertTrue(allPartitions.remove(p)); } @@ -116,24 +112,24 @@ public class KafkaConsumerPartitionAssignmentTest { @Test public void testPartitionsFewerThanConsumers() { try { - List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>(); - inPartitions.add(new KafkaTopicPartitionLeader(new 
KafkaTopicPartition("test-topic", 4), fake)); - inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake)); - inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake)); - inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake)); + List<KafkaTopicPartition> inPartitions = Arrays.asList( + new KafkaTopicPartition("test-topic", 4), + new KafkaTopicPartition("test-topic", 52), + new KafkaTopicPartition("test-topic", 17), + new KafkaTopicPartition("test-topic", 1)); - final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>(); + final Set<KafkaTopicPartition> allPartitions = new HashSet<>(); allPartitions.addAll(inPartitions); final int numConsumers = 2 * inPartitions.size() + 3; for (int i = 0; i < numConsumers; i++) { - List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i); + List<KafkaTopicPartition> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i); assertNotNull(parts); assertTrue(parts.size() <= 1); - for (KafkaTopicPartitionLeader p : parts) { + for (KafkaTopicPartition p : parts) { // check that the element was actually contained assertTrue(allPartitions.remove(p)); } @@ -151,12 +147,12 @@ public class KafkaConsumerPartitionAssignmentTest { @Test public void testAssignEmptyPartitions() { try { - List<KafkaTopicPartitionLeader> ep = new ArrayList<>(); - List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2); + List<KafkaTopicPartition> ep = new ArrayList<>(); + List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2); assertNotNull(parts1); assertTrue(parts1.isEmpty()); - List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0); + List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0); assertNotNull(parts2); assertTrue(parts2.isEmpty()); } @@ -170,17 +166,17 @@ public class KafkaConsumerPartitionAssignmentTest { public void testGrowingPartitionsRemainsStable() { try { final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14}; - List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>(); + List<KafkaTopicPartition> newPartitions = new ArrayList<>(); for (int p : newPartitionIDs) { - KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake); + KafkaTopicPartition part = new KafkaTopicPartition("test-topic", p); newPartitions.add(part); } - List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7); + List<KafkaTopicPartition> initialPartitions = newPartitions.subList(0, 7); - final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions); - final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions); + final Set<KafkaTopicPartition> allNewPartitions = new HashSet<>(newPartitions); + final Set<KafkaTopicPartition> allInitialPartitions = new HashSet<>(initialPartitions); final int numConsumers = 3; final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers; @@ -188,11 +184,11 @@ public class KafkaConsumerPartitionAssignmentTest { final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers; final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1; - List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions( + 
List<KafkaTopicPartition> parts1 = FlinkKafkaConsumerBase.assignPartitions( initialPartitions, numConsumers, 0); - List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions( + List<KafkaTopicPartition> parts2 = FlinkKafkaConsumerBase.assignPartitions( initialPartitions, numConsumers, 1); - List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumerBase.assignPartitions( + List<KafkaTopicPartition> parts3 = FlinkKafkaConsumerBase.assignPartitions( initialPartitions, numConsumers, 2); assertNotNull(parts1); @@ -206,15 +202,15 @@ public class KafkaConsumerPartitionAssignmentTest { assertTrue(parts3.size() >= minInitialPartitionsPerConsumer); assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer); - for (KafkaTopicPartitionLeader p : parts1) { + for (KafkaTopicPartition p : parts1) { // check that the element was actually contained assertTrue(allInitialPartitions.remove(p)); } - for (KafkaTopicPartitionLeader p : parts2) { + for (KafkaTopicPartition p : parts2) { // check that the element was actually contained assertTrue(allInitialPartitions.remove(p)); } - for (KafkaTopicPartitionLeader p : parts3) { + for (KafkaTopicPartition p : parts3) { // check that the element was actually contained assertTrue(allInitialPartitions.remove(p)); } @@ -224,11 +220,11 @@ public class KafkaConsumerPartitionAssignmentTest { // grow the set of partitions and distribute anew - List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumerBase.assignPartitions( + List<KafkaTopicPartition> parts1new = FlinkKafkaConsumerBase.assignPartitions( newPartitions, numConsumers, 0); - List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumerBase.assignPartitions( + List<KafkaTopicPartition> parts2new = FlinkKafkaConsumerBase.assignPartitions( newPartitions, numConsumers, 1); - List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumerBase.assignPartitions( + List<KafkaTopicPartition> parts3new = FlinkKafkaConsumerBase.assignPartitions( newPartitions, numConsumers, 2); // new partitions must include all old partitions @@ -248,15 +244,15 @@ public class KafkaConsumerPartitionAssignmentTest { assertTrue(parts3new.size() >= minNewPartitionsPerConsumer); assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer); - for (KafkaTopicPartitionLeader p : parts1new) { + for (KafkaTopicPartition p : parts1new) { // check that the element was actually contained assertTrue(allNewPartitions.remove(p)); } - for (KafkaTopicPartitionLeader p : parts2new) { + for (KafkaTopicPartition p : parts2new) { // check that the element was actually contained assertTrue(allNewPartitions.remove(p)); } - for (KafkaTopicPartitionLeader p : parts3new) { + for (KafkaTopicPartition p : parts3new) { // check that the element was actually contained assertTrue(allNewPartitions.remove(p)); } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 340950b..aa5344b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -26,8 +26,6 @@ import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import kafka.server.KafkaServer; -import org.apache.commons.collections.map.LinkedMap; - import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; @@ -66,12 +64,10 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators; import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink; import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapper; import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; -import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext; import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper; import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner; @@ -98,7 +94,6 @@ import org.junit.Rule; import java.io.ByteArrayInputStream; import java.io.IOException; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -176,70 +171,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } } } - /** - * Test that validates that checkpointing and checkpoint notification works properly - */ - public void runCheckpointingTest() throws Exception { - createTestTopic("testCheckpointing", 1, 1); - - FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer("testCheckpointing", new SimpleStringSchema(), standardProps); - Field pendingCheckpointsField = FlinkKafkaConsumerBase.class.getDeclaredField("pendingCheckpoints"); - pendingCheckpointsField.setAccessible(true); - LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source); - - Assert.assertEquals(0, pendingCheckpoints.size()); - source.setRuntimeContext(new MockRuntimeContext(1, 0)); - - final HashMap<KafkaTopicPartition, Long> initialOffsets = new HashMap<>(); - initialOffsets.put(new KafkaTopicPartition("testCheckpointing", 0), 1337L); - - // first restore - source.restoreState(initialOffsets); - - // then open - source.open(new Configuration()); - HashMap<KafkaTopicPartition, Long> state1 = source.snapshotState(1, 15); - - assertEquals(initialOffsets, state1); - - HashMap<KafkaTopicPartition, Long> state2 = source.snapshotState(2, 30); - Assert.assertEquals(initialOffsets, state2); - - Assert.assertEquals(2, pendingCheckpoints.size()); - - source.notifyCheckpointComplete(1); - Assert.assertEquals(1, pendingCheckpoints.size()); - - source.notifyCheckpointComplete(2); - Assert.assertEquals(0, pendingCheckpoints.size()); - - source.notifyCheckpointComplete(666); // invalid checkpoint - Assert.assertEquals(0, pendingCheckpoints.size()); - - // create 500 snapshots - for (int i = 100; i < 600; i++) { - source.snapshotState(i, 15 * i); - } - 
Assert.assertEquals(FlinkKafkaConsumerBase.MAX_NUM_PENDING_CHECKPOINTS, pendingCheckpoints.size()); - - // commit only the second last - source.notifyCheckpointComplete(598); - Assert.assertEquals(1, pendingCheckpoints.size()); - - // access invalid checkpoint - source.notifyCheckpointComplete(590); - - // and the last - source.notifyCheckpointComplete(599); - Assert.assertEquals(0, pendingCheckpoints.size()); - - source.close(); - - deleteTestTopic("testCheckpointing"); - } - - - + /** * Ensure Kafka is working on both producer and consumer side. * This executes a job that contains two Flink pipelines. @@ -409,7 +341,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env.enableCheckpointing(500); env.setParallelism(parallelism); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); env.getConfig().disableSysoutLogging(); FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); @@ -454,7 +386,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env.enableCheckpointing(500); env.setParallelism(parallelism); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); env.getConfig().disableSysoutLogging(); FlinkKafkaConsumerBase<Integer> kafkaSource = kafkaServer.getConsumer(topic, schema, standardProps); @@ -499,7 +431,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env.enableCheckpointing(500); env.setParallelism(parallelism); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000)); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0)); env.getConfig().disableSysoutLogging(); env.setBufferTimeout(0); @@ -562,7 +494,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { runnerThread.start(); // wait a bit before canceling - Thread.sleep(8000); + Thread.sleep(2000); Throwable failueCause = jobError.get(); if(failueCause != null) { @@ -634,10 +566,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { runnerThread.start(); // wait a bit before canceling - Thread.sleep(8000); + Thread.sleep(2000); Throwable failueCause = error.get(); - if(failueCause != null) { + if (failueCause != null) { failueCause.printStackTrace(); Assert.fail("Test failed prematurely with: " + failueCause.getMessage()); } @@ -709,7 +641,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { final int NUM_ELEMENTS = 20; StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - + env.getConfig().disableSysoutLogging(); + // create topics with content final List<String> topics = new ArrayList<>(); for (int i = 0; i < NUM_TOPICS; i++) { @@ -745,6 +678,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // run second job consuming from multiple topics env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + stream = env.addSource(kafkaServer.getConsumer(topics, schema, standardProps)); stream.flatMap(new 
FlatMapFunction<Tuple3<Integer, Integer, String>, Integer>() { @@ -1453,50 +1388,50 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { ///////////// Testing the Kafka consumer with embeded watermark generation functionality /////////////// - @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class) - public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception { - - final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString(); - final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString(); - - final Map<String, Boolean> topics = new HashMap<>(); - topics.put(topic1, false); - topics.put(topic2, emptyPartition); - - final int noOfTopcis = topics.size(); - final int partitionsPerTopic = 1; - final int elementsPerPartition = 100 + 1; - - final int totalElements = emptyPartition ? - partitionsPerTopic * elementsPerPartition : - noOfTopcis * partitionsPerTopic * elementsPerPartition; - - createTestTopic(topic1, partitionsPerTopic, 1); - createTestTopic(topic2, partitionsPerTopic, 1); - - final StreamExecutionEnvironment env = - StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setParallelism(partitionsPerTopic); - env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately - env.getConfig().disableSysoutLogging(); - - TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>"); - - Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); - producerProperties.setProperty("retries", "0"); - - putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType); - - List<String> topicTitles = new ArrayList<>(topics.keySet()); - runPunctuatedComsumer(env, topicTitles, totalElements, longIntType); - - executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest"); - - for(String topic: topicTitles) { - deleteTestTopic(topic); - } - } +// @RetryOnException(times=0, exception=kafka.common.NotLeaderForPartitionException.class) +// public void runExplicitPunctuatedWMgeneratingConsumerTest(boolean emptyPartition) throws Exception { +// +// final String topic1 = "wmExtractorTopic1_" + UUID.randomUUID().toString(); +// final String topic2 = "wmExtractorTopic2_" + UUID.randomUUID().toString(); +// +// final Map<String, Boolean> topics = new HashMap<>(); +// topics.put(topic1, false); +// topics.put(topic2, emptyPartition); +// +// final int noOfTopcis = topics.size(); +// final int partitionsPerTopic = 1; +// final int elementsPerPartition = 100 + 1; +// +// final int totalElements = emptyPartition ? 
+// partitionsPerTopic * elementsPerPartition : +// noOfTopcis * partitionsPerTopic * elementsPerPartition; +// +// createTestTopic(topic1, partitionsPerTopic, 1); +// createTestTopic(topic2, partitionsPerTopic, 1); +// +// final StreamExecutionEnvironment env = +// StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); +// env.setParallelism(partitionsPerTopic); +// env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately +// env.getConfig().disableSysoutLogging(); +// +// TypeInformation<Tuple2<Long, Integer>> longIntType = TypeInfoParser.parse("Tuple2<Long, Integer>"); +// +// Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings); +// producerProperties.setProperty("retries", "0"); +// +// putDataInTopics(env, producerProperties, elementsPerPartition, topics, longIntType); +// +// List<String> topicTitles = new ArrayList<>(topics.keySet()); +// runPunctuatedComsumer(env, topicTitles, totalElements, longIntType); +// +// executeAndCatchException(env, "runComsumerWithPunctuatedExplicitWMTest"); +// +// for(String topic: topicTitles) { +// deleteTestTopic(topic); +// } +// } private void executeAndCatchException(StreamExecutionEnvironment env, String execName) throws Exception { try { http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index e251174..14e74f1 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -167,7 +167,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { // ------------------------------------------------------------------------ - public static class CustomPartitioner extends KafkaPartitioner implements Serializable { + public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable { private final int expectedPartitions; @@ -177,12 +177,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { @Override - public int partition(Object next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - Tuple2<Long, String> tuple = (Tuple2<Long, String>) next; - + public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { assertEquals(expectedPartitions, numPartitions); - return (int) (tuple.f0 % numPartitions); + return (int) (next.f0 % numPartitions); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java ---------------------------------------------------------------------- diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index 9f8159c..9e3c33b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -26,16 +26,16 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.transformations.SourceTransformation; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.test.util.ForkableFlinkMiniCluster; import org.apache.flink.util.InstantiationUtil; + import org.junit.AfterClass; import org.junit.BeforeClass; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,14 +44,18 @@ import java.io.Serializable; import java.util.Properties; import static org.apache.flink.test.util.TestUtils.tryExecute; - +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * A class containing a special Kafka broker which has a log retention of only 250 ms. 
* This way, we can make sure our consumer is properly handling cases where we run into out of offset * errors */ +@SuppressWarnings("serial") public class KafkaShortRetentionTestBase implements Serializable { + protected static final Logger LOG = LoggerFactory.getLogger(KafkaShortRetentionTestBase.class); + private static KafkaTestEnvironment kafkaServer; private static Properties standardProps; private static ForkableFlinkMiniCluster flink; @@ -108,7 +112,7 @@ public class KafkaShortRetentionTestBase implements Serializable { final String topic = "auto-offset-reset-test"; final int parallelism = 1; - final int elementsPerPartition = 50000; // with a sleep time of 1 ms per element, test should run for 50 s + final int elementsPerPartition = 50000; Properties tprops = new Properties(); tprops.setProperty("retention.ms", "250"); @@ -162,6 +166,7 @@ public class KafkaShortRetentionTestBase implements Serializable { kafkaServer.deleteTestTopic(topic); } + private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> { private int numJumps; long nextExpected = 0; @@ -205,12 +210,8 @@ public class KafkaShortRetentionTestBase implements Serializable { */ public void runFailOnAutoOffsetResetNone() throws Exception { final String topic = "auto-offset-reset-none-test"; - final int parallelism = 1; - final int elementsPerPartition = 50000; // with a sleep time of 1 ms per element, test should run for 50 s - final int totalElements = parallelism * elementsPerPartition; - - + kafkaServer.createTestTopic(topic, parallelism, 1); final StreamExecutionEnvironment env = @@ -218,8 +219,7 @@ public class KafkaShortRetentionTestBase implements Serializable { env.setParallelism(parallelism); env.setRestartStrategy(RestartStrategies.noRestart()); // fail immediately env.getConfig().disableSysoutLogging(); - - + // ----------- add consumer ---------- Properties customProps = new Properties(); @@ -245,4 +245,27 @@ public class KafkaShortRetentionTestBase implements Serializable { kafkaServer.deleteTestTopic(topic); } + public void runFailOnAutoOffsetResetNoneEager() throws Exception { + final String topic = "auto-offset-reset-none-test"; + final int parallelism = 1; + + kafkaServer.createTestTopic(topic, parallelism, 1); + + // ----------- add consumer ---------- + + Properties customProps = new Properties(); + customProps.putAll(standardProps); + customProps.setProperty("auto.offset.reset", "none"); // test that "none" leads to an exception + + try { + kafkaServer.getConsumer(topic, new SimpleStringSchema(), customProps); + fail("should fail with an exception"); + } + catch (IllegalArgumentException e) { + // expected + assertTrue(e.getMessage().contains("none")); + } + + kafkaServer.deleteTestTopic(topic); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java new file mode 100644 index 0000000..0e16263 --- /dev/null +++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import static org.junit.Assert.*; + +public class KafkaTopicPartitionTest { + + @Test + public void validateUid() { + Field uidField; + try { + uidField = KafkaTopicPartition.class.getDeclaredField("serialVersionUID"); + uidField.setAccessible(true); + } + catch (NoSuchFieldException e) { + fail("serialVersionUID is not defined"); + return; + } + + assertTrue(Modifier.isStatic(uidField.getModifiers())); + assertTrue(Modifier.isFinal(uidField.getModifiers())); + assertTrue(Modifier.isPrivate(uidField.getModifiers())); + + assertEquals(long.class, uidField.getType()); + + // the UID has to be constant to make sure old checkpoints/savepoints can be read + try { + assertEquals(722083576322742325L, uidField.getLong(null)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java index e94adb5..24822ed 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java @@ -36,30 +36,39 @@ public class JobManagerCommunicationUtils { public static void cancelCurrentJob(ActorGateway jobManager) throws Exception { + JobStatusMessage status = null; - // find the jobID - Future<Object> listResponse = jobManager.ask( - JobManagerMessages.getRequestRunningJobsStatus(), - askTimeout); - - List<JobStatusMessage> jobs; - try { - Object result = Await.result(listResponse, askTimeout); - jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages(); - } - catch (Exception e) { - throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", 
e);
-        }
+        for (int i = 0; i < 200; i++) {
+            // find the jobID
+            Future<Object> listResponse = jobManager.ask(
+                    JobManagerMessages.getRequestRunningJobsStatus(),
+                    askTimeout);
+
+            List<JobStatusMessage> jobs;
+            try {
+                Object result = Await.result(listResponse, askTimeout);
+                jobs = ((JobManagerMessages.RunningJobsStatus) result).getStatusMessages();
+            }
+            catch (Exception e) {
+                throw new Exception("Could not cancel job - failed to retrieve running jobs from the JobManager.", e);
+            }
-        if (jobs.isEmpty()) {
-            throw new Exception("Could not cancel job - no running jobs");
-        }
-        if (jobs.size() != 1) {
-            throw new Exception("Could not cancel job - more than one running job.");
+            if (jobs.isEmpty()) {
+                // try again, fall through the loop
+                Thread.sleep(50);
+            }
+            else if (jobs.size() == 1) {
+                status = jobs.get(0);
+                break;
+            }
+            else {
+                throw new Exception("Could not cancel job - more than one running job.");
+            }
         }
-        JobStatusMessage status = jobs.get(0);
-        if (status.getJobState().isTerminalState()) {
+        if (status == null) {
+            throw new Exception("Could not cancel job - no running jobs");
+        }
+        else if (status.getJobState().isTerminalState()) {
             throw new Exception("Could not cancel job - job is not running any more");
         }
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 17e2e6f..e74eee4 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -28,7 +28,6 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -44,6 +43,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+@SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
 
     private final int numberOfParallelSubtasks;
@@ -57,15 +57,6 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
         this.indexOfThisSubtask = indexOfThisSubtask;
     }
 
-    private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
-        private static final long serialVersionUID = -1153976702711944427L;
-
-        @Override
-        public ExecutionConfig getExecutionConfig() {
-            return new ExecutionConfig();
-        }
-    }
-
     @Override
     public boolean isCheckpointingEnabled() {
         return true;
@@ -152,12 +143,12 @@
     }
 
     @Override
-    public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
+    public <S>
org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) { throw new UnsupportedOperationException(); } @Override - public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { + public <S> org.apache.flink.api.common.state.OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) { throw new UnsupportedOperationException(); } @@ -175,4 +166,15 @@ public class MockRuntimeContext extends StreamingRuntimeContext { public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) { throw new UnsupportedOperationException(); } + + // ------------------------------------------------------------------------ + + private static class MockStreamOperator extends AbstractStreamOperator<Integer> { + private static final long serialVersionUID = -1153976702711944427L; + + @Override + public ExecutionConfig getExecutionConfig() { + return new ExecutionConfig(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java index 4b17300..4388c9d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java @@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.streaming.api.watermark.Watermark; +import javax.annotation.Nullable; + /** * The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to elements, * and generates low watermarks that signal event time progress within the stream. @@ -71,5 +73,6 @@ public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> * * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit. */ + @Nullable Watermark getCurrentWatermark(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java index 48f29b2..5b5694c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java @@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.functions; import org.apache.flink.streaming.api.watermark.Watermark; +import javax.annotation.Nullable; + /** * The {@code AssignerWithPunctuatedWatermarks} assigns event time timestamps to elements, * and generates low watermarks that signal event time progress within the stream. 
@@ -79,5 +81,6 @@ public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T * * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit. */ + @Nullable Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp); }
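To round off the punctuated side: a minimal sketch of an assigner satisfying this interface, assuming an illustrative MarkedEvent type whose getEventTime() and isEndOfBatch() accessors are not part of this commit:

    import javax.annotation.Nullable;

    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;

    /** Emits a watermark only when a record is flagged as an end-of-batch marker. */
    public class EndOfBatchWatermarkAssigner implements AssignerWithPunctuatedWatermarks<MarkedEvent> {

        @Override
        public long extractTimestamp(MarkedEvent element, long previousElementTimestamp) {
            return element.getEventTime(); // assumed accessor
        }

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(MarkedEvent lastElement, long extractedTimestamp) {
            // returning null suppresses watermark emission for ordinary records
            return lastElement.isEndOfBatch() ? new Watermark(extractedTimestamp) : null;
        }
    }

KafkaTopicPartitionStateWithPunctuatedWatermarks.checkAndGetNewWatermark() filters these results once more, forwarding a watermark only if it actually advances the partition's current watermark.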

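Finally, the property that KafkaConsumerPartitionAssignmentTest verifies (deterministic, stable assignment that survives a growing partition list) follows from a simple index-modulo scheme. A sketch of what an assignPartitions with that behavior plausibly looks like, not a verbatim copy of the consumer's code:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    class PartitionAssignmentSketch {

        /** The partition at list index i goes to the consumer with index (i % numConsumers). */
        static List<KafkaTopicPartition> assignPartitions(
                List<KafkaTopicPartition> allPartitions, int numConsumers, int consumerIndex) {

            List<KafkaTopicPartition> result = new ArrayList<>(allPartitions.size() / numConsumers + 1);
            for (int i = 0; i < allPartitions.size(); i++) {
                if (i % numConsumers == consumerIndex) {
                    result.add(allPartitions.get(i));
                }
            }
            return result;
        }
    }

Because the decision depends only on a partition's position in the deterministically ordered input list, appending new partitions never reshuffles the existing ones, which is exactly the invariant testGrowingPartitionsRemainsStable checks.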