lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r934612070
##########
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java:
##########
@@ -111,30 +108,21 @@
* <h2>2. Exactly-once alignment between multiple Coordinators</h2>
*
* <p>After a coordinator completed its checkpoint future, all events sent after that must be held
- * back until the checkpoint barriers have been sent to the sources. That is because from the
- * coordinator's perspective, the events are after the checkpoint, so they must also be after the
- * checkpoint from the source task's perspective.
+ * back until its subtasks completed their checkpoint. That is because from the coordinator's
+ * perspective, the events are after the checkpoint, so they must also be after the checkpoint from
+ * the subtask's perspective.
*
* <p>When multiple coordinators exist, there are time spans during which some coordinators finished
* their checkpoints, but others did not yet, and hence the source checkpoint barriers are not yet
* injected (that happens only once all coordinators are done with their checkpoint). The events
- * from the earlier coordinators must be blocked until all coordinators finished their checkpoints
- * and the source checkpoint barriers are injected.
- *
- * <p>In the example below, the events {@code c & d} must be held back until after the barrier
- * injection.
- *
- * <pre>
- * Coordinator one events: => a . . b . |trigger| . . |complete| . . c . . d . |barrier| . e . f
- * Coordinator two events: => . . x . . |trigger| . . . . . . . . . .|complete||barrier| . . y . . z
- * </pre>
+ * from the earlier coordinators must be blocked until all coordinators finished their checkpoints ,
Review Comment:
`checkpoints ,` -> `checkpoints,`
And `finished` seems to be inconsistent with `complete` in the next line.
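For readers following this thread, the rule that the revised Javadoc describes (events a coordinator sends after completing its checkpoint future are held back until its subtasks have completed that checkpoint) can be sketched as a small buffer-and-release valve. This is a minimal, hypothetical illustration in plain Java, not Flink's implementation: `EventValve` and its methods are made-up names, and it models only one coordinator talking to one subtask.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/**
 * Hypothetical sketch, not Flink's API: delivers events from one coordinator to one subtask,
 * holding back events sent after the coordinator completed checkpoint N until the subtask
 * reports that it has completed checkpoint N as well.
 */
final class EventValve<E> {
    private static final long NONE = -1L;

    private final Consumer<E> subtask;
    private final Deque<E> heldBack = new ArrayDeque<>();
    private long pendingCheckpointId = NONE; // NONE means events flow through directly

    EventValve(Consumer<E> subtask) {
        this.subtask = subtask;
    }

    /** The coordinator completed its checkpoint future: later events belong after the checkpoint. */
    void coordinatorCheckpointCompleted(long checkpointId) {
        pendingCheckpointId = checkpointId;
    }

    /** Delivers the event now, or buffers it (in send order) while a checkpoint is pending. */
    void send(E event) {
        if (pendingCheckpointId == NONE) {
            subtask.accept(event);
        } else {
            heldBack.addLast(event);
        }
    }

    /** The subtask completed the checkpoint: release the held-back events in send order. */
    void subtaskCheckpointCompleted(long checkpointId) {
        if (checkpointId != pendingCheckpointId) {
            return; // not the checkpoint this valve is waiting for
        }
        pendingCheckpointId = NONE;
        while (!heldBack.isEmpty()) {
            subtask.accept(heldBack.pollFirst());
        }
    }

    /** Number of events currently held back. */
    int heldBackCount() {
        return heldBack.size();
    }
}
```

With `send("a")`, then `coordinatorCheckpointCompleted(1)`, then `send("c")` and `send("d")`, only `a` reaches the subtask until `subtaskCheckpointCompleted(1)` releases `c` and `d` in order. Handling several coordinators (the second half of the Javadoc) would additionally require keeping every valve closed until all coordinators have completed their checkpoints; that part is left out of this sketch.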
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint. This class is an extension to {@link CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream operators can correctly
+ * handle received operator events and inform coordinators of completing checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently
+ * of each other. They do not have the upstream-downstream relationship as operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test class further
+ * verifies situations when the tested operators are not sources, which means when checkpoint
+ * barriers are injected into sources, these operators may not have started checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are not affected. In this
+ * case the production infrastructure code needs to work together with user code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once semantics of operator
+ * event delivery.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+ extends CoordinatorEventsExactlyOnceITCase {
+ private static final AtomicBoolean shouldCloseSource = new AtomicBoolean(false);
+
+ private static final int numEvents = 100;
+
+ private static final int delay = 1;
+
+ private StreamExecutionEnvironment env;
+
+ @Before
+ public void setup() {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(100);
+ shouldCloseSource.set(false);
+ }
+
+ @Test
+ public void testCoordinatorSendEvents() throws Exception {
+ executeAndVerifyResults(
+ env, new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay));
+ }
+
+ @Test
+ public void testUnalignedCheckpoint() throws Exception {
+ env.getCheckpointConfig().enableUnalignedCheckpoints();
+ executeAndVerifyResults(
+ env, new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay));
+ }
+
+ @Test
+ public void testFailingCheckpoint() throws Exception {
+ executeAndVerifyResults(
+ env, new FailingCheckpointOperatorFactory<>("failingCheckpoint", numEvents, delay));
+ }
+
+ private void executeAndVerifyResults(
+ StreamExecutionEnvironment env, OneInputStreamOperatorFactory<Long, Long> factory)
+ throws Exception {
+ // Differs the parallelism of source operators from that of tested operators, so that when
+ // checkpoint barriers are injected into sources, the tested operators have not started
+ // checkpoint yet.
+ DataStream<Long> stream =
+ env.addSource(new IdlingSourceFunction<>(), TypeInformation.of(Long.class))
+ .setParallelism(2);
+ stream =
+ stream.transform("eventReceiving", TypeInformation.of(Long.class), factory)
+ .setParallelism(1);
+ stream.addSink(new DiscardingSink<>());
+
+ JobExecutionResult executionResult =
+ MINI_CLUSTER
+ .getMiniCluster()
+ .executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+ long count = executionResult.getAccumulatorResult(EventReceivingOperator.COUNTER_NAME);
+ assertThat(count).isEqualTo(numEvents);
+ }
+
+ /** A mock source function that does not collect any stream record and finishes on demand. */
+ private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+ implements ParallelSourceFunction<T> {
+ private boolean isCancelled = false;
+
+ @Override
+ public void run(SourceContext<T> ctx) throws Exception {
+ while (!isCancelled && !shouldCloseSource.get()) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isCancelled = true;
+ }
+ }
+
+ /**
+ * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+ * EventSendingCoordinator}.
+ */
+ private static class EventReceivingOperatorFactory<IN, OUT>
+ extends AbstractStreamOperatorFactory<OUT>
+ implements CoordinatedOperatorFactory<OUT>, OneInputStreamOperatorFactory<IN, OUT> {
+ private final String name;
+ protected final int numEvents;
Review Comment:
Can you improve the readability/consistency of the code here?
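One possible reading of this comment, assuming it targets the mix of `private` and `protected` instance fields in the factory and the camelCase `static final` constants `numEvents` and `delay` that share names with those fields: keep every instance field `private final`, let the subclass read `numEvents` through an accessor, and name the constants in UPPER_SNAKE_CASE. The sketch uses hypothetical plain-Java stand-ins (`ConsistentFactorySketch`, `FailingConsistentFactorySketch`) rather than the Flink factory classes so that it compiles on its own; it is not code from the PR.

```java
/**
 * Hypothetical stand-in for EventReceivingOperatorFactory's fields (not code from the PR):
 * every instance field is private final, and subclasses use an accessor instead of a
 * protected field.
 */
class ConsistentFactorySketch {
    // Constants in UPPER_SNAKE_CASE no longer share a name with the instance fields below.
    static final int NUM_EVENTS = 100;
    static final int DELAY = 1;

    private final String name;
    private final int numEvents;
    private final int delay;

    ConsistentFactorySketch(String name, int numEvents, int delay) {
        this.name = name;
        this.numEvents = numEvents;
        this.delay = delay;
    }

    String getName() {
        return name;
    }

    protected int getNumEvents() {
        return numEvents;
    }

    int getDelay() {
        return delay;
    }
}

/** Stand-in for FailingCheckpointOperatorFactory: reads numEvents through the accessor. */
class FailingConsistentFactorySketch extends ConsistentFactorySketch {
    FailingConsistentFactorySketch(String name, int numEvents, int delay) {
        super(name, numEvents, delay);
    }

    /** Value that would be handed to the failing operator's constructor. */
    int numEventsForOperator() {
        return getNumEvents();
    }
}
```

The same pattern would apply to `EventReceivingOperator`, where `counter` and `state` are currently `protected` so that `FailingCheckpointOperator` can reach them.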
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint. This class is an extension to {@link CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream operators can correctly
+ * handle received operator events and inform coordinators of completing checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently
+ * of each other. They do not have the upstream-downstream relationship as operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test class further
+ * verifies situations when the tested operators are not sources, which means when checkpoint
+ * barriers are injected into sources, these operators may not have started checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are not affected. In this
+ * case the production infrastructure code needs to work together with user code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once semantics of operator
+ * event delivery.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+ extends CoordinatorEventsExactlyOnceITCase {
+ private static final AtomicBoolean shouldCloseSource = new AtomicBoolean(false);
+
+ private static final int numEvents = 100;
+
+ private static final int delay = 1;
+
+ private StreamExecutionEnvironment env;
+
+ @Before
+ public void setup() {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(100);
+ shouldCloseSource.set(false);
+ }
+
+ @Test
+ public void testCoordinatorSendEvents() throws Exception {
+ executeAndVerifyResults(
+ env, new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay));
+ }
+
+ @Test
+ public void testUnalignedCheckpoint() throws Exception {
+ env.getCheckpointConfig().enableUnalignedCheckpoints();
+ executeAndVerifyResults(
+ env, new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay));
+ }
+
+ @Test
+ public void testFailingCheckpoint() throws Exception {
+ executeAndVerifyResults(
+ env, new FailingCheckpointOperatorFactory<>("failingCheckpoint", numEvents, delay));
+ }
+
+ private void executeAndVerifyResults(
+ StreamExecutionEnvironment env, OneInputStreamOperatorFactory<Long, Long> factory)
+ throws Exception {
+ // Differs the parallelism of source operators from that of tested operators, so that when
+ // checkpoint barriers are injected into sources, the tested operators have not started
+ // checkpoint yet.
+ DataStream<Long> stream =
+ env.addSource(new IdlingSourceFunction<>(), TypeInformation.of(Long.class))
+ .setParallelism(2);
+ stream =
+ stream.transform("eventReceiving", TypeInformation.of(Long.class), factory)
+ .setParallelism(1);
+ stream.addSink(new DiscardingSink<>());
+
+ JobExecutionResult executionResult =
+ MINI_CLUSTER
+ .getMiniCluster()
+ .executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+ long count = executionResult.getAccumulatorResult(EventReceivingOperator.COUNTER_NAME);
+ assertThat(count).isEqualTo(numEvents);
+ }
+
+ /** A mock source function that does not collect any stream record and finishes on demand. */
+ private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+ implements ParallelSourceFunction<T> {
+ private boolean isCancelled = false;
+
+ @Override
+ public void run(SourceContext<T> ctx) throws Exception {
+ while (!isCancelled && !shouldCloseSource.get()) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isCancelled = true;
+ }
+ }
+
+ /**
+ * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+ * EventSendingCoordinator}.
+ */
+ private static class EventReceivingOperatorFactory<IN, OUT>
+ extends AbstractStreamOperatorFactory<OUT>
+ implements CoordinatedOperatorFactory<OUT>, OneInputStreamOperatorFactory<IN, OUT> {
+ private final String name;
+ protected final int numEvents;
+ private final int delay;
+
+ public EventReceivingOperatorFactory(String name, int numEvents, int delay) {
+ this.name = name;
+ this.numEvents = numEvents;
+ this.delay = delay;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new OperatorCoordinator.Provider() {
+
+ @Override
+ public OperatorID getOperatorId() {
+ return operatorID;
+ }
+
+ @Override
+ public OperatorCoordinator create(OperatorCoordinator.Context context) {
+ return new EventSendingCoordinator(context, name, numEvents, delay);
+ }
+ };
+ }
+
+ @Override
+ public <T extends StreamOperator<OUT>> T createStreamOperator(
+ StreamOperatorParameters<OUT> parameters) {
+ EventReceivingOperator<OUT> operator = new EventReceivingOperator<>();
+ operator.setup(
+ parameters.getContainingTask(),
+ parameters.getStreamConfig(),
+ parameters.getOutput());
+ parameters
+ .getOperatorEventDispatcher()
+ .registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+ return (T) operator;
+ }
+
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return EventReceivingOperator.class;
+ }
+ }
+
+ /**
+ * The stream operator that receives the events and accumulates the numbers. The task is
+ * stateful and checkpoints the accumulator.
+ */
+ private static class EventReceivingOperator<T> extends AbstractStreamOperator<T>
+ implements OneInputStreamOperator<T, T>, OperatorEventHandler {
+ protected static final String COUNTER_NAME = "numEvents";
+
+ protected final LongCounter counter = new LongCounter();
+
+ protected ListState<Long> state;
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ getRuntimeContext().addAccumulator(COUNTER_NAME, counter);
+ }
+
+ @Override
+ public void processElement(StreamRecord<T> element) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent evt) {
+ if (evt instanceof IntegerEvent) {
+ counter.add(1L);
+ } else if (evt instanceof EndEvent) {
+ try {
+ state.update(Collections.singletonList(counter.getLocalValue()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ shouldCloseSource.set(true);
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ state.update(Collections.singletonList(counter.getLocalValue()));
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+
+ state =
+ context.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>(
+ "counterState", BasicTypeInfo.LONG_TYPE_INFO));
+
+ counter.resetLocal();
+ Iterator<Long> iterator = state.get().iterator();
+ if (iterator.hasNext()) {
+ counter.add(iterator.next());
+ }
+ Preconditions.checkArgument(!iterator.hasNext());
+
+ // signal the coordinator to start
+ getContainingTask()
+ .getEnvironment()
+ .getOperatorCoordinatorEventGateway()
+ .sendOperatorEventToCoordinator(
+ getOperatorID(),
+ new SerializedValue<>(
+ new StartEvent(counter.getLocalValue().intValue() - 1)));
+ }
+ }
+
+ /**
+ * A wrapper operator factory for {@link FailingCheckpointOperator} and {@link
+ * EventSendingCoordinator}.
+ */
+ private static class FailingCheckpointOperatorFactory<IN, OUT>
+ extends EventReceivingOperatorFactory<IN, OUT> {
+ public FailingCheckpointOperatorFactory(String name, int numEvents, int delay) {
+ super(name, numEvents, delay);
+ }
+
+ @Override
+ public <T extends StreamOperator<OUT>> T createStreamOperator(
+ StreamOperatorParameters<OUT> parameters) {
+ EventReceivingOperator<OUT> operator = new FailingCheckpointOperator<>(numEvents);
+ operator.setup(
+ parameters.getContainingTask(),
+ parameters.getStreamConfig(),
+ parameters.getOutput());
+ parameters
+ .getOperatorEventDispatcher()
+ .registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+ return (T) operator;
+ }
+ }
+
+ /**
+ * A subclass of {@link EventReceivingOperator} whose subtask would fail during a specific
+ * checkpoint.
+ */
+ private static class FailingCheckpointOperator<T> extends EventReceivingOperator<T> {
+
+ private final int failAtCheckpointAfterMessage;
+
+ private TestScript testScript;
+
+ private FailingCheckpointOperator(int numEvents) {
+ this.failAtCheckpointAfterMessage =
+ numEvents * 2 / 3 + new Random().nextInt(numEvents / 6);
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<T>> output) {
+ super.setup(containingTask, config, output);
+ if (containingTask.getIndexInSubtaskGroup() == 0) {
+ this.testScript = TestScript.getForOperator(getOperatorID() + "-subtask0");
+ } else {
+ this.testScript = null;
+ }
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent evt) {
+ if (evt instanceof IntegerEvent) {
+ counter.add(1L);
+ } else if (evt instanceof EndEvent) {
+ try {
+ state.update(Collections.singletonList(counter.getLocalValue()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (testScript.hasAlreadyFailed()) {
Review Comment:
Is it possible that `testScript == null` here?
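In `setup()`, `testScript` is assigned only when `containingTask.getIndexInSubtaskGroup() == 0` and is set to `null` otherwise, so this `EndEvent` branch would throw a `NullPointerException` on any other subtask. The tests above run the operator with `.setParallelism(1)`, so only subtask 0 exists today, but the code becomes fragile as soon as the parallelism changes. A minimal null-safe guard could look like the following. `FailureScript` is a hypothetical stand-in that models only the `hasAlreadyFailed()` call used in the PR, and treating a missing script as "already failed" (so the subtask proceeds normally) is an assumption about what the truncated branch intends.

```java
/** Hypothetical stand-in for the PR's TestScript; only hasAlreadyFailed() is modeled. */
interface FailureScript {
    boolean hasAlreadyFailed();
}

/** Null-safe helper for subtasks that were not given a script in setup(). */
final class FailureGuard {
    private FailureGuard() {}

    /**
     * Assumption: a subtask without a script never injects a failure, so it is treated as if
     * its failure had already happened and it proceeds normally instead of throwing an NPE.
     */
    static boolean hasAlreadyFailed(FailureScript script) {
        return script == null || script.hasAlreadyFailed();
    }
}
```

In the operator, the check would then read `if (FailureGuard.hasAlreadyFailed(testScript))`. An alternative is to fail fast in `setup()`, for example with `Preconditions.checkState`, if the operator is ever instantiated for a subtask other than 0.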
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoint. This class is an extension to {@link CoordinatorEventsExactlyOnceITCase}, further
+ * verifying the exactly-once semantics of events in the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the
+ * correctness of operator coordinator's behavior. It uses a custom {@link AbstractInvokable}
+ * subclass that mocks the behavior of coordinator's recipients. This test class uses actual stream
+ * operators as the recipient of coordinator events, verifying that stream operators can correctly
+ * handle received operator events and inform coordinators of completing checkpoint.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently
+ * of each other. They do not have the upstream-downstream relationship as operators usually do in a
+ * streaming job, and thus both of them are treated as source tasks. This test class further
+ * verifies situations when the tested operators are not sources, which means when checkpoint
+ * barriers are injected into sources, these operators may not have started checkpoint yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that the correctness of the
+ * event delivery behavior around checkpoint is not affected by this condition.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur at the coordinators' side, so
+ * they will cause the whole Flink job to fail over. In this class, test cases are added when there
+ * might only be fail-overs on the subtasks' side, while the coordinators are not affected. In this
+ * case the production infrastructure code needs to work together with user code (implementations
+ * inside the coordinator and operator subclass) to ensure the exactly-once semantics of operator
+ * event delivery.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+ extends CoordinatorEventsExactlyOnceITCase {
+ private static final AtomicBoolean shouldCloseSource = new AtomicBoolean(false);
+
+ private static final int numEvents = 100;
+
+ private static final int delay = 1;
+
+ private StreamExecutionEnvironment env;
+
+ @Before
+ public void setup() {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(100);
+ shouldCloseSource.set(false);
+ }
+
+ @Test
+ public void testCoordinatorSendEvents() throws Exception {
+ executeAndVerifyResults(
+ env, new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay));
+ }
+
+ @Test
+ public void testUnalignedCheckpoint() throws Exception {
+ env.getCheckpointConfig().enableUnalignedCheckpoints();
+ executeAndVerifyResults(
+ env, new EventReceivingOperatorFactory<>("eventReceiving", numEvents, delay));
+ }
+
+ @Test
+ public void testFailingCheckpoint() throws Exception {
+ executeAndVerifyResults(
+ env, new FailingCheckpointOperatorFactory<>("failingCheckpoint", numEvents, delay));
+ }
+
+ private void executeAndVerifyResults(
+ StreamExecutionEnvironment env, OneInputStreamOperatorFactory<Long, Long> factory)
+ throws Exception {
+ // Differs the parallelism of source operators from that of tested operators, so that when
+ // checkpoint barriers are injected into sources, the tested operators have not started
+ // checkpoint yet.
+ DataStream<Long> stream =
+ env.addSource(new IdlingSourceFunction<>(), TypeInformation.of(Long.class))
+ .setParallelism(2);
+ stream =
+ stream.transform("eventReceiving", TypeInformation.of(Long.class), factory)
+ .setParallelism(1);
+ stream.addSink(new DiscardingSink<>());
+
+ JobExecutionResult executionResult =
+ MINI_CLUSTER
+ .getMiniCluster()
+ .executeJobBlocking(env.getStreamGraph().getJobGraph());
+
+ long count = executionResult.getAccumulatorResult(EventReceivingOperator.COUNTER_NAME);
+ assertThat(count).isEqualTo(numEvents);
+ }
+
+ /** A mock source function that does not collect any stream record and finishes on demand. */
+ private static class IdlingSourceFunction<T> extends RichSourceFunction<T>
+ implements ParallelSourceFunction<T> {
+ private boolean isCancelled = false;
+
+ @Override
+ public void run(SourceContext<T> ctx) throws Exception {
+ while (!isCancelled && !shouldCloseSource.get()) {
+ Thread.sleep(100);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isCancelled = true;
+ }
+ }
+
+ /**
+ * A wrapper operator factory for {@link EventReceivingOperator} and {@link
+ * EventSendingCoordinator}.
+ */
+ private static class EventReceivingOperatorFactory<IN, OUT>
+ extends AbstractStreamOperatorFactory<OUT>
+ implements CoordinatedOperatorFactory<OUT>, OneInputStreamOperatorFactory<IN, OUT> {
+ private final String name;
+ protected final int numEvents;
+ private final int delay;
+
+ public EventReceivingOperatorFactory(String name, int numEvents, int delay) {
+ this.name = name;
+ this.numEvents = numEvents;
+ this.delay = delay;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new OperatorCoordinator.Provider() {
+
+ @Override
+ public OperatorID getOperatorId() {
+ return operatorID;
+ }
+
+ @Override
+ public OperatorCoordinator create(OperatorCoordinator.Context context) {
+ return new EventSendingCoordinator(context, name, numEvents, delay);
+ }
+ };
+ }
+
+ @Override
+ public <T extends StreamOperator<OUT>> T createStreamOperator(
+ StreamOperatorParameters<OUT> parameters) {
+ EventReceivingOperator<OUT> operator = new EventReceivingOperator<>();
+ operator.setup(
+ parameters.getContainingTask(),
+ parameters.getStreamConfig(),
+ parameters.getOutput());
+ parameters
+ .getOperatorEventDispatcher()
+ .registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+ return (T) operator;
+ }
+
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return EventReceivingOperator.class;
+ }
+ }
+
+ /**
+ * The stream operator that receives the events and accumulates the numbers. The task is
+ * stateful and checkpoints the accumulator.
+ */
+ private static class EventReceivingOperator<T> extends AbstractStreamOperator<T>
+ implements OneInputStreamOperator<T, T>, OperatorEventHandler {
+ protected static final String COUNTER_NAME = "numEvents";
+
+ protected final LongCounter counter = new LongCounter();
+
+ protected ListState<Long> state;
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ getRuntimeContext().addAccumulator(COUNTER_NAME, counter);
+ }
+
+ @Override
+ public void processElement(StreamRecord<T> element) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent evt) {
+ if (evt instanceof IntegerEvent) {
+ counter.add(1L);
+ } else if (evt instanceof EndEvent) {
+ try {
+ state.update(Collections.singletonList(counter.getLocalValue()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ shouldCloseSource.set(true);
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ state.update(Collections.singletonList(counter.getLocalValue()));
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+
+ state =
+ context.getOperatorStateStore()
+ .getListState(
+ new ListStateDescriptor<>(
+ "counterState", BasicTypeInfo.LONG_TYPE_INFO));
+
+ counter.resetLocal();
+ Iterator<Long> iterator = state.get().iterator();
+ if (iterator.hasNext()) {
+ counter.add(iterator.next());
+ }
+ Preconditions.checkArgument(!iterator.hasNext());
+
+ // signal the coordinator to start
+ getContainingTask()
+ .getEnvironment()
+ .getOperatorCoordinatorEventGateway()
+ .sendOperatorEventToCoordinator(
+ getOperatorID(),
+ new SerializedValue<>(
+ new StartEvent(counter.getLocalValue().intValue() - 1)));
+ }
+ }
+
+ /**
+ * A wrapper operator factory for {@link FailingCheckpointOperator} and {@link
+ * EventSendingCoordinator}.
+ */
+ private static class FailingCheckpointOperatorFactory<IN, OUT>
+ extends EventReceivingOperatorFactory<IN, OUT> {
+ public FailingCheckpointOperatorFactory(String name, int numEvents, int delay) {
+ super(name, numEvents, delay);
+ }
+
+ @Override
+ public <T extends StreamOperator<OUT>> T createStreamOperator(
+ StreamOperatorParameters<OUT> parameters) {
+ EventReceivingOperator<OUT> operator = new FailingCheckpointOperator<>(numEvents);
+ operator.setup(
+ parameters.getContainingTask(),
+ parameters.getStreamConfig(),
+ parameters.getOutput());
+ parameters
+ .getOperatorEventDispatcher()
+ .registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
+ return (T) operator;
+ }
+ }
+
+ /**
+ * A subclass of {@link EventReceivingOperator} whose subtask would fail during a specific
+ * checkpoint.
+ */
+ private static class FailingCheckpointOperator<T> extends EventReceivingOperator<T> {
+
+ private final int failAtCheckpointAfterMessage;
+
+ private TestScript testScript;
+
+ private FailingCheckpointOperator(int numEvents) {
+ this.failAtCheckpointAfterMessage =
+ numEvents * 2 / 3 + new Random().nextInt(numEvents / 6);
+ }
+
+ @Override
+ public void setup(
+ StreamTask<?, ?> containingTask,
+ StreamConfig config,
+ Output<StreamRecord<T>> output) {
+ super.setup(containingTask, config, output);
+ if (containingTask.getIndexInSubtaskGroup() == 0) {
+ this.testScript = TestScript.getForOperator(getOperatorID() + "-subtask0");
+ } else {
+ this.testScript = null;
+ }
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent evt) {
+ if (evt instanceof IntegerEvent) {
+ counter.add(1L);
+ } else if (evt instanceof EndEvent) {
+ try {
+ state.update(Collections.singletonList(counter.getLocalValue()));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ if (testScript.hasAlreadyFailed()) {
+ shouldCloseSource.set(true);
+ }
+ } else {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ if (counter.getLocalValue() > failAtCheckpointAfterMessage
+ && testScript != null
+ && !testScript.hasAlreadyFailed()) {
+ testScript.recordHasFailed();
+ throw new RuntimeException();
Review Comment:
Have you verified that each invocation of `testFailingCheckpoint()` always
triggers this exception before this test ends? Note that we typically don't
want to have flaky tests that fail with low probability.
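For reference, the threshold itself is bounded: with `numEvents = 100`, `numEvents * 2 / 3` is 66 under integer division and `new Random().nextInt(numEvents / 6)` returns 0 to 15, so `failAtCheckpointAfterMessage` always falls in [66, 81], below the 100 events the operator eventually counts. The bound only shows that the counter can pass the threshold; whether a `snapshotState()` call actually happens after that point also depends on checkpoint timing and on the source staying alive (it only closes once `testScript.hasAlreadyFailed()` is true). The hypothetical helper below, which is not part of the PR, samples the same expression and checks it against the computed bounds:

```java
import java.util.Random;

/** Hypothetical helper (not in the PR): bounds of FailingCheckpointOperator's fail threshold. */
final class FailThresholdRange {

    /** Same expression as the operator's constructor, with the random term passed in. */
    static int threshold(int numEvents, int randomTerm) {
        return numEvents * 2 / 3 + randomTerm;
    }

    /** Smallest possible threshold: nextInt(...) returned 0. */
    static int minThreshold(int numEvents) {
        return threshold(numEvents, 0);
    }

    /** Largest possible threshold: Random.nextInt(bound) returns at most bound - 1. */
    static int maxThreshold(int numEvents) {
        return threshold(numEvents, numEvents / 6 - 1);
    }

    public static void main(String[] args) {
        int numEvents = 100;
        System.out.println(
                "threshold range: [" + minThreshold(numEvents) + ", " + maxThreshold(numEvents) + "]");

        // Sample the original expression and make sure it never leaves the computed range.
        Random random = new Random();
        for (int i = 0; i < 100_000; i++) {
            int t = numEvents * 2 / 3 + random.nextInt(numEvents / 6);
            if (t < minThreshold(numEvents) || t > maxThreshold(numEvents)) {
                throw new AssertionError("threshold out of range: " + t);
            }
        }
    }
}
```

Running `main` prints `threshold range: [66, 81]`. Note that `Random.nextInt(bound)` throws `IllegalArgumentException` for a non-positive bound, so the expression also assumes `numEvents >= 6`.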
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java:
##########
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration test case that validates the exactly-once mechanism for operator events sent around
+ * checkpoints. This class extends {@link CoordinatorEventsExactlyOnceITCase}, further verifying the
+ * exactly-once semantics of events under the following conditions:
+ *
+ * <h2>Stream operator recipient</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the test cases focus on verifying the
+ * correctness of the operator coordinator's behavior. It uses a custom {@link AbstractInvokable}
+ * subclass that mocks the behavior of the coordinator's recipients. This test class uses actual
+ * stream operators as the recipients of coordinator events, verifying that stream operators can
+ * correctly handle received operator events and inform the coordinators when they complete their
+ * checkpoints.
+ *
+ * <h2>Non-source stream task</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, the two tested tasks are executed independently
+ * of each other. They do not have an upstream-downstream relationship as operators usually do in a
+ * streaming job, so both of them are treated as source tasks. This test class further verifies the
+ * case where the tested operators are not sources, which means that when checkpoint barriers are
+ * injected into the sources, these operators may not have started their checkpoints yet.
+ *
+ * <h2>Unaligned checkpoint</h2>
+ *
+ * <p>This class tests both aligned and unaligned checkpoints to verify that the correctness of
+ * event delivery around checkpoints does not depend on the checkpointing mode.
+ *
+ * <h2>Non-global failure</h2>
+ *
+ * <p>In {@link CoordinatorEventsExactlyOnceITCase}, failures occur on the coordinators' side, so
+ * they cause the whole Flink job to fail over. This class adds test cases in which only the
+ * subtasks fail over, while the coordinators are not affected. In this case the production
+ * infrastructure code needs to work together with user code (implementations inside the
+ * coordinator and operator subclasses) to ensure the exactly-once semantics of operator event
+ * delivery.
+ */
+public class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
+ extends CoordinatorEventsExactlyOnceITCase {
+ private static final AtomicBoolean shouldCloseSource = new AtomicBoolean(false);
+
+ private static final int numEvents = 100;
Review Comment:
Is this variable name consistent with other `private static final` variable
names in the repo?
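For comparison, the other `static final` constants used in this test (`COUNTER_NAME`, `MINI_CLUSTER`) are in UPPER_SNAKE_CASE, which is also the common Java convention. A minimal sketch of the declaration renamed accordingly; the holder class and its accessor are hypothetical and only exist so the snippet compiles on its own:

```java
/** Hypothetical holder class; only the constant's name differs from the PR. */
final class ConstantNamingSketch {

    // UPPER_SNAKE_CASE, consistent with COUNTER_NAME and MINI_CLUSTER in the same test.
    private static final int NUM_EVENTS = 100;

    static int numEvents() {
        return NUM_EVENTS;
    }

    public static void main(String[] args) {
        System.out.println("NUM_EVENTS = " + numEvents());
    }
}
```

Under Google Java Style, only deeply immutable `static final` fields count as constants, so a mutable flag such as `shouldCloseSource` (an `AtomicBoolean`) could arguably keep lowerCamelCase; whichever rule Flink's checkstyle configuration enforces should decide that case.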