This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4052fa5d072ee270c22efe8b334a162e430520fb
Author: Arvid Heise <[email protected]>
AuthorDate: Tue Dec 1 14:37:10 2020 +0100

    [FLINK-20418][core] Fixing checkpointing of IteratorSourceReader.

    IteratorSourceReader fails checkpointing with an NPE when no split
    assignment has happened yet. Furthermore, if the last element of a
    split has been pulled just before checkpointing, a split would be
    created with (to + 1, to), leading to an IllegalArgumentException:
    'from' must be <= 'to'.
---
 .../source/lib/util/IteratorSourceReader.java      |   7 +-
 .../flink/streaming/util/StreamCollector.java      | 146 +++++++++++++++++++++
 .../source/lib/NumberSequenceSourceITCase.java     |  63 +++++++++
 3 files changed, 215 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index 92ab054..d16e597 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
@@ -129,8 +130,12 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I
 
     @Override
     public List<SplitT> snapshotState(long checkpointId) {
+        if (remainingSplits == null) {
+            // no assignment yet
+            return Collections.emptyList();
+        }
         final ArrayList<SplitT> allSplits = new ArrayList<>(1 + remainingSplits.size());
-        if (iterator != null) {
+        if (iterator != null && iterator.hasNext()) {
             @SuppressWarnings("unchecked")
             final SplitT inProgressSplit = (SplitT) currentSplit.getUpdatedSplitForIterator(iterator);
             allSplits.add(inProgressSplit);
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java
new file mode 100644
index 0000000..5adb1a0
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/StreamCollector.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
+import org.junit.rules.ExternalResource;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A simple utility for collecting all the elements in a {@link DataStream}.
+ * <pre>{@code
+ * public class DataStreamTest {
+ *
+ *     {@literal @}Rule
+ *     public StreamCollector collector = new StreamCollector();
+ *
+ *     public void test() throws Exception {
+ *         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ *         DataStream<Integer> stream = env.fromElements(1, 2, 3);
+ *
+ *         CompletableFuture<Collection<Integer>> results = collector.collect(stream);
+ *         Assert.assertThat(results.get(), hasItems(1, 2, 3));
+ *     }
+ * }
+ * }</pre>
+ *
+ * <p><b>Note:</b> The stream collector assumes:
+ * 1) The stream is bounded.
+ * 2) All elements will fit in memory.
+ * 3) All tasks run within the same JVM.
+ */
+@SuppressWarnings("rawtypes")
+public class StreamCollector extends ExternalResource {
+
+    private static final AtomicLong counter = new AtomicLong();
+
+    private static final Map<Long, CountDownLatch> latches = new ConcurrentHashMap<>();
+
+    private static final Map<Long, Queue> resultQueues = new ConcurrentHashMap<>();
+
+    private List<Long> ids;
+
+    @Override
+    protected void before() {
+        ids = new ArrayList<>();
+    }
+
+    /**
+     * @return A future that contains all the elements of the DataStream and
+     *     completes once all elements have been processed.
+     */
+    public <IN> CompletableFuture<Collection<IN>> collect(DataStream<IN> stream) {
+        final long id = counter.getAndIncrement();
+        ids.add(id);
+
+        int parallelism = stream.getParallelism();
+        if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+            parallelism = stream.getExecutionEnvironment().getParallelism();
+        }
+
+        CountDownLatch latch = new CountDownLatch(parallelism);
+        latches.put(id, latch);
+
+        Queue<IN> results = new ConcurrentLinkedDeque<>();
+        resultQueues.put(id, results);
+
+        stream.addSink(new CollectingSink<>(id));
+
+        return CompletableFuture.runAsync(() -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException("Failed to collect results");
+            }
+        }).thenApply(ignore -> results);
+    }
+
+    @Override
+    protected void after() {
+        for (Long id : ids) {
+            latches.remove(id);
+            resultQueues.remove(id);
+        }
+    }
+
+    private static class CollectingSink<IN> extends RichSinkFunction<IN> {
+
+        private final long id;
+
+        private transient CountDownLatch latch;
+
+        private transient Queue<IN> results;
+
+        private CollectingSink(long id) {
+            this.id = id;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public void open(Configuration parameters) throws Exception {
+            latch = StreamCollector.latches.get(id);
+            results = (Queue<IN>) StreamCollector.resultQueues.get(id);
+        }
+
+        @Override
+        public void invoke(IN value, Context context) throws Exception {
+            results.add(value);
+        }
+
+        @Override
+        public void close() throws Exception {
+            latch.countDown();
+        }
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java
new file mode 100644
index 0000000..20f996a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamCollector;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests {@link NumberSequenceSource}.
+ */
+public class NumberSequenceSourceITCase {
+
+    @Rule
+    public StreamCollector collector = new StreamCollector();
+
+    @Test
+    public void testCheckpointingWithDelayedAssignment() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
+        env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE);
+        final SingleOutputStreamOperator<Long> stream = env
+            .fromSource(
+                new NumberSequenceSource(0, 100),
+                WatermarkStrategy.noWatermarks(),
+                "Sequence Source")
+            .map(x -> {
+                Thread.sleep(10);
+                return x;
+            });
+        final CompletableFuture<Collection<Long>> result = collector.collect(stream);
+        env.execute();
+
+        assertArrayEquals(LongStream.rangeClosed(0, 100).boxed().toArray(), result.get().toArray());
+    }
+}
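
For context, the exhausted-split case fixed above can be reproduced directly
against the split API, without running a job. The sketch below is illustrative
rather than part of the commit; it assumes the NumberSequenceSplit constructor
and the getIterator()/getUpdatedSplitForIterator() signatures of this
flink-core version:

// Illustrative sketch only -- not part of the commit.
import org.apache.flink.api.connector.source.lib.NumberSequenceSource.NumberSequenceSplit;
import org.apache.flink.util.NumberSequenceIterator;

public class ExhaustedSplitSketch {
    public static void main(String[] args) {
        // A split covering the numbers 0..2.
        NumberSequenceSplit split = new NumberSequenceSplit("split-0", 0, 2);
        NumberSequenceIterator iterator = split.getIterator();

        // Drain the split completely, as a reader may have done just before a checkpoint.
        while (iterator.hasNext()) {
            iterator.next();
        }

        // Without the new 'iterator.hasNext()' guard in snapshotState(), the reader
        // would still snapshot the in-progress split. The updated split would cover
        // (to + 1, to), failing with: IllegalArgumentException: 'from' must be <= 'to'.
        split.getUpdatedSplitForIterator(iterator);
    }
}

With the hasNext() guard in place, snapshotState() simply omits the finished
split, so only the remaining splits (if any) end up in the checkpoint.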
