kezhuw commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r611302632
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -71,40 +70,67 @@
*/
@Nullable private SplitT currentSplit;
- /** The remaining splits. Null means no splits have yet been assigned. */
- @Nullable private Queue<SplitT> remainingSplits;
+ /** The remaining splits that were assigned but not yet processed. */
+ private Queue<SplitT> remainingSplits;
Review comment:
`final`?
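Judging from the constructor, the queue is assigned exactly once, so a minimal sketch of the suggestion would be:

```java
/** The remaining splits that were assigned but not yet processed. */
private final Queue<SplitT> remainingSplits;

public IteratorSourceReader(SourceReaderContext context) {
    this.context = checkNotNull(context);
    this.availability = new CompletableFuture<>();
    this.remainingSplits = new ArrayDeque<>();
}
```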
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -71,40 +70,67 @@
*/
@Nullable private SplitT currentSplit;
- /** The remaining splits. Null means no splits have yet been assigned. */
- @Nullable private Queue<SplitT> remainingSplits;
+ /** The remaining splits that were assigned but not yet processed. */
+ private Queue<SplitT> remainingSplits;
+
+ private boolean noMoreSplits;
public IteratorSourceReader(SourceReaderContext context) {
this.context = checkNotNull(context);
this.availability = new CompletableFuture<>();
+ this.remainingSplits = new ArrayDeque<>();
}
// ------------------------------------------------------------------------
@Override
public void start() {
- // request a split only if we did not get one during restore
- if (remainingSplits == null) {
+ // request a split if we don't have one
+ if (remainingSplits.isEmpty()) {
context.sendSplitRequest();
}
}
@Override
public InputStatus pollNext(ReaderOutput<E> output) {
- if (iterator != null && iterator.hasNext()) {
- output.collect(iterator.next());
+ if (iterator != null) {
+ if (iterator.hasNext()) {
+ output.collect(iterator.next());
+ return InputStatus.MORE_AVAILABLE;
+ } else {
+ finishSplit();
+ }
+ }
+
+ return tryMoveToNextSplit();
+ }
+
+ private void finishSplit() {
+ iterator = null;
+ currentSplit = null;
+
+ // request another split if no other is left
+ // we do this only here in the finishSplit part to avoid requesting a split
+ // whenever the reader is polled and doesn't currently have a split
+ if (remainingSplits.isEmpty() && !noMoreSplits) {
Review comment:
How about moving this to `tryMoveToNextSplit`?
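A rough sketch of that shape (the `splitRequested` flag is hypothetical, added only to preserve the "request a split at most once" behavior from the comment above):

```java
private void finishSplit() {
    iterator = null;
    currentSplit = null;
}

private InputStatus tryMoveToNextSplit() {
    final SplitT nextSplit = remainingSplits.poll();
    if (nextSplit != null) {
        currentSplit = nextSplit;
        iterator = nextSplit.getIterator();
        return InputStatus.MORE_AVAILABLE;
    }
    if (noMoreSplits) {
        return InputStatus.END_OF_INPUT;
    }
    // request another split if none is left, but guard against
    // re-requesting on every poll while the assignment is in flight
    if (!splitRequested) {
        context.sendSplitRequest();
        splitRequested = true; // hypothetical flag, cleared again in addSplits()
    }
    return InputStatus.NOTHING_AVAILABLE;
}
```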
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
##########
@@ -71,40 +70,67 @@
*/
@Nullable private SplitT currentSplit;
- /** The remaining splits. Null means no splits have yet been assigned. */
- @Nullable private Queue<SplitT> remainingSplits;
+ /** The remaining splits that were assigned but not yet processed. */
+ private Queue<SplitT> remainingSplits;
+
+ private boolean noMoreSplits;
public IteratorSourceReader(SourceReaderContext context) {
this.context = checkNotNull(context);
this.availability = new CompletableFuture<>();
+ this.remainingSplits = new ArrayDeque<>();
}
// ------------------------------------------------------------------------
@Override
public void start() {
- // request a split only if we did not get one during restore
- if (remainingSplits == null) {
+ // request a split if we don't have one
+ if (remainingSplits.isEmpty()) {
context.sendSplitRequest();
}
}
@Override
public InputStatus pollNext(ReaderOutput<E> output) {
- if (iterator != null && iterator.hasNext()) {
- output.collect(iterator.next());
+ if (iterator != null) {
+ if (iterator.hasNext()) {
+ output.collect(iterator.next());
+ return InputStatus.MORE_AVAILABLE;
+ } else {
+ finishSplit();
+ }
+ }
+
+ return tryMoveToNextSplit();
+ }
+
+ private void finishSplit() {
+ iterator = null;
+ currentSplit = null;
+
+ // request another split if no other is left
+ // we do this only here in the finishSplit part to avoid requesting a split
+ // whenever the reader is polled and doesn't currently have a split
+ if (remainingSplits.isEmpty() && !noMoreSplits) {
Review comment:
That should also make the `InputStatus.NOTHING_AVAILABLE` branch easier to evaluate.
##########
File path:
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for the source enumerator (operator coordinator) for situations where RPC
+ * calls for split assignments (operator events) fail from time to time.
+ */
+@SuppressWarnings("serial")
+public class OperatorEventSendingCheckpointITCase {
+
+ private static final int PARALLELISM = 1;
+ private static MiniCluster flinkCluster;
+
+ @BeforeClass
+ public static void setupMiniClusterAndEnv() throws Exception {
+ flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM);
+ flinkCluster.start();
+ TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
+ }
+
+ @AfterClass
+ public static void clearEnvAndStopMiniCluster() throws Exception {
+ TestStreamEnvironment.unsetAsContext();
+ if (flinkCluster != null) {
+ flinkCluster.close();
+ flinkCluster = null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ /**
+ * Every second assign split event is lost. Eventually, the enumerator must recognize that an
+ * event was lost and trigger recovery to prevent data loss. Data loss would manifest in a
+ * stalled test, because we could wait forever to collect the required number of events back.
+ */
+ @Ignore // ignore for now, because this test fails due to FLINK-21996
+ @Test
+ public void testOperatorEventLostNoReaderFailure() throws Exception {
+ final int[] eventsToLose = new int[] {2, 4, 6};
+
+ OpEventRpcInterceptor.currentHandler =
+ new OperatorEventRpcHandler(
+ (task, operator, event, originalRpcHandler) -> askTimeoutFuture(),
+ eventsToLose);
+
+ runTest(false);
+ }
+
+ /**
+ * First and third assign split events are lost. In the middle of all events being processed
+ * (which is after the second successful event delivery, the fourth event), there is
+ * additionally a failure on the reader that triggers recovery.
+ */
+ @Ignore // ignore for now, because this test fails due to FLINK-21996
+ @Test
+ public void testOperatorEventLostWithReaderFailure() throws Exception {
+ final int[] eventsToLose = new int[] {1, 3};
+
+ OpEventRpcInterceptor.currentHandler =
+ new OperatorEventRpcHandler(
+ (task, operator, event, originalRpcHandler) -> askTimeoutFuture(),
+ eventsToLose);
+
+ runTest(true);
+ }
+
+ /**
+ * This tests the case where the enumerator must handle presumably lost splits that were
+ * actually delivered.
+ *
+ * <p>Some split assignment events happen normally, but for some their acknowledgement never
+ * comes back. The enumerator must assume those assignments were unsuccessful, even though the
+ * split assignment was received by the reader.
+ */
+ @Test
+ public void testOperatorEventAckLost() throws Exception {
+ final int[] eventsWithLostAck = new int[] {2, 4};
+
+ OpEventRpcInterceptor.currentHandler =
+ new OperatorEventRpcHandler(
+ (task, operator, event, originalRpcHandler) -> {
+ // forward call
+ originalRpcHandler.apply(task, operator, event);
+ // but return an ack future that times out to simulate lost response
+ return askTimeoutFuture();
+ },
+ eventsWithLostAck);
+
+ runTest(false);
+ }
+
+ /**
+ * This tests the case where the status of an assignment remains unknown across checkpoints.
+ *
+ * <p>Some split assignment events happen normally, but for some their acknowledgement comes
+ * very late, so that multiple checkpoints would normally have happened in the meantime. We
+ * trigger a failure (which happens after the second split).
+ */
+ @Test
+ public void testOperatorEventAckDelay() throws Exception {
+ final int[] eventsWithLateAck = new int[] {2, 4};
+
+ OpEventRpcInterceptor.currentHandler =
+ new OperatorEventRpcHandler(
+ (task, operator, event, originalRpcHandler) -> {
+ // forward call
+ final CompletableFuture<Acknowledge> result =
+ originalRpcHandler.apply(task, operator, event);
+ // but return an ack future that completes late, after
+ // multiple checkpoints should have happened
+ final CompletableFuture<Acknowledge> late = lateFuture();
+ return result.thenCompose((v) -> late);
+ },
+ eventsWithLateAck);
+
+ runTest(false);
+ }
+
+ /**
+ * Runs the test program, which uses a single reader (parallelism = 1) and has three splits of
+ * data, to be assigned to the same reader.
+ *
+ * <p>If an intermittent failure should happen, it will happen after the second split was
+ * assigned.
+ */
+ private void runTest(boolean intermittentFailure) throws Exception {
+ final int numElements = 100;
+ final int failAt = intermittentFailure ? numElements / 2 : numElements * 2;
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(50);
+
+ final DataStream<Long> numbers =
+ env.fromSource(
+ new TestingNumberSequenceSource(1L, numElements, 3),
+ WatermarkStrategy.noWatermarks(),
+ "numbers")
+ .map(
+ new MapFunction<Long, Long>() {
+ private int num;
+
+ @Override
+ public Long map(Long value) throws Exception {
+ if (++num > failAt) {
+ throw new Exception("Artificial intermittent failure.");
+ }
+ return value;
+ }
+ });
+
+ final List<Long> sequence = numbers.executeAndCollect(numElements);
+
+ final List<Long> expectedSequence =
+ LongStream.rangeClosed(1L, numElements).boxed().collect(Collectors.toList());
+
+ assertEquals(expectedSequence, sequence);
+ }
+
+ private static CompletableFuture<Acknowledge> askTimeoutFuture() {
+ final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
+ FutureUtils.orTimeout(future, 500, TimeUnit.MILLISECONDS);
+ return future;
+ }
+
+ private static CompletableFuture<Acknowledge> lateFuture() {
+ final CompletableFuture<Acknowledge> future = new CompletableFuture<>();
+ FutureUtils.completeDelayed(future, Acknowledge.get(), Duration.ofMillis(500));
+ return future;
+ }
+
+ // ------------------------------------------------------------------------
+ // Specialized Source
+ // ------------------------------------------------------------------------
+
+ /**
+ * This is an enumerator for the {@link NumberSequenceSource}, which only responds to the split
+ * requests after the next checkpoint is complete. That way, we naturally draw the split
+ * processing across checkpoints without artificial sleep statements.
+ */
+ private static final class AssignAfterCheckpointEnumerator<
+ SplitT extends IteratorSourceSplit<?, ?>>
+ extends IteratorSourceEnumerator<SplitT> {
+
+ private final Queue<Integer> pendingRequests = new ArrayDeque<>();
+ private final SplitEnumeratorContext<?> context;
+
+ public AssignAfterCheckpointEnumerator(
+ SplitEnumeratorContext<SplitT> context, Collection<SplitT> splits) {
+ super(context, splits);
+ this.context = context;
+ }
+
+ @Override
+ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+ pendingRequests.add(subtaskId);
+ }
+
+ @Override
+ public Collection<SplitT> snapshotState() throws Exception {
+ // this will be enqueued in the enumerator thread, so it will actually run after this
+ // method (the snapshot operation) is complete!
+ context.runInCoordinatorThread(this::fullFillPendingRequests);
+
+ return super.snapshotState();
+ }
+
+ private void fullFillPendingRequests() {
+ for (int subtask : pendingRequests) {
+ super.handleSplitRequest(subtask, null);
+ }
+ pendingRequests.clear();
+ }
+ }
+
+ private static class TestingNumberSequenceSource extends NumberSequenceSource {
+
+ private final int numSplits;
+
+ public TestingNumberSequenceSource(long from, long to, int numSplits) {
+ super(from, to);
+ this.numSplits = numSplits;
+ }
+
+ @Override
+ public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>
+ createEnumerator(final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
+ final List<NumberSequenceSplit> splits =
+ splitNumberRange(getFrom(), getTo(), numSplits);
+ return new AssignAfterCheckpointEnumerator<>(enumContext, splits);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utils for RPC intercepting cluster
+ // ------------------------------------------------------------------------
+
+ private static class OperatorEventRpcHandler {
+
+ private final FilteredRpcAction actionForFilteredEvent;
+ private final Set<Integer> eventsToFilter;
+ private int eventNum;
+
+ OperatorEventRpcHandler(FilteredRpcAction actionForFilteredEvent, int... eventsToFilter) {
+ this(
+ actionForFilteredEvent,
+ IntStream.of(eventsToFilter).boxed().collect(Collectors.toSet()));
+ }
+
+ OperatorEventRpcHandler(
+ FilteredRpcAction actionForFilteredEvent, Set<Integer> eventsToFilter) {
+ this.actionForFilteredEvent = actionForFilteredEvent;
+ this.eventsToFilter = eventsToFilter;
+ }
+
+ CompletableFuture<Acknowledge> filterCall(
+ ExecutionAttemptID task,
+ OperatorID operator,
+ SerializedValue<OperatorEvent> evt,
+ TriFunction<
+ ExecutionAttemptID,
+ OperatorID,
+ SerializedValue<OperatorEvent>,
+ CompletableFuture<Acknowledge>>
+ rpcHandler) {
+
+ final Object o;
+ try {
+ o = evt.deserializeValue(getClass().getClassLoader());
+ } catch (Exception e) {
+ throw new Error(e); // should never happen
+ }
+
+ if (o instanceof AddSplitEvent || o instanceof NoMoreSplitsEvent) {
Review comment:
I guess it might be better to shape all `OperatorEventRpcHandler` configuration through one construction, say a `builder`. It took me some time to understand what `eventsToLose` means. It would also be good to make `AddSplitEvent` and `NoMoreSplitsEvent` explicit at the call site, to give more context in the first place. That way, when we later want to extend exactly-once to non-source coordinators, this part could be reused.
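For illustration, a hypothetical builder along those lines (all names below are illustrative, not from the PR; it reuses the test's existing `FilteredRpcAction` and constructor):

```java
static final class OperatorEventRpcHandlerBuilder {
    private final Set<Class<?>> eventTypesToIntercept = new HashSet<>();
    private final Set<Integer> eventsToFilter = new HashSet<>();
    private FilteredRpcAction actionForFilteredEvent;

    // makes the intercepted event types explicit at the call site
    OperatorEventRpcHandlerBuilder interceptEventTypes(Class<?>... types) {
        eventTypesToIntercept.addAll(Arrays.asList(types));
        return this;
    }

    // 1-based indices of the matching events that get the filtered action
    OperatorEventRpcHandlerBuilder filterEvents(int... eventNumbers) {
        IntStream.of(eventNumbers).forEach(eventsToFilter::add);
        return this;
    }

    OperatorEventRpcHandlerBuilder onFilteredEvent(FilteredRpcAction action) {
        this.actionForFilteredEvent = action;
        return this;
    }

    OperatorEventRpcHandler build() {
        // the handler would additionally take eventTypesToIntercept instead
        // of hard-coding AddSplitEvent / NoMoreSplitsEvent in filterCall()
        return new OperatorEventRpcHandler(actionForFilteredEvent, eventsToFilter);
    }
}
```

A test would then read `new OperatorEventRpcHandlerBuilder().interceptEventTypes(AddSplitEvent.class, NoMoreSplitsEvent.class).filterEvents(2, 4, 6).onFilteredEvent(...).build()`, which spells out at the call site what `eventsToLose` means.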
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##########
@@ -202,8 +202,7 @@ default void notifyCheckpointAborted(long checkpointId) {}
* target TaskManager. The future is completed exceptionally if the event cannot be sent.
* That includes situations where the target task is not running.
*/
- CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask)
- throws TaskNotRunningException;
+ CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt, int targetSubtask);
Review comment:
It is good to not report an error both up the call stack and through an exceptional future. But I am afraid this might be walked back in `SubtaskGateway`: I guess at that stage, a dated `SubtaskGateway` could throw `TaskNotRunningException` after a subtask failed/reset, to fail possibly active activities early.
Anyway, I think it is at least safe to go this way for `Context.sendEvent`.
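To make the contract concrete, a minimal caller-side sketch (the helper is hypothetical; `FutureUtils.completedExceptionally` is the existing flink-runtime utility): even if a dated gateway were to throw `TaskNotRunningException` synchronously again, the caller can fold it into the exceptional future.

```java
private static CompletableFuture<Acknowledge> callAsFuture(
        Callable<CompletableFuture<Acknowledge>> sendAction) {
    try {
        return sendAction.call();
    } catch (Throwable t) {
        // e.g. a synchronously thrown TaskNotRunningException
        return FutureUtils.completedExceptionally(t);
    }
}
```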
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -264,11 +266,12 @@ private void checkpointCoordinatorInternal(
result.completeExceptionally(e);
}
}
- });
+ },
+ mainThreadExecutor);
Review comment:
I guess the javadoc section `Concurrency and Threading Model` should change a bit.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -226,6 +225,12 @@ private static void failAllFutures(@Nullable List<BlockedEvent> events) {
}
}
+ private void checkRunsInMainThread() {
Review comment:
I guess a similar assertion could be added to `resetToCheckpoint`. The gotcha to me is that `RpcEndpoint` is not constructed in its main thread, hence all these lazy initializations.
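A sketch of the suggested assertion (signature simplified; the null check accounts for the lazy initialization mentioned above):

```java
public void resetToCheckpoint(@Nullable byte[] checkpointData) throws Exception {
    // guard against the lazy setup: the holder may be reset before
    // lazyInitialize() has provided the main thread executor
    if (mainThreadExecutor != null) {
        mainThreadExecutor.assertRunningInMainThread();
    }
    // ... existing reset logic ...
}
```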
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -140,76 +140,75 @@ public void markForCheckpoint(long checkpointId) {
* If the valve is already shut, this does nothing.
*/
public void shutValve(long checkpointId) {
- synchronized (lock) {
- if (checkpointId == currentCheckpointId) {
- shut = true;
- } else {
- throw new IllegalStateException(
- String.format(
- "Cannot shut valve for non-prepared
checkpoint. "
- + "Prepared checkpoint = %s,
attempting-to-close checkpoint = %d",
- (currentCheckpointId == NO_CHECKPOINT
- ? "(none)"
- : String.valueOf(currentCheckpointId)),
- checkpointId));
- }
+ checkRunsInMainThread();
+
+ if (checkpointId == currentCheckpointId) {
+ shut = true;
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot shut valve for non-prepared checkpoint. "
+ + "Prepared checkpoint = %s,
attempting-to-close checkpoint = %d",
+ (currentCheckpointId == NO_CHECKPOINT
+ ? "(none)"
+ : String.valueOf(currentCheckpointId)),
+ checkpointId));
}
}
- /** Opens the valve, releasing all buffered events. */
- public void openValveAndUnmarkCheckpoint() {
- final ArrayList<FuturePair> futures;
-
- // send all events under lock, so that no new event can sneak between
- synchronized (lock) {
- currentCheckpointId = NO_CHECKPOINT;
+ public void openValveAndUnmarkCheckpoint(long expectedCheckpointId) {
+ checkRunsInMainThread();
- if (!shut) {
- return;
- }
+ if (expectedCheckpointId != currentCheckpointId) {
+ throw new IllegalStateException(
+ String.format(
+ "Valve closed for different checkpoint: closed for
= %d, expected = %d",
+ currentCheckpointId, expectedCheckpointId));
+ }
+ openValveAndUnmarkCheckpoint();
+ }
- futures = new ArrayList<>(blockedEvents.size());
+ /** Opens the valve, releasing all buffered events. */
+ public void openValveAndUnmarkCheckpoint() {
+ checkRunsInMainThread();
- for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
- for (BlockedEvent blockedEvent : eventsForTask) {
- final CompletableFuture<Acknowledge> ackFuture =
- eventSender.apply(blockedEvent.event, blockedEvent.subtask);
- futures.add(new FuturePair(blockedEvent.future, ackFuture));
- }
- }
- blockedEvents.clear();
- shut = false;
+ currentCheckpointId = NO_CHECKPOINT;
+ if (!shut) {
+ return;
}
- // apply the logic on the future outside the lock, to be safe
- for (FuturePair pair : futures) {
- FutureUtils.forward(pair.ackFuture, pair.originalFuture);
+ for (List<BlockedEvent> eventsForTask : blockedEvents.values()) {
+ for (BlockedEvent blockedEvent : eventsForTask) {
+ final CompletableFuture<Acknowledge> ackFuture =
+ eventSender.apply(blockedEvent.event, blockedEvent.subtask);
+ FutureUtils.forward(ackFuture, blockedEvent.future);
+ }
}
+ blockedEvents.clear();
+ shut = false;
}
/** Drops all blocked events for a specific subtask. */
public void resetForTask(int subtask) {
- final List<BlockedEvent> events;
- synchronized (lock) {
- events = blockedEvents.remove(subtask);
- }
+ checkRunsInMainThread();
+ final List<BlockedEvent> events = blockedEvents.remove(subtask);
failAllFutures(events);
}
/** Resets the valve, dropping all blocked events and opening the valve. */
public void reset() {
+ checkRunsInMainThread();
+
final List<BlockedEvent> events = new ArrayList<>();
- synchronized (lock) {
- for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
- if (taskEvents != null) {
- events.addAll(taskEvents);
- }
+ for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
+ if (taskEvents != null) {
Review comment:
Is there any possibility for this to be `null`?
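If it never is, the loop would collapse to:

```java
for (List<BlockedEvent> taskEvents : blockedEvents.values()) {
    events.addAll(taskEvents);
}
```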
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]