zhijiangW commented on a change in pull request #7856:
[FLINK-11776][coordination] Refactor to simplify the process of scheduleOrUpdateConsumers
URL: https://github.com/apache/flink/pull/7856#discussion_r268961637
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
##########
@@ -818,71 +800,46 @@ else if (numConsumers == 0) {
             consumerVertex.checkInputDependencyConstraints()) {
         scheduleConsumer(consumerVertex);
     }
-
-    // double check to resolve race conditions
-    if (consumerVertex.getExecutionState() == RUNNING) {
-        consumerVertex.sendPartitionInfos();
-    }
 }
 // ----------------------------------------------------------------
 // Consumer is running => send update message now
+// Consumer is deploying => cache the partition info which would be
+// sent after switching to running
 // ----------------------------------------------------------------
-else {
-    if (consumerState == RUNNING) {
-        final LogicalSlot consumerSlot = consumer.getAssignedResource();
-
-        if (consumerSlot == null) {
-            // The consumer has been reset concurrently
-            continue;
-        }
-
-        final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
-                .getCurrentAssignedResource().getTaskManagerLocation();
-        final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
-
-        final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
-
-        final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
-
-        final ResultPartitionLocation partitionLocation;
+else if (consumerState == DEPLOYING || consumerState == RUNNING) {
+    final LogicalSlot consumerSlot = consumer.getAssignedResource();
+    if (consumerSlot == null) {
+        // The consumer has been reset concurrently
+        continue;
+    }
-        if (consumerTaskManager.equals(partitionTaskManager)) {
-            // Consuming task is deployed to the same instance as the partition => local
-            partitionLocation = ResultPartitionLocation.createLocal();
-        }
-        else {
-            // Different instances => remote
-            final ConnectionID connectionId = new ConnectionID(
-                    partitionTaskManagerLocation,
-                    partition.getIntermediateResult().getConnectionIndex());
+    final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
+            .getCurrentAssignedResource().getTaskManagerLocation();
+    final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
+    final ResourceID consumerTaskManager = consumerSlot.getTaskManagerLocation().getResourceID();
-            partitionLocation = ResultPartitionLocation.createRemote(connectionId);
-        }
+    final ResultPartitionLocation partitionLocation;
+    if (consumerTaskManager.equals(partitionTaskManager)) {
+        // Consuming task is deployed to the same instance as the partition => local
+        partitionLocation = ResultPartitionLocation.createLocal();
+    } else {
+        // Different instances => remote
+        final ConnectionID connectionId = new ConnectionID(
+            partitionTaskManagerLocation,
+            partition.getIntermediateResult().getConnectionIndex());
+        partitionLocation = ResultPartitionLocation.createRemote(connectionId);
+    }
-        final InputChannelDeploymentDescriptor descriptor = new InputChannelDeploymentDescriptor(
-                partitionId, partitionLocation);
+    final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
Review comment:
I checked the code path, and I don't think this was a bug before.
When an execution is marked as finished, that execution is the producer
that updates all of its consumers, so it was safe to use its own `attemptId`
here.
Nevertheless, once we move the creation of `PartitionInfo` out of
`Execution` as you suggested, we could use
`partition.getProducer().getCurrentExecutionAttempt().getAttemptId()` instead
to generate the `ResultPartitionID`, which would simplify
`PartitionInfo#fromEdge(ExecutionEdge)`.
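A minimal sketch of the `PartitionInfo#fromEdge(ExecutionEdge)` idea, under loud assumptions: every type below (`ExecutionEdge`, `Execution`, `ResultPartitionID`, `ConnectionID`, `ResultPartitionLocation`, `PartitionInfo`) is a hypothetical, simplified stand-in, not Flink's real class, TaskManagers are identified by plain strings, and the edge's `producerCurrentAttempt` field stands in for `partition.getProducer().getCurrentExecutionAttempt()`. It only illustrates taking the attempt id from the producer's current attempt and reusing the local-vs-remote decision from the diff.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical, simplified stand-ins for Flink runtime types; NOT the real Flink API.
class PartitionInfoSketch {

    // Logical partition id plus the id of the execution attempt that produced it.
    record ResultPartitionID(UUID partitionId, UUID producerAttemptId) {}

    // Producing TaskManager plus the intermediate result's connection index.
    record ConnectionID(String taskManagerId, int connectionIndex) {}

    // Local when producer and consumer share a TaskManager, otherwise remote via a ConnectionID.
    record ResultPartitionLocation(ConnectionID connectionId) {
        static ResultPartitionLocation createLocal() {
            return new ResultPartitionLocation(null);
        }

        static ResultPartitionLocation createRemote(ConnectionID connectionId) {
            return new ResultPartitionLocation(Objects.requireNonNull(connectionId));
        }

        boolean isLocal() {
            return connectionId == null;
        }
    }

    // A single execution attempt and the (hypothetical) id of the TaskManager its slot lives on.
    record Execution(UUID attemptId, String taskManagerId) {}

    // Hypothetical edge view: the partition, the producer's current attempt, the consumer attempt,
    // and the connection index of the intermediate result.
    record ExecutionEdge(UUID partitionId, Execution producerCurrentAttempt,
                         Execution consumer, int connectionIndex) {}

    record PartitionInfo(ResultPartitionID partitionId, ResultPartitionLocation location) {

        // Assumes the caller already checked that the consumer is DEPLOYING or RUNNING
        // and still has an assigned slot, as scheduleOrUpdateConsumers does in the diff.
        static PartitionInfo fromEdge(ExecutionEdge edge) {
            Execution producer = edge.producerCurrentAttempt();
            Execution consumer = edge.consumer();

            // Attempt id comes from the producer's current attempt, not from the calling Execution.
            ResultPartitionID id = new ResultPartitionID(edge.partitionId(), producer.attemptId());

            // Same TaskManager => local; different TaskManagers => remote.
            ResultPartitionLocation location = producer.taskManagerId().equals(consumer.taskManagerId())
                    ? ResultPartitionLocation.createLocal()
                    : ResultPartitionLocation.createRemote(
                            new ConnectionID(producer.taskManagerId(), edge.connectionIndex()));
            return new PartitionInfo(id, location);
        }
    }

    public static void main(String[] args) {
        UUID partition = UUID.randomUUID();
        Execution producer = new Execution(UUID.randomUUID(), "tm-1");
        Execution sameTm = new Execution(UUID.randomUUID(), "tm-1");
        Execution otherTm = new Execution(UUID.randomUUID(), "tm-2");

        PartitionInfo local = PartitionInfo.fromEdge(new ExecutionEdge(partition, producer, sameTm, 0));
        PartitionInfo remote = PartitionInfo.fromEdge(new ExecutionEdge(partition, producer, otherTm, 3));

        System.out.println(local.location().isLocal());                          // true
        System.out.println(remote.location().connectionId().connectionIndex());  // 3
    }
}
```

Keeping the state and slot checks in the caller leaves `fromEdge` as a pure function of the edge, which is the simplification the comment is after; whether the final refactoring draws the line exactly there is an open design choice.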
----------------------------------------------------------------
This is an automated message from the Apache Git Service.