gaoyunhaii commented on a change in pull request #16774:
URL: https://github.com/apache/flink/pull/16774#discussion_r687639592



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * The {@link OperatorChain} that is used for restoring tasks that are {@link
+ * TaskStateManager#isFinishedOnRestore()}.
+ */
+@Internal
+public class FinishedOperatorChain<OUT, OP extends StreamOperator<OUT>>
+        extends OperatorChain<OUT, OP> {
+
+    public FinishedOperatorChain(
+            StreamTask<OUT, OP> containingTask,
+            RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> 
recordWriterDelegate) {
+        super(containingTask, recordWriterDelegate);
+    }
+
+    @Override
+    public boolean isFinishedOnRestore() {
+        return true;
+    }
+
+    @Override
+    public void dispatchOperatorEvent(OperatorID operator, 
SerializedValue<OperatorEvent> event) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) {}
+
+    @Override
+    public void endInput(int inputId) throws Exception {}
+
+    @Override
+    public void initializeStateAndOpenOperators(
+            StreamTaskStateInitializer streamTaskStateInitializer) {}
+
+    @Override
+    public void finishOperators(StreamTaskActionExecutor actionExecutor) 
throws Exception {}
+
+    @Override
+    public void close() throws IOException {

Review comment:
       This method might not need to be overridden.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -609,16 +586,13 @@ private boolean takeSnapshotSync(
             Supplier<Boolean> isRunning)
             throws Exception {
 
-        for (final StreamOperatorWrapper<?, ?> operatorWrapper :
-                operatorChain.getAllOperators(true)) {
-            if (!enableCheckpointAfterTasksFinished && 
operatorWrapper.isClosed()) {
-                env.declineCheckpoint(
-                        checkpointMetaData.getCheckpointId(),
-                        new CheckpointException(
-                                "Task Name" + taskName,
-                                
CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING));
-                return false;
-            }
+        if (!enableCheckpointAfterTasksFinished && operatorChain.isClosed()) {

Review comment:
       Although it is not introduced in this PR, we could always decline the 
checkpoint regardless of the value of `enableCheckpointAfterTasksFinished`. 
Since we have now changed the meaning of `close` to disposing the operator, it 
is called as the last step of the task; by that time the mailbox has been 
closed and there should be no new checkpoints. 

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
##########
@@ -490,12 +494,8 @@ public void notifyEndOfData() throws IOException {
                         
.finishForSingletonOperatorChain(StringSerializer.INSTANCE)
                         .build()) {
 
-            testHarness.processElement(Watermark.MAX_WATERMARK, 0, 0);

Review comment:
       Could we keep emitting the MAX_WATERMARK and check that the 
MAX_WATERMARK is passed through? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOperatorChain.java
##########
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
+import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * The {@link OperatorChain} that is used for restoring tasks that are {@link
+ * TaskStateManager#isFinishedOnRestore()}.
+ */
+@Internal
+public class FinishedOperatorChain<OUT, OP extends StreamOperator<OUT>>
+        extends OperatorChain<OUT, OP> {
+
+    public FinishedOperatorChain(
+            StreamTask<OUT, OP> containingTask,
+            RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> 
recordWriterDelegate) {
+        super(containingTask, recordWriterDelegate);
+    }
+
+    @Override
+    public boolean isFinishedOnRestore() {
+        return true;
+    }
+
+    @Override
+    public void dispatchOperatorEvent(OperatorID operator, 
SerializedValue<OperatorEvent> event) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) {}
+
+    @Override
+    public void endInput(int inputId) throws Exception {}
+
+    @Override
+    public void initializeStateAndOpenOperators(
+            StreamTaskStateInitializer streamTaskStateInitializer) {}
+
+    @Override
+    public void finishOperators(StreamTaskActionExecutor actionExecutor) 
throws Exception {}
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+    }
+
+    // ------------------------------------------------------------------------

Review comment:
       It seems the following private methods are not used by 
`FinishedOperatorChain`? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessorFactory.java
##########
@@ -289,8 +290,12 @@ public StreamTaskSourceOutput(
         }
 
         @Override
-        public void emitStreamStatus(StreamStatus streamStatus) throws 
Exception {
+        public void emitStreamStatus(StreamStatus streamStatus) {
             chainedOutput.emitStreamStatus(streamStatus);
         }
     }
+
+    private interface FinishedOnRestoreSourceWatermarkBypass {

Review comment:
       This interface is not used?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
##########
@@ -712,18 +714,24 @@ public void 
testOperatorSkipLifeCycleIfFinishedOnRestore() throws Exception {
         try (StreamTaskMailboxTestHarness<String> harness =
                 new StreamTaskMailboxTestHarnessBuilder<>(
                                 OneInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO)
+                        .setCollectNetworkEvents()
                         .addInput(BasicTypeInfo.STRING_TYPE_INFO, 3)
+                        .setCollectNetworkEvents()

Review comment:
       The setting is repeated.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskChainedSourcesCheckpointingTest.java
##########
@@ -507,9 +507,6 @@ public void notifyEndOfData() throws IOException {
                     sourceReader.getLifeCycleMonitor().assertCallTimes(0, 
LifeCyclePhase.values());
                 } else if (wrapper.getStreamOperator()

Review comment:
       Remove the empty branch ? 

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -161,7 +164,11 @@ public OperatorChain(
         Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
                 new HashMap<>(outEdgesInOrder.size());
         this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
-        this.finishedOnRestore = finishedOnRestore;
+        this.finishedOnRestoreInput =
+                this.isFinishedOnRestore()

Review comment:
       Perhaps we could make `getFinishedOnRestoreInputOrDefault` an abstract 
method, so that once we have the separate subclasses we no longer need to check 
whether the chain is `FinishedOnRestore`. However, considering that there might 
be some implementation complexity, I would not argue strongly for that for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to