pnowojski commented on a change in pull request #16773: URL: https://github.com/apache/flink/pull/16773#discussion_r688483258
########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.lifecycle; + +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent; +import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.TestingGraphBuilder; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.junit.SharedObjects; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; + +import static 
org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.COMPLEX_GRAPH_BUILDER; +import static org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.SIMPLE_GRAPH_BUILDER; + +/** + * A test suite to check that the operator methods are called according to contract when the job is + * stopped with savepoint. The contract was refined in FLIP-147. + * + * <p>Checked assumptions: + * + * <ol> + * <li>Downstream should only be "finished" after all of its the upstreams are + * <li>Order of events when finishing an operator: + * <ol> + * <li>(last data element) + * <li>{@link Watermark#MAX_WATERMARK MAX_WATERMARK} (if with drain) + * <li>{@link BoundedMultiInput#endInput endInput} (if with drain) + * <li>timer service quiesced + * <li>{@link StreamOperator#finish() finish} (if with drain; support is planned for + * no-drain) + * <li>{@link AbstractStreamOperator#snapshotState(StateSnapshotContext) snapshotState} (for + * the respective checkpoint) + * <li>{@link CheckpointListener#notifyCheckpointComplete notifyCheckpointComplete} (for the + * respective checkpoint) + * <li>(task termination) + * </ol> + * <li>Timers can be registered until the operator is finished (though may not fire) (simply + * register every 1ms and don't expect any exception) + * <li>The same watermark is received + * </ol> + * + * <p>Variants: + * + * <ul> + * <li>command - with or without drain (MAX_WATERMARK and endInput should be iff drain) + * <li>graph - different exchanges (keyBy, forward) + * <li>graph - multi-inputs, unions + * <li>graph - FLIP-27 and regular sources (should work for both) - NOT IMPLEMENTED + * </ul> + * + * <p>Not checked: + * + * <ul> + * <li>state distribution on recovery (when a new job started from the taken savepoint) (a + * separate IT case for partial finishing and state distribution) + * <li>re-taking a savepoint after one fails (and job fails over) (as it should not affect + * savepoints) + * <li>taking a savepoint after recovery (as it should 
not affect savepoints) + * <li>taking a savepoint on a partially completed graph (a separate IT case) + * </ul> + */ +@RunWith(Parameterized.class) +public class StopWithSavepointITCase extends AbstractTestBase { + + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + + @Parameter(0) + public boolean withDrain; + + @Parameter(1) + public TestingGraphBuilder graphBuilder; + + @Test + public void test() throws Exception { + TestJobWithDescription testJob = graphBuilder.apply(sharedObjects, cfg -> {}, env -> {}); + + TestJobExecutor.submitGraph(testJob.jobGraph) + .waitForEvent(WatermarkReceivedEvent.class, testJob.eventQueue) + .stopWithSavepoint(temporaryFolder, withDrain) + .execute(miniClusterResource); + + TestJobExecutionValidators.checkOperatorsLifecycle(testJob, withDrain, true); + if (withDrain) { + // currently (1.14), sources do not stop before taking a savepoint and continue emission + // todo: enable after updating production code + TestJobExecutionValidators.checkDataFlow(testJob); + } + } + + @Parameterized.Parameters(name = "withDrain: {0}, {1}") + public static Object[] parameters() { + return new Object[][] { + new Object[] {true, SIMPLE_GRAPH_BUILDER}, new Object[] {false, SIMPLE_GRAPH_BUILDER}, + new Object[] {true, COMPLEX_GRAPH_BUILDER}, new Object[] {false, COMPLEX_GRAPH_BUILDER}, + }; + } Review comment: Good idea to keep `SIMPLE_GRAPH_BUILDER` separate! ########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/event/TestEventQueueImpl.java ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.lifecycle.event; + +import org.apache.flink.util.function.RunnableWithException; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingQueue; + +class TestEventQueueImpl implements TestEventQueue { Review comment: I think this is somewhat too complicated and hard to follow with those handlers, listeners and runnables. It took me more than 10 minutes to understand this code (and I had to open it in the IDE to click through all of the usages). Do we need all of those extra layers of abstractions and indirections? As of now this simplified proposal https://github.com/pnowojski/flink/tree/f21090 would work just as well. (Note my other comment about squashing the TestEventQueue interface with its implementations.) ########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java ########## @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.lifecycle; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue; +import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue.TestEventHandler.TestEventNextAction.CONTINUE; +import static org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue.TestEventHandler.TestEventNextAction.STOP; +import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; + +class TestJobExecutor { + + private final List<ThrowingConsumer<TestJobExecutionContext, Exception>> steps; Review comment: Why do we need this indirection of first defining a list of steps and only after that executing them one by one? Why can't we execute them immediately, one by one?
########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandQueue.java ########## @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.lifecycle.command; + +import org.apache.flink.testutils.junit.SharedObjects; + +import java.io.Serializable; + +/** A queue for {@link TestCommand} executed by operators in a test job. 
*/ +public interface TestCommandQueue extends Serializable { + + void add(TestCommand testCommand, TestCommandTarget target, TestCommandRetention retention); Review comment: instead of adding the `TestCommandTarget` (final version is quite complicated) I would also remove this indirection with two `add` methods: ``` add(TestCommand, String operatorId, TestCommandRetention); addToAll(TestCommand, TestCommandRetention); ``` ########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java ########## @@ -84,6 +89,28 @@ public TestJobExecutor stopWithSavepoint(TemporaryFolder folder, boolean withDra return new TestJobExecutor(steps); } + public TestJobExecutor sendCommand( + TestCommandQueue commandQueue, + TestCommand testCommand, + TestCommandRetention retention) { + steps.add(ctx -> commandQueue.add(testCommand, TestCommandTarget.ALL, retention)); + return this; + } + + public TestJobExecutor waitForTermination() { + steps.add( + ctx -> { + while (!ctx.miniClusterResource + .getClusterClient() + .getJobStatus(ctx.job) + .get() + .isGloballyTerminalState()) { + Thread.sleep(100); Review comment: nit: `sleep(1)`? In the worst case `sleep(10)`? ########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/event/TestEventQueue.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.lifecycle.event; + +import org.apache.flink.testutils.junit.SharedObjects; + +import java.io.Serializable; +import java.util.List; + +/** A queue for {@link TestEvent}. */ +public interface TestEventQueue extends Serializable { Review comment: I would suggest to squash `TestEventQueue`, `TestEventQueueImpl` and `SharedTestEventQueue`. I don't think we need this abstraction. `SharedTestEventQueue(new TestEventQueueImpl())` is the only combination being used and the split of responsibilities between `SharedTestEventQueue` and `TestEventQueueImpl` is doing more harm than good in this case I think. ########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/event/SharedTestEventQueue.java ########## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.lifecycle.event; + +import org.apache.flink.testutils.junit.SharedReference; + +import java.util.List; + +class SharedTestEventQueue implements TestEventQueue { Review comment: nit: I'm not a fan of package private classes bypassing our checkstyles for missing Javadoc. ########## File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/event/TestEventQueue.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.lifecycle.event; + +import org.apache.flink.testutils.junit.SharedObjects; + +import java.io.Serializable; +import java.util.List; + +/** A queue for {@link TestEvent}. */ +public interface TestEventQueue extends Serializable { Review comment: The Javadoc should explain the purpose of this class. Also, without squashing `TestEventQueue`, `TestEventQueueImpl` and `SharedTestEventQueue`, both `TestEventQueueImpl` and `SharedTestEventQueue` would also require an extra explanation of their purpose (I'm struggling to understand this construct).
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java ########## @@ -182,8 +182,10 @@ public static void waitForAllTaskRunning( .allMatch( task -> task.getExecutionState() - == ExecutionState - .RUNNING)); + == ExecutionState + .RUNNING + || task.getExecutionState() + .isTerminal())); Review comment: I would create a separate JIRA ticket for this (if it breaks something, it would be easier to find/analyse/reference it). It sounds like it's indeed a valid fix. However, shouldn't we throw an exception if the state is terminal? `waitForAllTaskRunning` suggests that this is an unexpected outcome. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
