kezhuw commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r614086020

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ExecutionSubtaskAccess.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+final class ExecutionSubtaskAccess implements SubtaskAccess {
+
+    private final Execution taskExecution;
+    private final OperatorID operator;
+    private final IncompleteFuturesTracker futuresTracker;
+
+    ExecutionSubtaskAccess(Execution taskExecution, OperatorID operator) {
+        this.taskExecution = taskExecution;
+        this.operator = operator;
+        this.futuresTracker = new IncompleteFuturesTracker();
+
+        // this is a safeguard to speed up things: as soon as the task is in a terminal state, all
+        // the pending futures from events sent to that task should fail immediately
+        taskExecution
+                .getTerminalStateFuture()
+                .thenAccept(
+                        (state) ->
+                                futuresTracker.failAllFutures(
+                                        new FlinkException("Task is no longer running")));
+    }
+
+    @Override
+    public Callable<CompletableFuture<Acknowledge>> createEventSendAction(
+            SerializedValue<OperatorEvent> event) {
+        return () -> {
+            final ExecutionState state = taskExecution.getState();
+            if (!(state == ExecutionState.RUNNING || state == ExecutionState.INITIALIZING)) {

Review comment:
A similar state check exists in `Execution.sendOperatorEvent`, so I think this is a duplication. I am not sure whether it is intentional, to make the "fail immediately" behavior more explicit.


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventValve.java
##########
@@ -92,28 +77,21 @@ public boolean isShut() {
      * future.
      *
      * <p>If the valve is closed this buffers the event and returns an incomplete future. The future
-     * is completed with the original result once the valve is opened. If the event is never sent
-     * (because it gets dropped through a call to {@link #reset()} or {@link #resetForTask(int)},
-     * then the returned future till be completed exceptionally.
+     * is completed with the original result once the valve is opened again.
      *
      * <p>This method makes no assumptions and gives no guarantees from which thread the result
      * future gets completed.
      */
     public void sendEvent(
-            SerializedValue<OperatorEvent> event,
-            int subtask,
+            Callable<CompletableFuture<Acknowledge>> sendAction,

Review comment:
Should we document the contract of `sendAction`? #15605 made me realize that it should fail immediately for a failed job; otherwise it will abort the checkpoint unnecessarily. This probably belongs to #15605; I guess I mixed up the two PRs. Feel free to address this in either PR.


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskAccess.java
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+/** A sender for operator events to a parallel instances of an operator. */
+interface SubtaskAccess {
+
+    /**
+     * Sends the given serialized event to the operator's parallel instance with the given subtask

Review comment:
Is the Javadoc outdated? It would be nice if the contract of the created send action were referenced or linked here.

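To illustrate the "fail immediately" contract raised in the comments above, here is a minimal, self-contained sketch. It is not the code from this PR: `FailFastSendActionSketch`, `TaskState`, `createSendAction`, and the `sendToTask` supplier are hypothetical stand-ins for `ExecutionSubtaskAccess`, `ExecutionState`, `createEventSendAction`, and the actual send to the task. The sketch assumes that the state is inspected when the action is invoked (not when it is created) and that a non-running task results in an already-failed future; the PR may signal this differently.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical sketch; none of these names are taken from the PR. */
public class FailFastSendActionSketch {

    /** Stand-in for Flink's ExecutionState, reduced to what the sketch needs. */
    public enum TaskState {
        INITIALIZING,
        RUNNING,
        FAILED
    }

    /**
     * Creates a send action that checks the task state at invocation time. If the task is
     * neither RUNNING nor INITIALIZING, the returned future is already failed, so callers
     * never wait on an event that cannot be delivered.
     */
    public static Callable<CompletableFuture<String>> createSendAction(
            Supplier<TaskState> currentState,
            Supplier<CompletableFuture<String>> sendToTask) {
        return () -> {
            final TaskState state = currentState.get();
            if (!(state == TaskState.RUNNING || state == TaskState.INITIALIZING)) {
                // assumption of this sketch: report the failure through the future
                final CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new IllegalStateException("Task is not running, state: " + state));
                return failed;
            }
            return sendToTask.get();
        };
    }

    public static void main(String[] args) throws Exception {
        final AtomicReference<TaskState> state = new AtomicReference<>(TaskState.RUNNING);
        final Callable<CompletableFuture<String>> action =
                createSendAction(state::get, () -> CompletableFuture.completedFuture("ack"));

        System.out.println("while running: " + action.call().join());

        // the same action fails immediately once the task has left the running states
        state.set(TaskState.FAILED);
        System.out.println("after failure: " + action.call().isCompletedExceptionally());
    }
}
```

Because the check runs inside the `Callable`, an action that was buffered while the task was still running fails as soon as it is finally invoked against a failed task, which is the behavior the contract documentation would need to spell out.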

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinatorTest.java
##########
@@ -32,45 +32,52 @@ Licensed to the Apache Software Foundation (ASF) under one
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;

 /** Unit tests for {@link RecreateOnResetOperatorCoordinator}. */
 public class RecreateOnResetOperatorCoordinatorTest {
+
     private static final OperatorID OPERATOR_ID = new OperatorID(1234L, 5678L);
     private static final int NUM_SUBTASKS = 1;

     @Test
-    public void testQuiesceableContextNotQuiesced() throws TaskNotRunningException {
+    public void testQuiesceableContextForwardsProperties() {
         MockOperatorCoordinatorContext context =
                 new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS);
         RecreateOnResetOperatorCoordinator.QuiesceableContext quiesceableContext =
                 new RecreateOnResetOperatorCoordinator.QuiesceableContext(context);

-        TestingEvent event = new TestingEvent();
-        quiesceableContext.sendEvent(event, 0);
-        quiesceableContext.failJob(new Exception());
-
         assertEquals(OPERATOR_ID, quiesceableContext.getOperatorId());
         assertEquals(NUM_SUBTASKS, quiesceableContext.currentParallelism());

Review comment:
What about `getUserCodeClassloader` forwarding?


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -307,6 +321,42 @@ public void abortCurrentTriggering() {
         mainThreadExecutor.execute(eventValve::openValveAndUnmarkCheckpoint);
     }
+    // ------------------------------------------------------------------------
+    // miscellaneous helpers
+    // ------------------------------------------------------------------------
+
+    private void setupAllSubtaskGateways() {
+        for (int i = 0; i < operatorParallelism; i++) {
+            setupSubtaskGateway(i);
+        }
+    }
+
+    private void setupSubtaskGateway(int subtask) {
+        // this gets an access to the latest task execution attempt.
+        final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);
+
+        final OperatorCoordinator.SubtaskGateway gateway =
+                new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
+
+        // We need to do this synchronously here, otherwise we violate the contract that
+        // 'subtaskFailed()' will never overtake 'subtaskReady()'.
+        // An alternative, if we ever figure out that this cannot work synchronously here,
+        // is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()')
+        // back into the main thread executor, rather than directly calling the OperatorCoordinator
+        FutureUtils.assertNoException(

Review comment:
It seems that an exception thrown from `subtaskReady` will terminate the process. I guess this is not what we want.


##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##########
@@ -183,6 +184,8 @@ default void notifyCheckpointAborted(long checkpointId) {}
      */
     void subtaskReset(int subtask, long checkpointId);
+    void subtaskReady(int subtask, SubtaskGateway gateway);

Review comment:
Could we document this a bit? PS: The documentation of `OperatorCoordinator` as a whole is very nice! It helped me a lot in understanding it. Thanks!


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
