pnowojski commented on a change in pull request #14831: URL: https://github.com/apache/flink/pull/14831#discussion_r570906591
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractNonSourceStreamTask.java ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.security.FlinkSecurityManager; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; + +import javax.annotation.Nullable; + +/** Base class for non-source tasks which need to trigger {@link CheckpointBarrierHandler}. 
*/ +public abstract class AbstractNonSourceStreamTask<OUT, OP extends StreamOperator<OUT>> + extends StreamTask<OUT, OP> { + + protected AbstractNonSourceStreamTask(Environment env) throws Exception { + super(env); + } + + protected AbstractNonSourceStreamTask(Environment env, @Nullable TimerService timerService) + throws Exception { + super(env, timerService); + } + + protected AbstractNonSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler); + } + + protected AbstractNonSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler, + StreamTaskActionExecutor actionExecutor) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler, actionExecutor); + } + + protected AbstractNonSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler, + StreamTaskActionExecutor actionExecutor, + TaskMailbox mailbox) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler, actionExecutor, mailbox); + } + + @Nullable + protected abstract CheckpointBarrierHandler getCheckpointBarrierHandler(); + + @Override + protected boolean triggerCheckpoint( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + boolean advanceToEndOfEventTime) + throws Exception { + + FlinkSecurityManager.monitorUserSystemExitForCurrentThread(); + + try { + CheckpointBarrierHandler checkpointBarrierHandler = getCheckpointBarrierHandler(); + if (checkpointBarrierHandler == null) { + return false; + } Review comment: nit: replace this null check with `checkNotNull(checkpointBarrierHandler)`. 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ########## @@ -53,6 +60,8
@@ @Internal public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> { + @Nullable private CheckpointBarrierHandler checkpointBarrierHandler; Review comment: Maybe move this field to `AbstractNonSourceStreamTask`? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointableOneInputStreamTask.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; + +/** A test stream task that also response to the checkpoint trigger requirement. */ +public class CheckpointableOneInputStreamTask<IN, OUT> extends OneInputStreamTask<IN, OUT> { Review comment: `CheckpointableOneInputStreamTask` -> `TestingCheckpointableStreamTask`? 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractSourceStreamTask.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.security.FlinkSecurityManager; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; + +import javax.annotation.Nullable; + +/** Base class for source stream tasks which need to trigger a new checkpoint. 
*/ +public abstract class AbstractSourceStreamTask<OUT, OP extends StreamOperator<OUT>> + extends StreamTask<OUT, OP> { + + protected long latestAsyncCheckpointStartDelayNanos; + + protected AbstractSourceStreamTask(Environment env) throws Exception { + super(env); + } + + protected AbstractSourceStreamTask(Environment env, @Nullable TimerService timerService) + throws Exception { + super(env, timerService); + } + + protected AbstractSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler); + } + + protected AbstractSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler, + StreamTaskActionExecutor actionExecutor) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler, actionExecutor); + } + + protected AbstractSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler, + StreamTaskActionExecutor actionExecutor, + TaskMailbox mailbox) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler, actionExecutor, mailbox); + } + + @Override + protected boolean triggerCheckpoint( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + boolean advanceToEndOfEventTime) + throws Exception { + + FlinkSecurityManager.monitorUserSystemExitForCurrentThread(); Review comment: We are already in the mailbox thread, so I don't understand why we need to enable/set this again.
(I know this might be a pre-existing problem, and I've already notified the authors of this `FlinkSecurityManager` feature, but if they are going to change something, you might need to update/fix it in your PR as well after a rebase) ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractSourceStreamTask.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.security.FlinkSecurityManager; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; + +import javax.annotation.Nullable; + +/** Base class for source stream tasks which need to trigger a new checkpoint. 
*/ +public abstract class AbstractSourceStreamTask<OUT, OP extends StreamOperator<OUT>> + extends StreamTask<OUT, OP> { + + protected long latestAsyncCheckpointStartDelayNanos; + + protected AbstractSourceStreamTask(Environment env) throws Exception { + super(env); + } + + protected AbstractSourceStreamTask(Environment env, @Nullable TimerService timerService) + throws Exception { + super(env, timerService); + } + + protected AbstractSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler); + } + + protected AbstractSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler, + StreamTaskActionExecutor actionExecutor) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler, actionExecutor); + } + + protected AbstractSourceStreamTask( + Environment environment, + @Nullable TimerService timerService, + Thread.UncaughtExceptionHandler uncaughtExceptionHandler, + StreamTaskActionExecutor actionExecutor, + TaskMailbox mailbox) + throws Exception { + super(environment, timerService, uncaughtExceptionHandler, actionExecutor, mailbox); + } + + @Override + protected boolean triggerCheckpoint( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + boolean advanceToEndOfEventTime) + throws Exception { + + FlinkSecurityManager.monitorUserSystemExitForCurrentThread(); + try { + latestAsyncCheckpointStartDelayNanos = + 1_000_000 + * Math.max( + 0, + System.currentTimeMillis() - checkpointMetaData.getTimestamp()); + + // No alignment if we inject a checkpoint + CheckpointMetricsBuilder checkpointMetrics = + new CheckpointMetricsBuilder() + .setAlignmentDurationNanos(0L) + .setBytesProcessedDuringAlignment(0L); + + subtaskCheckpointCoordinator.initCheckpoint( + 
checkpointMetaData.getCheckpointId(), checkpointOptions); + + boolean success = + performCheckpoint( + checkpointMetaData, + checkpointOptions, + checkpointMetrics, + advanceToEndOfEventTime); + if (!success) { + declineCheckpoint(checkpointMetaData.getCheckpointId()); + } + return success; + } catch (Exception e) { + // propagate exceptions only if the task is still in "running" state + if (isRunning) { + throw new Exception( + "Could not perform checkpoint " + + checkpointMetaData.getCheckpointId() + + " for operator " + + getName() + + '.', + e); + } else { + LOG.debug( + "Could not perform checkpoint {} for operator {} while the " + + "invokable was not in state running.", + checkpointMetaData.getCheckpointId(), + getName(), + e); + return false; + } + } finally { + FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread(); + } + } Review comment: Deduplicate with `AbstractNonSourceStreamTask`? Most of the code could still live in `StreamTask`, which would just call a small abstract method implemented differently in `AbstractNonSourceStreamTask` and `AbstractSourceStreamTask`. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java ########## @@ -961,6 +965,52 @@ public void testCheckpointBarrierMetrics() throws Exception { testHarness.waitForTaskCompletion(); } + @Test + public void testTriggerCheckpointViaRpcMessage() throws Exception { + // A global barrier handler to record the history of checkpoint trigger. 
+ TestCheckpointBarrierHandler barrierHandler = + new TestCheckpointBarrierHandler(new DummyInvokable()); + OneInputStreamTaskTestHarness<String, String> testHarness = + new OneInputStreamTaskTestHarness<>( + (Environment env) -> + new OneInputStreamTask<String, String>(env) { + @Override + protected CheckpointBarrierHandler + getCheckpointBarrierHandler() { + return barrierHandler; + } + }, Review comment: Am I missing something?
But isn't what you are doing here dangerous? In your test there will be two very different `CheckpointBarrierHandler` instances. One is the real instance created in `StreamTask#init` and passed to `CheckpointedInputGate`. The other is a completely different `TestCheckpointBarrierHandler` returned from `getCheckpointBarrierHandler`. Why not just hook the testing code into, for example, `TestOperator()` and track the triggered checkpoints from there? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CheckpointableOneInputStreamTask.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; + +/** A test stream task that also response to the checkpoint trigger requirement. */ +public class CheckpointableOneInputStreamTask<IN, OUT> extends OneInputStreamTask<IN, OUT> { Review comment: Why do we need this class?
Why is it a `OneInputStreamTask` that behaves the same as `AbstractSourceStreamTask`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
