kezhuw commented on a change in pull request #15557: URL: https://github.com/apache/flink/pull/15557#discussion_r612937053
########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java ########## @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator; +import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.minicluster.MiniCluster; +import org.apache.flink.runtime.minicluster.MiniClusterConfiguration; 
+import org.apache.flink.runtime.minicluster.RpcServiceSharing; +import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; +import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; +import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.TestStreamEnvironment; +import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.function.TriFunction; + +import akka.actor.ActorSystem; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.Assert.assertEquals; + +/** + * A test suite for source enumerator (operator coordinator) for situations where RPC calls for + * split assignments (operator events) fails from time to time. 
+ */ +@SuppressWarnings("serial") +public class OperatorEventSendingCheckpointITCase { + + private static final int PARALLELISM = 1; + private static MiniCluster flinkCluster; + + @BeforeClass + public static void setupMiniClusterAndEnv() throws Exception { + flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM); + flinkCluster.start(); + TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM); + } + + @AfterClass + public static void clearEnvAndStopMiniCluster() throws Exception { + TestStreamEnvironment.unsetAsContext(); + if (flinkCluster != null) { + flinkCluster.close(); + flinkCluster = null; + } + } + + // ------------------------------------------------------------------------ + // tests + // ------------------------------------------------------------------------ + + /** + * Every second assign split event is lost. Eventually, the enumerator must recognize that an + * event was lost and trigger recovery to prevent data loss. Data loss would manifest in a + * stalled test, because we could wait forever to collect the required number of events back. + */ + @Ignore // ignore for now, because this test fails due to FLINK-21996 + @Test + public void testOperatorEventLostNoReaderFailure() throws Exception { + final int[] eventsToLose = new int[] {2, 4, 6}; + + OpEventRpcInterceptor.currentHandler = + new OperatorEventRpcHandler( + (task, operator, event, originalRpcHandler) -> askTimeoutFuture(), + eventsToLose); + + runTest(false); + } + + /** + * First and third assign split events are lost. In the middle pf all events being processed + * (which is after the second successful event delivery, the fourth event), there is + * additionally a failure on the reader that triggers recovery. 
+ */ + @Ignore // ignore for now, because this test fails due to FLINK-21996 + @Test + public void testOperatorEventLostWithReaderFailure() throws Exception { + final int[] eventsToLose = new int[] {1, 3}; + + OpEventRpcInterceptor.currentHandler = + new OperatorEventRpcHandler( + (task, operator, event, originalRpcHandler) -> askTimeoutFuture(), + eventsToLose); + + runTest(true); + } + + /** + * This test the case that the enumerator must handle the case of presumably lost splits that + * were actually delivered. + * + * <p>Some split assignment events happen normally, but for some their acknowledgement never + * comes back. The enumerator must assume the assignments were unsuccessful, even though the + * split assignment was received by the reader. + */ + @Test + public void testOperatorEventAckLost() throws Exception { + final int[] eventsWithLostAck = new int[] {2, 4}; + + OpEventRpcInterceptor.currentHandler = + new OperatorEventRpcHandler( + (task, operator, event, originalRpcHandler) -> { + // forward call + originalRpcHandler.apply(task, operator, event); + // but return an ack future that times out to simulate lost response + return askTimeoutFuture(); + }, + eventsWithLostAck); + + runTest(false); + } + + /** + * This tests the case where the status of an assignment remains unknown across checkpoints. + * + * <p>Some split assignment events happen normally, but for some their acknowledgement comes + * very late, so that we expect multiple checkpoints would have normally happened in the + * meantime. 
We trigger a failure (which happens after the second split) + */ + @Test + public void testOperatorEventAckDelay() throws Exception { + final int[] eventsWithLateAck = new int[] {2, 4}; + + OpEventRpcInterceptor.currentHandler = + new OperatorEventRpcHandler( + (task, operator, event, originalRpcHandler) -> { + // forward call + final CompletableFuture<Acknowledge> result = + originalRpcHandler.apply(task, operator, event); + // but return an ack future that completes late, after + // multiple checkpoints should have happened + final CompletableFuture<Acknowledge> late = lateFuture(); + return result.thenCompose((v) -> late); + }, + eventsWithLateAck); + + runTest(false); + } + + /** + * Runs the test program, which uses a single reader (parallelism = 1) and has three splits of + * data, to be assigned to the same reader. + * + * <p>If an intermittent failure should happen, it will happen after the second split was + * assigned. + */ + private void runTest(boolean intermittentFailure) throws Exception { + final int numElements = 100; + final int failAt = intermittentFailure ? 
numElements / 2 : numElements * 2; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + env.enableCheckpointing(50); + + final DataStream<Long> numbers = + env.fromSource( + new TestingNumberSequenceSource(1L, numElements, 3), + WatermarkStrategy.noWatermarks(), + "numbers") + .map( + new MapFunction<Long, Long>() { + private int num; + + @Override + public Long map(Long value) throws Exception { + if (++num > failAt) { + throw new Exception("Artificial intermittent failure."); + } + return value; + } + }); + + final List<Long> sequence = numbers.executeAndCollect(numElements); + + final List<Long> expectedSequence = + LongStream.rangeClosed(1L, numElements).boxed().collect(Collectors.toList()); + + assertEquals(expectedSequence, sequence); + } + + private static CompletableFuture<Acknowledge> askTimeoutFuture() { + final CompletableFuture<Acknowledge> future = new CompletableFuture<>(); + FutureUtils.orTimeout(future, 500, TimeUnit.MILLISECONDS); + return future; + } + + private static CompletableFuture<Acknowledge> lateFuture() { + final CompletableFuture<Acknowledge> future = new CompletableFuture<>(); + FutureUtils.completeDelayed(future, Acknowledge.get(), Duration.ofMillis(500)); + return future; + } + + // ------------------------------------------------------------------------ + // Specialized Source + // ------------------------------------------------------------------------ + + /** + * This is an enumerator for the {@link NumberSequenceSource}, which only responds to the split + * requests after the next checkpoint is complete. That way, we naturally draw the split + * processing across checkpoints without artificial sleep statements. 
+ */ + private static final class AssignAfterCheckpointEnumerator< + SplitT extends IteratorSourceSplit<?, ?>> + extends IteratorSourceEnumerator<SplitT> { + + private final Queue<Integer> pendingRequests = new ArrayDeque<>(); + private final SplitEnumeratorContext<?> context; + + public AssignAfterCheckpointEnumerator( + SplitEnumeratorContext<SplitT> context, Collection<SplitT> splits) { + super(context, splits); + this.context = context; + } + + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { + pendingRequests.add(subtaskId); + } + + @Override + public Collection<SplitT> snapshotState() throws Exception { + // this will be enqueued in the enumerator thread, so it will actually run after this + // method (the snapshot operation) is complete! + context.runInCoordinatorThread(this::fullFillPendingRequests); + + return super.snapshotState(); + } + + private void fullFillPendingRequests() { + for (int subtask : pendingRequests) { + super.handleSplitRequest(subtask, null); + } + pendingRequests.clear(); + } + } + + private static class TestingNumberSequenceSource extends NumberSequenceSource { + + private final int numSplits; + + public TestingNumberSequenceSource(long from, long to, int numSplits) { + super(from, to); + this.numSplits = numSplits; + } + + @Override + public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> + createEnumerator(final SplitEnumeratorContext<NumberSequenceSplit> enumContext) { + final List<NumberSequenceSplit> splits = + splitNumberRange(getFrom(), getTo(), numSplits); + return new AssignAfterCheckpointEnumerator<>(enumContext, splits); + } + } + + // ------------------------------------------------------------------------ + // Utils for RPC intercepting cluster + // ------------------------------------------------------------------------ + + private static class OperatorEventRpcHandler { + + private final FilteredRpcAction actionForFilteredEvent; + private final 
Set<Integer> eventsToFilter; + private int eventNum; + + OperatorEventRpcHandler(FilteredRpcAction actionForFilteredEvent, int... eventsToFilter) { + this( + actionForFilteredEvent, + IntStream.of(eventsToFilter).boxed().collect(Collectors.toSet())); + } + + OperatorEventRpcHandler( + FilteredRpcAction actionForFilteredEvent, Set<Integer> eventsToFilter) { + this.actionForFilteredEvent = actionForFilteredEvent; + this.eventsToFilter = eventsToFilter; + } + + CompletableFuture<Acknowledge> filterCall( + ExecutionAttemptID task, + OperatorID operator, + SerializedValue<OperatorEvent> evt, + TriFunction< + ExecutionAttemptID, + OperatorID, + SerializedValue<OperatorEvent>, + CompletableFuture<Acknowledge>> + rpcHandler) { + + final Object o; + try { + o = evt.deserializeValue(getClass().getClassLoader()); + } catch (Exception e) { + throw new Error(e); // should never happen + } + + if (o instanceof AddSplitEvent || o instanceof NoMoreSplitsEvent) { Review comment: I see the `Source Operator Event specific intercepting` note. Initially, I thought it might be good to make `AddSplitEvent` and `NoMoreSplitsEvent` explicit at the construction site of `OperatorEventRpcHandler`, so that readers get more insight up front. I am OK with either approach, given that we will only intercept these two events for a long time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org