tillrohrmann commented on a change in pull request #15557:
URL: https://github.com/apache/flink/pull/15557#discussion_r612582795
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
##########
@@ -340,6 +341,12 @@ public AkkaRpcServiceBuilder withBindPort(int bindPort) {
}
public AkkaRpcService createAndStart() throws Exception {
+ return createAndStart(AkkaRpcService::new);
+ }
+
+ public AkkaRpcService createAndStart(
Review comment:
I would suggest updating the commit message to say "... to support
instantiating custom AkkaRpcServices."
##########
File path:
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for source enumerator (operator coordinator) for situations
where RPC calls for
+ * split assignments (operator events) fails from time to time.
+ */
+@SuppressWarnings("serial")
+public class OperatorEventSendingCheckpointITCase {
Review comment:
`extends TestLogger` is missing.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -216,13 +201,8 @@ public void signalNoMoreSplits(int subtask) {
// Ensure the split assignment is done by the the coordinator executor.
callInCoordinatorThread(
() -> {
- try {
- operatorCoordinatorContext.sendEvent(new
NoMoreSplitsEvent(), subtask);
- return null; // void return value
- } catch (TaskNotRunningException e) {
- throw new FlinkRuntimeException(
- "Failed to send 'NoMoreSplits' to reader " +
subtask, e);
- }
+ operatorCoordinatorContext.sendEvent(new
NoMoreSplitsEvent(), subtask);
Review comment:
Same here. Please ignore this comment if the problem will be addressed in a
future PR.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -96,42 +94,44 @@ public boolean isShut() {
* (because it gets dropped through a call to {@link #reset()} or {@link
#resetForTask(int)},
* then the returned future till be completed exceptionally.
*/
- public CompletableFuture<Acknowledge> sendEvent(
- SerializedValue<OperatorEvent> event, int subtask) {
- synchronized (lock) {
- if (!shut) {
- return eventSender.apply(event, subtask);
- }
-
- final List<BlockedEvent> eventsForTask =
- blockedEvents.computeIfAbsent(subtask, (key) -> new
ArrayList<>());
- final CompletableFuture<Acknowledge> future = new
CompletableFuture<>();
- eventsForTask.add(new BlockedEvent(event, subtask, future));
- return future;
+ public void sendEvent(
+ SerializedValue<OperatorEvent> event,
+ int subtask,
+ CompletableFuture<Acknowledge> result) {
+ checkRunsInMainThread();
+
+ if (!shut) {
+ final CompletableFuture<Acknowledge> ack =
eventSender.apply(event, subtask);
+ FutureUtils.forward(ack, result);
+ return;
}
+
+ final List<BlockedEvent> eventsForTask =
+ blockedEvents.computeIfAbsent(subtask, (key) -> new
ArrayList<>());
+ eventsForTask.add(new BlockedEvent(event, subtask, result));
}
/**
* Shuts the value. All events sent through this valve are blocked until
the valve is re-opened.
* If the valve is already shut, this does nothing.
*/
Review comment:
The JavaDocs seem to be wrong.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayAdapter.java
##########
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
+import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
+import org.apache.flink.types.SerializableOptional;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A class that decorates/forwards calls to a {@link TaskExecutorGateway}.
+ *
+ * <p>This class is meant as a base for custom decorators, to avoid having to
maintain all the
+ * method overrides in each decorator.
+ */
+public class TaskExecutorGatewayAdapter implements TaskExecutorGateway {
Review comment:
nit: Maybe call it `TaskExecutorGatewayDecorator`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -69,9 +68,9 @@
* <ul>
* <li>Events pass through a special channel, the {@link
OperatorEventValve}. If we are not
* currently triggering a checkpoint, then events simply pass through.
- * <li>Atomically, with the completion of the checkpoint future for the
coordinator, this operator
- * operator event valve is closed. Events coming after that are held
back (buffered), because
- * they belong to the epoch after the checkpoint.
+ * <li>With the completion of the checkpoint future for the coordinator,
this operator operator
Review comment:
operator operator -> operator
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -96,42 +94,44 @@ public boolean isShut() {
* (because it gets dropped through a call to {@link #reset()} or {@link
#resetForTask(int)},
* then the returned future till be completed exceptionally.
*/
- public CompletableFuture<Acknowledge> sendEvent(
- SerializedValue<OperatorEvent> event, int subtask) {
- synchronized (lock) {
- if (!shut) {
- return eventSender.apply(event, subtask);
- }
-
- final List<BlockedEvent> eventsForTask =
- blockedEvents.computeIfAbsent(subtask, (key) -> new
ArrayList<>());
- final CompletableFuture<Acknowledge> future = new
CompletableFuture<>();
- eventsForTask.add(new BlockedEvent(event, subtask, future));
- return future;
+ public void sendEvent(
+ SerializedValue<OperatorEvent> event,
+ int subtask,
+ CompletableFuture<Acknowledge> result) {
Review comment:
JavaDocs for the parameters are missing. Is there any contract regarding
which thread should complete the `result` future?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -238,8 +241,11 @@ public void notifyCheckpointAborted(long checkpointId) {
@Override
public void resetToCheckpoint(long checkpointId, @Nullable byte[]
checkpointData)
throws Exception {
- // ideally we would like to check this here, however this method is
called early during
- // execution graph construction, before the main thread executor is set
+ // the first time this method is called is early during execution
graph construction,
+ // before the main thread executor is set. hence this conditional
check.
+ if (mainThreadExecutor != null) {
+ mainThreadExecutor.assertRunningInMainThread();
+ }
Review comment:
I think we strictly speaking no longer need `OperatorCoordinatorHolder.lazyInitialize`,
because the main thread executor is now known when creating the `ExecutionGraph`.
Cleaning this up is, however, a follow-up refactoring task.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -195,12 +186,6 @@ public void assignSplits(SplitsAssignment<SplitT>
assignment) {
operatorCoordinatorContext.sendEvent(
new
AddSplitEvent<>(splits, splitSerializer),
id);
Review comment:
Same here. If this problem is going to be addressed in a future PR, then please
ignore these comments.
##########
File path:
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for source enumerator (operator coordinator) for situations
where RPC calls for
+ * split assignments (operator events) fails from time to time.
+ */
+@SuppressWarnings("serial")
Review comment:
Where are the serialization warnings coming from?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -46,25 +46,20 @@
private static final long NO_CHECKPOINT = Long.MIN_VALUE;
- private final Object lock = new Object();
-
- @GuardedBy("lock")
private final BiFunction<
SerializedValue<OperatorEvent>, Integer,
CompletableFuture<Acknowledge>>
eventSender;
- @GuardedBy("lock")
private final Map<Integer, List<BlockedEvent>> blockedEvents = new
LinkedHashMap<>();
- @GuardedBy("lock")
private long currentCheckpointId;
- @GuardedBy("lock")
private long lastCheckpointId;
- @GuardedBy("lock")
private boolean shut;
+ @Nullable private ComponentMainThreadExecutor mainThreadExecutor;
Review comment:
I think we should update the JavaDocs of this class. With the
introduction of the `mainThreadExecutor`, this class is no longer thread-safe
and assumes that certain calls are executed by the `mainThreadExecutor`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -147,16 +146,8 @@ public MetricGroup metricGroup() {
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
callInCoordinatorThread(
() -> {
- try {
- operatorCoordinatorContext.sendEvent(
- new SourceEventWrapper(event), subtaskId);
- return null;
- } catch (TaskNotRunningException e) {
- throw new FlinkRuntimeException(
- String.format(
- "Failed to send event %s to subtask
%d", event, subtaskId),
- e);
- }
+ operatorCoordinatorContext.sendEvent(new
SourceEventWrapper(event), subtaskId);
Review comment:
I think this line can swallow a potential exception, because we now only
report it via the returned `CompletableFuture`, which is ignored here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -96,42 +94,44 @@ public boolean isShut() {
* (because it gets dropped through a call to {@link #reset()} or {@link
#resetForTask(int)},
* then the returned future till be completed exceptionally.
*/
- public CompletableFuture<Acknowledge> sendEvent(
- SerializedValue<OperatorEvent> event, int subtask) {
- synchronized (lock) {
- if (!shut) {
- return eventSender.apply(event, subtask);
- }
-
- final List<BlockedEvent> eventsForTask =
- blockedEvents.computeIfAbsent(subtask, (key) -> new
ArrayList<>());
- final CompletableFuture<Acknowledge> future = new
CompletableFuture<>();
- eventsForTask.add(new BlockedEvent(event, subtask, future));
- return future;
+ public void sendEvent(
+ SerializedValue<OperatorEvent> event,
+ int subtask,
+ CompletableFuture<Acknowledge> result) {
Review comment:
nit: Returning an ack future instead of passing a future into the method
could make the API a bit nicer. Of course, this would mean that we have to do
something like
```
CompletableFuture.supplyAsync(() -> sendEvent(),
executor).thenCompose(Functions.identity())
```
in order to get the future and execute `sendEvent` in the main thread.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -226,6 +225,12 @@ private static void failAllFutures(@Nullable
List<BlockedEvent> events) {
}
}
+ private void checkRunsInMainThread() {
Review comment:
I think the lazy initializations are no longer necessary because the
`JobMaster` is now a `PermanentlyFencedRpcEndpoint`.
##########
File path:
flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayAdapter;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for source enumerator (operator coordinator) for situations
where RPC calls for
+ * split assignments (operator events) fails from time to time.
+ */
+@SuppressWarnings("serial")
+public class OperatorEventSendingCheckpointITCase {
+
+ private static final int PARALLELISM = 1;
+ private static MiniCluster flinkCluster;
+
+ @BeforeClass
+ public static void setupMiniClusterAndEnv() throws Exception {
+ flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM);
+ flinkCluster.start();
+ TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
+ }
+
+ @AfterClass
+ public static void clearEnvAndStopMiniCluster() throws Exception {
+ TestStreamEnvironment.unsetAsContext();
+ if (flinkCluster != null) {
+ flinkCluster.close();
+ flinkCluster = null;
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // tests
+ // ------------------------------------------------------------------------
+
+ /**
+ * Every second assign split event is lost. Eventually, the enumerator
must recognize that an
+ * event was lost and trigger recovery to prevent data loss. Data loss
would manifest in a
+ * stalled test, because we could wait forever to collect the required
number of events back.
+ */
+ @Ignore // ignore for now, because this test fails due to FLINK-21996
+ @Test
+ public void testOperatorEventLostNoReaderFailure() throws Exception {
+ final int[] eventsToLose = new int[] {2, 4, 6};
+
+ OpEventRpcInterceptor.currentHandler =
+ new OperatorEventRpcHandler(
+ (task, operator, event, originalRpcHandler) ->
askTimeoutFuture(),
+ eventsToLose);
+
+ runTest(false);
+ }
+
+ /**
+ * First and third assign split events are lost. In the middle pf all
events being processed
Review comment:
typo: "middle pf" -> "middle of"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]