pnowojski commented on a change in pull request #16773:
URL: https://github.com/apache/flink/pull/16773#discussion_r688483258



##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/StopWithSavepointITCase.java
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.lifecycle;
+
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent;
+import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.TestingGraphBuilder;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedMultiInput;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.SharedObjects;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+import static org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.COMPLEX_GRAPH_BUILDER;
+import static org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders.SIMPLE_GRAPH_BUILDER;
+
+/**
+ * A test suite to check that the operator methods are called according to contract when the job is
+ * stopped with savepoint. The contract was refined in FLIP-147.
+ *
+ * <p>Checked assumptions:
+ *
+ * <ol>
+ *   <li>Downstream should only be "finished" after all of its the upstreams are
+ *   <li>Order of events when finishing an operator:
+ *       <ol>
+ *         <li>(last data element)
+ *         <li>{@link Watermark#MAX_WATERMARK MAX_WATERMARK} (if with drain)
+ *         <li>{@link BoundedMultiInput#endInput endInput} (if with drain)
+ *         <li>timer service quiesced
+ *         <li>{@link StreamOperator#finish() finish} (if with drain; support is planned for
+ *             no-drain)
+ *         <li>{@link AbstractStreamOperator#snapshotState(StateSnapshotContext) snapshotState} (for
+ *             the respective checkpoint)
+ *         <li>{@link CheckpointListener#notifyCheckpointComplete notifyCheckpointComplete} (for the
+ *             respective checkpoint)
+ *         <li>(task termination)
+ *       </ol>
+ *   <li>Timers can be registered until the operator is finished (though may not fire) (simply
+ *       register every 1ms and don't expect any exception)
+ *   <li>The same watermark is received
+ * </ol>
+ *
+ * <p>Variants:
+ *
+ * <ul>
+ *   <li>command - with or without drain (MAX_WATERMARK and endInput should be iff drain)
+ *   <li>graph - different exchanges (keyBy, forward)
+ *   <li>graph - multi-inputs, unions
+ *   <li>graph - FLIP-27 and regular sources (should work for both) - NOT IMPLEMENTED
+ * </ul>
+ *
+ * <p>Not checked:
+ *
+ * <ul>
+ *   <li>state distribution on recovery (when a new job started from the taken savepoint) (a
+ *       separate IT case for partial finishing and state distribution)
+ *   <li>re-taking a savepoint after one fails (and job fails over) (as it should not affect
+ *       savepoints)
+ *   <li>taking a savepoint after recovery (as it should not affect savepoints)
+ *   <li>taking a savepoint on a partially completed graph (a separate IT case)
+ * </ul>
+ */
+@RunWith(Parameterized.class)
+public class StopWithSavepointITCase extends AbstractTestBase {
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    @Parameter(0)
+    public boolean withDrain;
+
+    @Parameter(1)
+    public TestingGraphBuilder graphBuilder;
+
+    @Test
+    public void test() throws Exception {
+        TestJobWithDescription testJob = graphBuilder.apply(sharedObjects, cfg -> {}, env -> {});
+
+        TestJobExecutor.submitGraph(testJob.jobGraph)
+                .waitForEvent(WatermarkReceivedEvent.class, testJob.eventQueue)
+                .stopWithSavepoint(temporaryFolder, withDrain)
+                .execute(miniClusterResource);
+
+        TestJobExecutionValidators.checkOperatorsLifecycle(testJob, withDrain, true);
+        if (withDrain) {
+            // currently (1.14), sources do not stop before taking a savepoint and continue emission
+            // todo: enable after updating production code
+            TestJobExecutionValidators.checkDataFlow(testJob);
+        }
+    }
+
+    @Parameterized.Parameters(name = "withDrain: {0}, {1}")
+    public static Object[] parameters() {
+        return new Object[][] {
+            new Object[] {true, SIMPLE_GRAPH_BUILDER}, new Object[] {false, SIMPLE_GRAPH_BUILDER},
+            new Object[] {true, COMPLEX_GRAPH_BUILDER}, new Object[] {false, COMPLEX_GRAPH_BUILDER},
+        };
+    }

Review comment:
       Good idea to keep `SIMPLE_GRAPH_BUILDER` separate!

##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/event/TestEventQueueImpl.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.lifecycle.event;
+
+import org.apache.flink.util.function.RunnableWithException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+
+class TestEventQueueImpl implements TestEventQueue {

Review comment:
       I think this is somewhat too complicated and hard to follow with those 
handlers, listeners and runnables. It took me more than 10 minutes to 
understand this code (and I had to open it in the IDE to click through all of 
the usages).
   
   Do we need all of those extra layers of abstraction and indirection? As of 
now this simplified proposal https://github.com/pnowojski/flink/tree/f21090 
would work just as well (note my other comment about squashing the 
`TestEventQueue` interface with its implementations).

##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.lifecycle;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
+import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue.TestEventHandler.TestEventNextAction.CONTINUE;
+import static org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue.TestEventHandler.TestEventNextAction.STOP;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+
+class TestJobExecutor {
+
+    private final List<ThrowingConsumer<TestJobExecutionContext, Exception>> steps;

Review comment:
       Why do we need this indirection of first defining a list of steps and 
only after that executing them one by one? Why can't we execute each step 
immediately?
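
For illustration, a minimal sketch of the eager variant asked about above. `EagerExecutorSketch` and its methods are hypothetical stand-ins, not the PR's `TestJobExecutor`, and the string log only stands in for the real cluster calls:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a fluent test executor; not part of Flink. */
final class EagerExecutorSketch {
    // Records what was executed, in place of real MiniCluster interactions.
    private final List<String> log = new ArrayList<>();

    // Each fluent method performs its action right away instead of queuing a step.
    EagerExecutorSketch submit(String jobName) {
        log.add("submit:" + jobName);
        return this;
    }

    EagerExecutorSketch stopWithSavepoint(boolean withDrain) {
        log.add("stop:drain=" + withDrain);
        return this;
    }

    List<String> log() {
        return log;
    }
}
```

With this shape the call chain in the test reads the same, but a failure surfaces at the call that caused it rather than inside a later `execute(...)`.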

##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandQueue.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.lifecycle.command;
+
+import org.apache.flink.testutils.junit.SharedObjects;
+
+import java.io.Serializable;
+
+/** A queue for {@link TestCommand} executed by operators in a test job. */
+public interface TestCommandQueue extends Serializable {
+
+    void add(TestCommand testCommand, TestCommandTarget target, TestCommandRetention retention);

Review comment:
       Instead of adding the `TestCommandTarget` (the final version is quite 
complicated), I would remove this indirection and provide two `add` methods:
   ```
   add(TestCommand, String operatorId, TestCommandRetention);
   addToAll(TestCommand, TestCommandRetention);
   ```

##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
##########
@@ -84,6 +89,28 @@ public TestJobExecutor stopWithSavepoint(TemporaryFolder folder, boolean withDra
         return new TestJobExecutor(steps);
     }
 
+    public TestJobExecutor sendCommand(
+            TestCommandQueue commandQueue,
+            TestCommand testCommand,
+            TestCommandRetention retention) {
+        steps.add(ctx -> commandQueue.add(testCommand, TestCommandTarget.ALL, retention));
+        return this;
+    }
+
+    public TestJobExecutor waitForTermination() {
+        steps.add(
+                ctx -> {
+                    while (!ctx.miniClusterResource
+                            .getClusterClient()
+                            .getJobStatus(ctx.job)
+                            .get()
+                            .isGloballyTerminalState()) {
+                        Thread.sleep(100);

Review comment:
       nit: `sleep(1)`? In the worst case `sleep(10)`?
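
As a rough sketch of what a shorter polling interval could look like, assuming a generic helper (the `PollingSketch` class, `waitUntil` method and its timeout parameter are hypothetical and not part of the PR or of Flink's test utilities):

```java
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; illustrates a short sleep interval plus a deadline. */
final class PollingSketch {

    /**
     * Polls the condition until it holds, sleeping sleepMillis between checks.
     * Returns the number of sleeps performed; throws if timeoutMillis elapses first.
     */
    static int waitUntil(BooleanSupplier condition, long sleepMillis, long timeoutMillis) {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        int sleeps = 0;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline > 0) {
                throw new IllegalStateException("Condition not met within " + timeoutMillis + " ms");
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
            sleeps++;
        }
        return sleeps;
    }
}
```

A call such as `PollingSketch.waitUntil(() -> jobIsTerminal(), 1, 60_000)` (with `jobIsTerminal` being whatever status check the test already has) keeps the added latency per check at about a millisecond while still bounding the total wait.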

##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/event/TestEventQueue.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.lifecycle.event;
+
+import org.apache.flink.testutils.junit.SharedObjects;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** A queue for {@link TestEvent}. */
+public interface TestEventQueue extends Serializable {

Review comment:
       I would suggest squashing `TestEventQueue`, `TestEventQueueImpl` and 
`SharedTestEventQueue`. I don't think we need this abstraction. 
`SharedTestEventQueue(new TestEventQueueImpl())` is the only combination being 
used, and the split of responsibilities between `SharedTestEventQueue` and 
`TestEventQueueImpl` is doing more harm than good in this case, I think.

##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/event/SharedTestEventQueue.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.lifecycle.event;
+
+import org.apache.flink.testutils.junit.SharedReference;
+
+import java.util.List;
+
+class SharedTestEventQueue implements TestEventQueue {

Review comment:
       nit: I'm not a fan of package-private classes bypassing our checkstyle 
rules for missing Javadoc.

##########
File path: flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/event/TestEventQueue.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.lifecycle.event;
+
+import org.apache.flink.testutils.junit.SharedObjects;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** A queue for {@link TestEvent}. */
+public interface TestEventQueue extends Serializable {

Review comment:
       The Javadoc should explain the purpose of this class. Also, without 
squashing `TestEventQueue`, `TestEventQueueImpl` and `SharedTestEventQueue`, 
both `TestEventQueueImpl` and `SharedTestEventQueue` would also need an 
explanation of their purpose (I'm struggling to understand this construct).

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
##########
@@ -182,8 +182,10 @@ public static void waitForAllTaskRunning(
                                                             .allMatch(
                                                                     task ->
                                                                             task.getExecutionState()
-                                                                                    == ExecutionState
-                                                                                            .RUNNING));
+                                                                                            == ExecutionState
+                                                                                                    .RUNNING
+                                                                                    || task.getExecutionState()
+                                                                                            .isTerminal()));

Review comment:
       I would create a separate JIRA ticket for this (if it breaks something, 
it would be easier to find/analyse/reference it). It sounds like it is indeed a 
valid fix.
   
   However, shouldn't we throw an exception if the state is terminal? The name 
`waitForAllTaskRunning` suggests that this is an unexpected outcome.
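
To make the question concrete, a minimal sketch of the fail-fast alternative. The `State` enum and `allRunning` method below are simplified, hypothetical stand-ins, not Flink's `ExecutionState` or the actual `CommonTestUtils` code:

```java
import java.util.List;

/** Hypothetical sketch: treat a terminal task state as an error while waiting for RUNNING. */
final class WaitForRunningSketch {

    /** Simplified stand-in for a task execution state. */
    enum State {
        CREATED, RUNNING, FINISHED, FAILED;

        boolean isTerminal() {
            return this == FINISHED || this == FAILED;
        }
    }

    /**
     * Returns true if every task is RUNNING and false if some are still starting.
     * Throws if any task has already reached a terminal state.
     */
    static boolean allRunning(List<State> states) {
        for (State state : states) {
            if (state.isTerminal()) {
                throw new IllegalStateException(
                        "Task reached terminal state " + state + " while waiting for RUNNING");
            }
        }
        return states.stream().allMatch(state -> state == State.RUNNING);
    }
}
```

The trade-off is that a caller who legitimately expects some tasks to finish early would then need a differently named helper, which is part of why a separate ticket seems useful.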





