This is an automated email from the ASF dual-hosted git repository.
gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a9f2e20375f [FLINK-27523][runtime] Runtime supports producing and
consuming cached intermediate results
a9f2e20375f is described below
commit a9f2e20375f669a5f42944c3ba12a903a4624e43
Author: sxnan <[email protected]>
AuthorDate: Wed May 25 10:55:41 2022 +0800
[FLINK-27523][runtime] Runtime supports producing and consuming cached
intermediate results
This closes #19653.
---
.../TaskDeploymentDescriptorFactory.java | 76 +++++-
.../executiongraph/DefaultExecutionGraph.java | 7 +
.../executiongraph/EdgeManagerBuildUtil.java | 6 +-
.../InternalExecutionGraphAccessor.java | 8 +-
.../RegionPartitionGroupReleaseStrategy.java | 3 +-
.../network/partition/ClusterPartitionManager.java | 23 ++
.../io/network/partition/DataSetMetaInfo.java | 19 ++
.../partition/JobMasterPartitionTracker.java | 10 +
.../partition/JobMasterPartitionTrackerImpl.java | 37 +++
.../partition/ResourceManagerPartitionTracker.java | 10 +
.../ResourceManagerPartitionTrackerImpl.java | 21 +-
.../partition/TaskExecutorPartitionInfo.java | 15 +-
.../TaskExecutorPartitionTrackerImpl.java | 21 +-
.../apache/flink/runtime/jobgraph/JobVertex.java | 29 ++-
.../apache/flink/runtime/jobmaster/JobMaster.java | 1 +
.../runtime/resourcemanager/ResourceManager.java | 19 ++
...achedIntermediateDataSetCorruptedException.java | 44 ++++
.../flink/runtime/scheduler/DefaultScheduler.java | 34 ++-
.../scheduler/strategy/ConsumedPartitionGroup.java | 24 +-
.../flink/runtime/taskexecutor/TaskExecutor.java | 7 +-
.../partition/ClusterPartitionReport.java | 22 +-
.../TaskDeploymentDescriptorFactoryTest.java | 3 +-
.../JobMasterPartitionTrackerImplTest.java | 58 +++++
.../partition/NoOpJobMasterPartitionTracker.java | 13 +
.../NoOpResourceManagerPartitionTracker.java | 8 +
.../ResourceManagerPartitionTrackerImplTest.java | 67 ++++-
.../TaskExecutorPartitionTrackerImplTest.java | 80 +++++-
.../TestingJobMasterPartitionTracker.java | 13 +
.../jobmaster/JobIntermediateDatasetReuseTest.java | 270 +++++++++++++++++++++
.../ResourceManagerPartitionLifecycleTest.java | 36 ++-
.../utils/TestingResourceManagerGateway.java | 15 ++
.../adapter/DefaultExecutionVertexTest.java | 4 +-
.../runtime/scheduler/adaptive/ExecutingTest.java | 9 +
.../strategy/TestingSchedulingExecutionVertex.java | 8 +-
.../strategy/TestingSchedulingTopology.java | 3 +-
.../runtime/taskexecutor/TaskExecutorTest.java | 19 +-
36 files changed, 971 insertions(+), 71 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index dbe20738531..6da8f4fabca 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -39,11 +39,13 @@ import
org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobType;
+import
org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
import org.apache.flink.types.Either;
import org.apache.flink.util.CompressedSerializedValue;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import javax.annotation.Nullable;
@@ -51,7 +53,9 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
@@ -71,6 +75,8 @@ public class TaskDeploymentDescriptorFactory {
private final Function<IntermediateResultPartitionID,
IntermediateResultPartition>
resultPartitionRetriever;
private final BlobWriter blobWriter;
+ private final Map<IntermediateDataSetID, ShuffleDescriptor[]>
+ consumedClusterPartitionShuffleDescriptors;
private TaskDeploymentDescriptorFactory(
ExecutionAttemptID executionId,
@@ -81,7 +87,9 @@ public class TaskDeploymentDescriptorFactory {
List<ConsumedPartitionGroup> consumedPartitionGroups,
Function<IntermediateResultPartitionID,
IntermediateResultPartition>
resultPartitionRetriever,
- BlobWriter blobWriter) {
+ BlobWriter blobWriter,
+ Map<IntermediateDataSetID, ShuffleDescriptor[]>
+ consumedClusterPartitionShuffleDescriptors) {
this.executionId = executionId;
this.serializedJobInformation = serializedJobInformation;
this.taskInfo = taskInfo;
@@ -90,6 +98,8 @@ public class TaskDeploymentDescriptorFactory {
this.consumedPartitionGroups = consumedPartitionGroups;
this.resultPartitionRetriever = resultPartitionRetriever;
this.blobWriter = blobWriter;
+ this.consumedClusterPartitionShuffleDescriptors =
+ consumedClusterPartitionShuffleDescriptors;
}
public TaskDeploymentDescriptor createDeploymentDescriptor(
@@ -137,6 +147,19 @@ public class TaskDeploymentDescriptorFactory {
consumedIntermediateResult,
consumedPartitionGroup)));
}
+ for (Map.Entry<IntermediateDataSetID, ShuffleDescriptor[]> entry :
+ consumedClusterPartitionShuffleDescriptors.entrySet()) {
+            // For FLIP-205, the JobGraph generating side ensures that the
cluster partition is
+            // produced with only one subpartition. Therefore, we always
consume the partition with
+            // subpartition index of 0.
+ inputGates.add(
+ new InputGateDeploymentDescriptor(
+ entry.getKey(),
+ ResultPartitionType.BLOCKING_PERSISTENT,
+ 0,
+ entry.getValue()));
+ }
+
return inputGates;
}
@@ -231,9 +254,22 @@ public class TaskDeploymentDescriptorFactory {
}
public static TaskDeploymentDescriptorFactory fromExecutionVertex(
- ExecutionVertex executionVertex) throws IOException {
+ ExecutionVertex executionVertex)
+ throws IOException, CachedIntermediateDataSetCorruptedException {
InternalExecutionGraphAccessor internalExecutionGraphAccessor =
executionVertex.getExecutionGraphAccessor();
+ Map<IntermediateDataSetID, ShuffleDescriptor[]>
clusterPartitionShuffleDescriptors;
+ try {
+ clusterPartitionShuffleDescriptors =
+ getClusterPartitionShuffleDescriptors(executionVertex);
+ } catch (Throwable e) {
+ throw new CachedIntermediateDataSetCorruptedException(
+ e,
+ executionVertex
+ .getJobVertex()
+ .getJobVertex()
+ .getIntermediateDataSetIdsToConsume());
+ }
return new TaskDeploymentDescriptorFactory(
executionVertex.getCurrentExecutionAttempt().getAttemptId(),
@@ -244,7 +280,41 @@ public class TaskDeploymentDescriptorFactory {
internalExecutionGraphAccessor.getPartitionLocationConstraint(),
executionVertex.getAllConsumedPartitionGroups(),
internalExecutionGraphAccessor::getResultPartitionOrThrow,
- internalExecutionGraphAccessor.getBlobWriter());
+ internalExecutionGraphAccessor.getBlobWriter(),
+ clusterPartitionShuffleDescriptors);
+ }
+
+ private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+ getClusterPartitionShuffleDescriptors(ExecutionVertex
executionVertex) {
+ final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+ executionVertex.getExecutionGraphAccessor();
+ final List<IntermediateDataSetID> consumedClusterDataSetIds =
+
executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
+ Map<IntermediateDataSetID, ShuffleDescriptor[]>
clusterPartitionShuffleDescriptors =
+ new HashMap<>();
+
+ for (IntermediateDataSetID consumedClusterDataSetId :
consumedClusterDataSetIds) {
+ List<? extends ShuffleDescriptor> shuffleDescriptors =
+
internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
+ consumedClusterDataSetId);
+
+ // For FLIP-205, the job graph generating side makes sure that the
producer and consumer
+ // of the cluster partition have the same parallelism and each
consumer Task consumes
+ // one output partition of the producer.
+ Preconditions.checkState(
+ executionVertex.getTotalNumberOfParallelSubtasks() ==
shuffleDescriptors.size(),
+ "The parallelism (%s) of the cache consuming job vertex is
"
+ + "different from the number of shuffle
descriptors (%s) of the intermediate data set",
+ executionVertex.getTotalNumberOfParallelSubtasks(),
+ shuffleDescriptors.size());
+
+ clusterPartitionShuffleDescriptors.put(
+ consumedClusterDataSetId,
+ new ShuffleDescriptor[] {
+
shuffleDescriptors.get(executionVertex.getParallelSubtaskIndex())
+ });
+ }
+ return clusterPartitionShuffleDescriptors;
}
private static MaybeOffloaded<JobInformation> getSerializedJobInformation(
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 713f185b8cf..a1e91a2d801 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -73,6 +73,7 @@ import
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
@@ -1602,4 +1603,10 @@ public class DefaultExecutionGraph implements
ExecutionGraph, InternalExecutionG
public ExecutionGraphID getExecutionGraphID() {
return executionGraphId;
}
+
+ @Override
+ public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID) {
+ return
partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetID);
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
index 9ba16aa8eb9..3aa6e59de0d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
@@ -182,7 +182,8 @@ public class EdgeManagerBuildUtil {
IntermediateResultPartitionID consumedPartitionId,
IntermediateResult intermediateResult) {
ConsumedPartitionGroup consumedPartitionGroup =
-
ConsumedPartitionGroup.fromSinglePartition(consumedPartitionId);
+ ConsumedPartitionGroup.fromSinglePartition(
+ consumedPartitionId,
intermediateResult.getResultType());
registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup,
intermediateResult);
return consumedPartitionGroup;
}
@@ -191,7 +192,8 @@ public class EdgeManagerBuildUtil {
List<IntermediateResultPartitionID> consumedPartitions,
IntermediateResult intermediateResult) {
ConsumedPartitionGroup consumedPartitionGroup =
-
ConsumedPartitionGroup.fromMultiplePartitions(consumedPartitions);
+ ConsumedPartitionGroup.fromMultiplePartitions(
+ consumedPartitions,
intermediateResult.getResultType());
registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup,
intermediateResult);
return consumedPartitionGroup;
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
index 2e2cd3733bf..8ef23b2d834 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
@@ -26,9 +26,11 @@ import
org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.types.Either;
import org.apache.flink.util.SerializedValue;
@@ -64,7 +66,7 @@ public interface InternalExecutionGraphAccessor {
@Nonnull
ComponentMainThreadExecutor getJobMasterMainThreadExecutor();
- ShuffleMaster<?> getShuffleMaster();
+ ShuffleMaster<? extends ShuffleDescriptor> getShuffleMaster();
JobMasterPartitionTracker getPartitionTracker();
@@ -114,4 +116,8 @@ public interface InternalExecutionGraphAccessor {
boolean isDynamic();
ExecutionGraphID getExecutionGraphID();
+
+ /** Get the shuffle descriptors of the cluster partitions ordered by
partition number. */
+ List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID intermediateResultPartition);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
index 06490de9027..a837df72808 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
@@ -147,7 +147,8 @@ public class RegionPartitionGroupReleaseStrategy
for (ConsumedPartitionGroup consumedPartitionGroup :
consumedPartitionGroups) {
final ConsumerRegionGroupExecutionView consumerRegionGroup =
partitionGroupConsumerRegions.get(consumedPartitionGroup);
- if (consumerRegionGroup.isFinished()) {
+ if (consumerRegionGroup.isFinished()
+ &&
!consumedPartitionGroup.getResultPartitionType().isPersistent()) {
// At present, there's only one ConsumerVertexGroup for each
// ConsumedPartitionGroup, so if a ConsumedPartitionGroup is
fully consumed, all
// its partitions are releasable.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java
index 2e2aa6722e6..9a08e3cc569 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java
@@ -17,8 +17,12 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -39,4 +43,23 @@ public interface ClusterPartitionManager {
* @return future that is completed once all partitions have been released
*/
CompletableFuture<Void> releaseClusterPartitions(IntermediateDataSetID
dataSetToRelease);
+
+ /**
+ * Report the cluster partitions status in the task executor.
+ *
+ * @param taskExecutorId The id of the task executor.
+ * @param clusterPartitionReport The status of the cluster partitions.
+     * @return future that is completed once the report has been processed.
+ */
+ CompletableFuture<Void> reportClusterPartitions(
+ ResourceID taskExecutorId, ClusterPartitionReport
clusterPartitionReport);
+
+ /**
+ * Get the shuffle descriptors of the cluster partitions ordered by
partition number.
+ *
+ * @param intermediateDataSetID The id of the dataset.
+ * @return shuffle descriptors of the cluster partitions.
+ */
+ CompletableFuture<List<ShuffleDescriptor>>
getClusterPartitionsShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java
index cb2a39f3159..ae446ad5bf4 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java
@@ -18,9 +18,14 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.util.Preconditions;
+import java.util.Comparator;
+import java.util.Map;
import java.util.OptionalInt;
+import java.util.SortedMap;
+import java.util.TreeMap;
/** Container for meta-data of a data set. */
public final class DataSetMetaInfo {
@@ -28,6 +33,10 @@ public final class DataSetMetaInfo {
private final int numRegisteredPartitions;
private final int numTotalPartitions;
+ private final SortedMap<ResultPartitionID, ShuffleDescriptor>
+ shuffleDescriptorsOrderByPartitionId =
+ new TreeMap<>(
+ Comparator.comparingInt(o ->
o.getPartitionId().getPartitionNumber()));
private DataSetMetaInfo(int numRegisteredPartitions, int
numTotalPartitions) {
this.numRegisteredPartitions = numRegisteredPartitions;
@@ -44,6 +53,16 @@ public final class DataSetMetaInfo {
return numTotalPartitions;
}
+ public DataSetMetaInfo addShuffleDescriptors(
+ Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors) {
+ this.shuffleDescriptorsOrderByPartitionId.putAll(shuffleDescriptors);
+ return this;
+ }
+
+ public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() {
+ return this.shuffleDescriptorsOrderByPartitionId;
+ }
+
static DataSetMetaInfo withoutNumRegisteredPartitions(int
numTotalPartitions) {
return new DataSetMetaInfo(UNKNOWN, numTotalPartitions);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
index cb7c2bef862..2c116e4951e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
@@ -19,8 +19,12 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import java.util.Collection;
+import java.util.List;
/**
* Utility for tracking partitions and issuing release calls to task executors
and shuffle masters.
@@ -61,4 +65,10 @@ public interface JobMasterPartitionTracker
/** Get all the partitions under tracking. */
Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions();
+
+ void connectToResourceManager(ResourceManagerGateway
resourceManagerGateway);
+
+ /** Get the shuffle descriptors of the cluster partitions ordered by
partition number. */
+ List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
index 0e6a6c38810..8c8069f9760 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
@@ -20,11 +20,15 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -52,6 +56,9 @@ public class JobMasterPartitionTrackerImpl
private final ShuffleMaster<?> shuffleMaster;
private final PartitionTrackerFactory.TaskExecutorGatewayLookup
taskExecutorGatewayLookup;
+ private ResourceManagerGateway resourceManagerGateway;
+ private final Map<IntermediateDataSetID, List<ShuffleDescriptor>>
+ clusterPartitionShuffleDescriptors;
public JobMasterPartitionTrackerImpl(
JobID jobId,
@@ -61,6 +68,7 @@ public class JobMasterPartitionTrackerImpl
this.jobId = Preconditions.checkNotNull(jobId);
this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
+ this.clusterPartitionShuffleDescriptors = new HashMap<>();
}
@Override
@@ -118,6 +126,35 @@ public class JobMasterPartitionTrackerImpl
return
partitionInfos.values().stream().map(PartitionInfo::getMetaInfo).collect(toList());
}
+ @Override
+ public void connectToResourceManager(ResourceManagerGateway
resourceManagerGateway) {
+ this.resourceManagerGateway = resourceManagerGateway;
+ }
+
+ @Override
+ public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID) {
+ return clusterPartitionShuffleDescriptors.computeIfAbsent(
+ intermediateDataSetID,
this::requestShuffleDescriptorsFromResourceManager);
+ }
+
+ private List<ShuffleDescriptor>
requestShuffleDescriptorsFromResourceManager(
+ IntermediateDataSetID intermediateDataSetID) {
+ Preconditions.checkNotNull(
+ resourceManagerGateway, "JobMaster is not connected to
ResourceManager");
+ try {
+ return this.resourceManagerGateway
+
.getClusterPartitionsShuffleDescriptors(intermediateDataSetID)
+ .get();
+ } catch (Throwable e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to get shuffle descriptors of intermediate
dataset %s from ResourceManager",
+ intermediateDataSetID),
+ e);
+ }
+ }
+
private void stopTrackingAndHandlePartitions(
Collection<ResultPartitionID> resultPartitionIds,
BiConsumer<ResourceID,
Collection<ResultPartitionDeploymentDescriptor>>
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
index dc8fb41b1d3..b6ee0d765e9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
@@ -19,8 +19,10 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -64,4 +66,12 @@ public interface ResourceManagerPartitionTracker {
* @return tracked datasets
*/
Map<IntermediateDataSetID, DataSetMetaInfo> listDataSets();
+
+ /**
+ * Returns all the shuffle descriptors of cluster partitions for the
intermediate dataset.
+ *
+ * @param dataSetID The id of the intermediate dataset.
+ * @return the shuffle descriptors.
+ */
+ List<ShuffleDescriptor>
getClusterPartitionShuffleDescriptors(IntermediateDataSetID dataSetID);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java
index be8534a7723..72a322f0118 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -100,6 +103,17 @@ public class ResourceManagerPartitionTrackerImpl
implements ResourceManagerParti
return partitionReleaseCompletionFuture;
}
+ @Override
+ public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID dataSetID) {
+ final DataSetMetaInfo dataSetMetaInfo =
this.dataSetMetaInfo.get(dataSetID);
+ if (dataSetMetaInfo == null) {
+ return Collections.emptyList();
+ }
+
+ return new
ArrayList<>(dataSetMetaInfo.getShuffleDescriptors().values());
+ }
+
private void internalProcessClusterPartitionReport(
ResourceID taskExecutorId, ClusterPartitionReport
clusterPartitionReport) {
final Set<IntermediateDataSetID> dataSetsWithLostPartitions =
@@ -194,13 +208,16 @@ public class ResourceManagerPartitionTrackerImpl
implements ResourceManagerParti
if (dataSetMetaInfo == null) {
return DataSetMetaInfo
.withoutNumRegisteredPartitions(
-
entry.getNumTotalPartitions());
+
entry.getNumTotalPartitions())
+ .addShuffleDescriptors(
+
entry.getShuffleDescriptors());
} else {
// double check that the meta
data is consistent
Preconditions.checkState(
dataSetMetaInfo.getNumTotalPartitions()
==
entry.getNumTotalPartitions());
- return dataSetMetaInfo;
+ return
dataSetMetaInfo.addShuffleDescriptors(
+
entry.getShuffleDescriptors());
}
}));
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java
index db0618a8453..ddbfbc5fb78 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import java.util.Objects;
@@ -28,17 +29,17 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
/** Encapsulates meta-information the TaskExecutor requires to be kept for
each partition. */
public final class TaskExecutorPartitionInfo {
- private final ResultPartitionID resultPartitionId;
private final IntermediateDataSetID intermediateDataSetId;
+ private final ShuffleDescriptor shuffleDescriptor;
private final int numberOfPartitions;
public TaskExecutorPartitionInfo(
- ResultPartitionID resultPartitionId,
+ ShuffleDescriptor shuffleDescriptor,
IntermediateDataSetID intermediateDataSetId,
int numberOfPartitions) {
- this.resultPartitionId = checkNotNull(resultPartitionId);
this.intermediateDataSetId = checkNotNull(intermediateDataSetId);
+ this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
checkArgument(numberOfPartitions > 0);
this.numberOfPartitions = numberOfPartitions;
}
@@ -48,7 +49,7 @@ public final class TaskExecutorPartitionInfo {
}
public ResultPartitionID getResultPartitionId() {
- return resultPartitionId;
+ return shuffleDescriptor.getResultPartitionID();
}
public int getNumberOfPartitions() {
@@ -77,8 +78,12 @@ public final class TaskExecutorPartitionInfo {
public static TaskExecutorPartitionInfo from(
ResultPartitionDeploymentDescriptor
resultPartitionDeploymentDescriptor) {
return new TaskExecutorPartitionInfo(
-
resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(),
+ resultPartitionDeploymentDescriptor.getShuffleDescriptor(),
resultPartitionDeploymentDescriptor.getResultId(),
resultPartitionDeploymentDescriptor.getTotalNumberOfPartitions());
}
+
+ public ShuffleDescriptor getShuffleDescriptor() {
+ return shuffleDescriptor;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
index c34f77000d4..e183ab9c2dd 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.util.CollectionUtil;
@@ -26,7 +27,6 @@ import org.apache.flink.util.Preconditions;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -91,7 +91,7 @@ public class TaskExecutorPartitionTrackerImpl
clusterPartitions.computeIfAbsent(
dataSetMetaInfo.getIntermediateDataSetId(),
ignored -> new
DataSetEntry(dataSetMetaInfo.getNumberOfPartitions()));
-
dataSetEntry.addPartition(partitionTrackerEntry.getResultPartitionId());
+
dataSetEntry.addPartition(partitionTrackerEntry.getMetaInfo().getShuffleDescriptor());
}
}
@@ -121,8 +121,8 @@ public class TaskExecutorPartitionTrackerImpl
entry ->
new
ClusterPartitionReport.ClusterPartitionReportEntry(
entry.getKey(),
-
entry.getValue().getPartitionIds(),
-
entry.getValue().getTotalNumberOfPartitions()))
+
entry.getValue().getTotalNumberOfPartitions(),
+
entry.getValue().getShuffleDescriptors()))
.collect(Collectors.toList());
return new ClusterPartitionReport(reportEntries);
@@ -130,23 +130,28 @@ public class TaskExecutorPartitionTrackerImpl
private static class DataSetEntry {
- private final Set<ResultPartitionID> partitionIds = new HashSet<>();
+ private final Map<ResultPartitionID, ShuffleDescriptor>
shuffleDescriptors =
+ new HashMap<>();
private final int totalNumberOfPartitions;
private DataSetEntry(int totalNumberOfPartitions) {
this.totalNumberOfPartitions = totalNumberOfPartitions;
}
- void addPartition(ResultPartitionID resultPartitionId) {
- partitionIds.add(resultPartitionId);
+ void addPartition(ShuffleDescriptor shuffleDescriptor) {
+ shuffleDescriptors.put(shuffleDescriptor.getResultPartitionID(),
shuffleDescriptor);
}
public Set<ResultPartitionID> getPartitionIds() {
- return partitionIds;
+ return shuffleDescriptors.keySet();
}
public int getTotalNumberOfPartitions() {
return totalNumberOfPartitions;
}
+
+ public Map<ResultPartitionID, ShuffleDescriptor>
getShuffleDescriptors() {
+ return shuffleDescriptors;
+ }
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 843ed765daf..d54056ef4f3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -142,6 +142,11 @@ public class JobVertex implements java.io.Serializable {
*/
private String resultOptimizerProperties;
+    /**
+     * The IDs of the cached intermediate datasets that this job vertex
consumes.
+     */
+ private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume
= new ArrayList<>();
+
//
--------------------------------------------------------------------------------------------
/**
@@ -467,10 +472,6 @@ public class JobVertex implements java.io.Serializable {
}
//
--------------------------------------------------------------------------------------------
- private IntermediateDataSet createAndAddResultDataSet(ResultPartitionType
partitionType) {
- return createAndAddResultDataSet(new IntermediateDataSetID(),
partitionType);
- }
-
public IntermediateDataSet createAndAddResultDataSet(
IntermediateDataSetID id, ResultPartitionType partitionType) {
@@ -481,8 +482,18 @@ public class JobVertex implements java.io.Serializable {
public JobEdge connectNewDataSetAsInput(
JobVertex input, DistributionPattern distPattern,
ResultPartitionType partitionType) {
+ return connectNewDataSetAsInput(
+ input, distPattern, partitionType, new
IntermediateDataSetID());
+ }
- IntermediateDataSet dataSet =
input.createAndAddResultDataSet(partitionType);
+ public JobEdge connectNewDataSetAsInput(
+ JobVertex input,
+ DistributionPattern distPattern,
+ ResultPartitionType partitionType,
+ IntermediateDataSetID intermediateDataSetId) {
+
+ IntermediateDataSet dataSet =
+ input.createAndAddResultDataSet(intermediateDataSetId,
partitionType);
JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
@@ -568,6 +579,14 @@ public class JobVertex implements java.io.Serializable {
this.resultOptimizerProperties = resultOptimizerProperties;
}
+ public void addIntermediateDataSetIdToConsume(IntermediateDataSetID
intermediateDataSetId) {
+ intermediateDataSetIdsToConsume.add(intermediateDataSetId);
+ }
+
+ public List<IntermediateDataSetID> getIntermediateDataSetIdsToConsume() {
+ return intermediateDataSetIdsToConsume;
+ }
+
//
--------------------------------------------------------------------------------------------
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 54b1954cc57..476802ca7f7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1125,6 +1125,7 @@ public class JobMaster extends
FencedRpcEndpoint<JobMasterId>
resourceManagerGateway, resourceManagerResourceId);
slotPoolService.connectToResourceManager(resourceManagerGateway);
+ partitionTracker.connectToResourceManager(resourceManagerGateway);
resourceManagerHeartbeatManager.monitorTarget(
resourceManagerResourceId,
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3cd99ca0cf7..634b56f1e32 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.FileType;
@@ -72,6 +73,7 @@ import
org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationRejection;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
+import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -81,6 +83,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -777,6 +780,22 @@ public abstract class ResourceManager<WorkerType extends
ResourceIDRetrievable>
return clusterPartitionTracker.releaseClusterPartitions(dataSetId);
}
+ @Override
+ public CompletableFuture<Void> reportClusterPartitions(
+ ResourceID taskExecutorId, ClusterPartitionReport
clusterPartitionReport) {
+ clusterPartitionTracker.processTaskExecutorClusterPartitionReport(
+ taskExecutorId, clusterPartitionReport);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<List<ShuffleDescriptor>>
getClusterPartitionsShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID) {
+ return CompletableFuture.completedFuture(
+ clusterPartitionTracker.getClusterPartitionShuffleDescriptors(
+ intermediateDataSetID));
+ }
+
@Override
public CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>>
listDataSets() {
return
CompletableFuture.completedFuture(clusterPartitionTracker.listDataSets());
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java
new file mode 100644
index 00000000000..1740a79fd31
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
+
+import java.util.List;
+
+/** Indicates that some task failed to consume a cached intermediate dataset. */
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
+public class CachedIntermediateDataSetCorruptedException extends JobException {
+ private final List<IntermediateDataSetID> corruptedIntermediateDataSetID;
+
+ public CachedIntermediateDataSetCorruptedException(
+ Throwable cause, List<IntermediateDataSetID>
corruptedIntermediateDataSetID) {
+ super(
+ String.format(
+ "Corrupted intermediate dataset IDs: %s",
corruptedIntermediateDataSetID),
+ cause);
+ this.corruptedIntermediateDataSetID = corruptedIntermediateDataSetID;
+ }
+
+ public List<IntermediateDataSetID> getCorruptedIntermediateDataSetID() {
+ return corruptedIntermediateDataSetID;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 9a1f88feb66..e7869b6c9a9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -36,6 +36,9 @@ import
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHa
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import
org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import
org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -62,6 +65,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -293,15 +297,39 @@ public class DefaultScheduler extends SchedulerBase
implements SchedulerOperatio
private void handleTaskFailure(
final ExecutionVertexID executionVertexId, @Nullable final
Throwable error) {
+ Throwable revisedError =
+ maybeTranslateToCachedIntermediateDataSetException(error,
executionVertexId);
final long timestamp = System.currentTimeMillis();
- setGlobalFailureCause(error, timestamp);
- notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
+ setGlobalFailureCause(revisedError, timestamp);
+ notifyCoordinatorsAboutTaskFailure(executionVertexId, revisedError);
final FailureHandlingResult failureHandlingResult =
executionFailureHandler.getFailureHandlingResult(
- executionVertexId, error, timestamp);
+ executionVertexId, revisedError, timestamp);
maybeRestartTasks(failureHandlingResult);
}
+ private Throwable maybeTranslateToCachedIntermediateDataSetException(
+ @Nullable Throwable cause, ExecutionVertexID failedVertex) {
+ if (!(cause instanceof PartitionException)) {
+ return cause;
+ }
+
+ final List<IntermediateDataSetID> intermediateDataSetIdsToConsume =
+ getExecutionJobVertex(failedVertex.getJobVertexId())
+ .getJobVertex()
+ .getIntermediateDataSetIdsToConsume();
+ final IntermediateResultPartitionID failedPartitionId =
+ ((PartitionException) cause).getPartitionId().getPartitionId();
+
+ if (!intermediateDataSetIdsToConsume.contains(
+ failedPartitionId.getIntermediateDataSetID())) {
+ return cause;
+ }
+
+ return new CachedIntermediateDataSetCorruptedException(
+ cause,
Collections.singletonList(failedPartitionId.getIntermediateDataSetID()));
+ }
+
private void notifyCoordinatorsAboutTaskFailure(
final ExecutionVertexID executionVertexId, @Nullable final
Throwable error) {
final ExecutionJobVertex jobVertex =
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
index 790f3b436ae..6e4672c48e9 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
@@ -19,8 +19,10 @@
package org.apache.flink.runtime.scheduler.strategy;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.Preconditions;
import java.util.Collections;
import java.util.Iterator;
@@ -38,11 +40,16 @@ public class ConsumedPartitionGroup implements
Iterable<IntermediateResultPartit
private final IntermediateDataSetID intermediateDataSetID;
- private ConsumedPartitionGroup(List<IntermediateResultPartitionID>
resultPartitions) {
+ private final ResultPartitionType resultPartitionType;
+
+ private ConsumedPartitionGroup(
+ List<IntermediateResultPartitionID> resultPartitions,
+ ResultPartitionType resultPartitionType) {
checkArgument(
resultPartitions.size() > 0,
"The size of result partitions in the ConsumedPartitionGroup
should be larger than 0.");
this.intermediateDataSetID =
resultPartitions.get(0).getIntermediateDataSetID();
+ this.resultPartitionType =
Preconditions.checkNotNull(resultPartitionType);
// Sanity check: all the partitions in one ConsumedPartitionGroup
should have the same
// IntermediateDataSetID
@@ -56,13 +63,16 @@ public class ConsumedPartitionGroup implements
Iterable<IntermediateResultPartit
}
public static ConsumedPartitionGroup fromMultiplePartitions(
- List<IntermediateResultPartitionID> resultPartitions) {
- return new ConsumedPartitionGroup(resultPartitions);
+ List<IntermediateResultPartitionID> resultPartitions,
+ ResultPartitionType resultPartitionType) {
+ return new ConsumedPartitionGroup(resultPartitions,
resultPartitionType);
}
public static ConsumedPartitionGroup fromSinglePartition(
- IntermediateResultPartitionID resultPartition) {
- return new
ConsumedPartitionGroup(Collections.singletonList(resultPartition));
+ IntermediateResultPartitionID resultPartition,
+ ResultPartitionType resultPartitionType) {
+ return new ConsumedPartitionGroup(
+ Collections.singletonList(resultPartition),
resultPartitionType);
}
@Override
@@ -103,4 +113,8 @@ public class ConsumedPartitionGroup implements
Iterable<IntermediateResultPartit
public boolean areAllPartitionsFinished() {
return unfinishedPartitions.get() == 0;
}
+
+ public ResultPartitionType getResultPartitionType() {
+ return resultPartitionType;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8e7ec03781a..55bc617f5b5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -907,7 +907,12 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
try {
partitionTracker.stopTrackingAndReleaseJobPartitions(partitionToRelease);
partitionTracker.promoteJobPartitions(partitionsToPromote);
-
+ if (establishedResourceManagerConnection != null) {
+ establishedResourceManagerConnection
+ .getResourceManagerGateway()
+ .reportClusterPartitions(
+ getResourceID(),
partitionTracker.createClusterPartitionReport());
+ }
closeJobManagerConnectionIfNoAllocatedResources(jobId);
} catch (Throwable t) {
// TODO: Do we still need this catch branch?
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java
index 72f54808c54..a85ccf6efc7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java
@@ -19,10 +19,12 @@ package org.apache.flink.runtime.taskexecutor.partition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
import java.util.Collection;
+import java.util.Map;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -56,22 +58,22 @@ public class ClusterPartitionReport implements Serializable
{
private static final long serialVersionUID = -666517548300250601L;
private final IntermediateDataSetID dataSetId;
- private final Set<ResultPartitionID> hostedPartitions;
+ private final Map<ResultPartitionID, ShuffleDescriptor>
shuffleDescriptors;
private final int numTotalPartitions;
public ClusterPartitionReportEntry(
IntermediateDataSetID dataSetId,
- Set<ResultPartitionID> hostedPartitions,
- int numTotalPartitions) {
+ int numTotalPartitions,
+ Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors) {
Preconditions.checkNotNull(dataSetId);
- Preconditions.checkNotNull(hostedPartitions);
- Preconditions.checkArgument(!hostedPartitions.isEmpty());
+ Preconditions.checkNotNull(shuffleDescriptors);
+ Preconditions.checkArgument(!shuffleDescriptors.isEmpty());
Preconditions.checkArgument(numTotalPartitions > 0);
- Preconditions.checkState(hostedPartitions.size() <=
numTotalPartitions);
+ Preconditions.checkState(shuffleDescriptors.size() <=
numTotalPartitions);
this.dataSetId = dataSetId;
- this.hostedPartitions = hostedPartitions;
this.numTotalPartitions = numTotalPartitions;
+ this.shuffleDescriptors = shuffleDescriptors;
}
public IntermediateDataSetID getDataSetId() {
@@ -79,11 +81,15 @@ public class ClusterPartitionReport implements Serializable
{
}
public Set<ResultPartitionID> getHostedPartitions() {
- return hostedPartitions;
+ return shuffleDescriptors.keySet();
}
public int getNumTotalPartitions() {
return numTotalPartitions;
}
+
+ public Map<ResultPartitionID, ShuffleDescriptor>
getShuffleDescriptors() {
+ return shuffleDescriptors;
+ }
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
index 75bceb6882b..7dba9971c56 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import
org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
@@ -161,7 +162,7 @@ public class TaskDeploymentDescriptorFactoryTest extends
TestLogger {
}
private static TaskDeploymentDescriptor
createTaskDeploymentDescriptor(ExecutionVertex ev)
- throws IOException {
+ throws IOException, CachedIntermediateDataSetCorruptedException {
return TaskDeploymentDescriptorFactory.fromExecutionVertex(ev)
.createDeploymentDescriptor(new AllocationID(), null,
Collections.emptyList());
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
index 1f287c4a777..83af9e0c5bb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -35,6 +36,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
@@ -297,6 +300,47 @@ public class JobMasterPartitionTrackerImplTest extends
TestLogger {
externallyReleasedPartitions,
containsInAnyOrder(jobPartitionId0, jobPartitionId1));
}
+ @Test
+ public void testGetShuffleDescriptors() {
+ final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
+ IntermediateDataSetID intermediateDataSetId = new
IntermediateDataSetID();
+
+ final Queue<ReleaseCall> taskExecutorReleaseCalls = new
ArrayBlockingQueue<>(4);
+ final JobMasterPartitionTrackerImpl partitionTracker =
+ new JobMasterPartitionTrackerImpl(
+ new JobID(),
+ shuffleMaster,
+ resourceId ->
+ Optional.of(
+ createTaskExecutorGateway(
+ resourceId,
taskExecutorReleaseCalls)));
+
+ TestingResourceManagerGateway resourceManagerGateway = new
TestingResourceManagerGateway();
+ partitionTracker.connectToResourceManager(resourceManagerGateway);
+
partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
+
+ assertThat(
+ resourceManagerGateway.requestedIntermediateDataSetIds,
+ contains(intermediateDataSetId));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetShuffleDescriptorsBeforeConnectToResourceManager() {
+ final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
+ IntermediateDataSetID intermediateDataSetId = new
IntermediateDataSetID();
+
+ final Queue<ReleaseCall> taskExecutorReleaseCalls = new
ArrayBlockingQueue<>(4);
+ final JobMasterPartitionTrackerImpl partitionTracker =
+ new JobMasterPartitionTrackerImpl(
+ new JobID(),
+ shuffleMaster,
+ resourceId ->
+ Optional.of(
+ createTaskExecutorGateway(
+ resourceId,
taskExecutorReleaseCalls)));
+
partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
+ }
+
private static TaskExecutorGateway createTaskExecutorGateway(
ResourceID taskExecutorId, Collection<ReleaseCall>
releaseOrPromoteCalls) {
return new TestingTaskExecutorGatewayBuilder()
@@ -329,6 +373,20 @@ public class JobMasterPartitionTrackerImplTest extends
TestLogger {
}
}
+ private static class TestingResourceManagerGateway
+ extends
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway {
+
+ private final List<IntermediateDataSetID>
requestedIntermediateDataSetIds =
+ new ArrayList<>();
+
+ @Override
+ public CompletableFuture<List<ShuffleDescriptor>>
getClusterPartitionsShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID) {
+ requestedIntermediateDataSetIds.add(intermediateDataSetID);
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ }
+
private static class ReleaseCall {
private final ResourceID taskExecutorId;
private final JobID jobId;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
index 8572f2e63b0..087a29613e1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
@@ -19,9 +19,13 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
/** No-op implementation of {@link JobMasterPartitionTracker}. */
public enum NoOpJobMasterPartitionTracker implements JobMasterPartitionTracker
{
@@ -53,6 +57,15 @@ public enum NoOpJobMasterPartitionTracker implements
JobMasterPartitionTracker {
return Collections.emptyList();
}
+ @Override
+ public void connectToResourceManager(ResourceManagerGateway
resourceManagerGateway) {}
+
+ @Override
+ public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID) {
+ return Collections.emptyList();
+ }
+
@Override
public Collection<PartitionTrackerEntry<ResourceID,
ResultPartitionDeploymentDescriptor>>
stopTrackingPartitions(Collection<ResultPartitionID>
resultPartitionIds) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java
index 003cdb76f1a..be31e2bc131 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java
@@ -19,9 +19,11 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -49,6 +51,12 @@ public enum NoOpResourceManagerPartitionTracker implements
ResourceManagerPartit
return Collections.emptyMap();
}
+ @Override
+ public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID dataSetID) {
+ return Collections.emptyList();
+ }
+
@SuppressWarnings(
"unused") // unused parameter allows usage as a
ResourceManagerPartitionTrackerFactory
public static ResourceManagerPartitionTracker get(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java
index c3925894d3e..83246ac5b20 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java
@@ -19,7 +19,10 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.util.TestLogger;
@@ -28,11 +31,12 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -187,6 +191,39 @@ public class ResourceManagerPartitionTrackerImplTest
extends TestLogger {
assertThat(tracker.areAllMapsEmpty(), is(true));
}
+ @Test
+ public void testGetClusterPartitionShuffleDescriptors() {
+ final ResourceManagerPartitionTrackerImpl tracker =
+ new ResourceManagerPartitionTrackerImpl(new
TestClusterPartitionReleaser());
+
+ assertThat(tracker.listDataSets().size(), is(0));
+
+ List<ResultPartitionID> resultPartitionIDS = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ resultPartitionIDS.add(
+ new ResultPartitionID(
+ new IntermediateResultPartitionID(DATA_SET_ID, i),
+ ExecutionAttemptID.randomId()));
+ }
+
+ for (ResultPartitionID resultPartitionID : resultPartitionIDS) {
+ report(tracker, TASK_EXECUTOR_ID_1, DATA_SET_ID, 100,
resultPartitionID);
+ }
+
+ final List<ShuffleDescriptor> shuffleDescriptors =
+ tracker.getClusterPartitionShuffleDescriptors(DATA_SET_ID);
+ assertThat(shuffleDescriptors.size(), is(100));
+ assertThat(
+ shuffleDescriptors.stream()
+ .map(ShuffleDescriptor::getResultPartitionID)
+ .collect(Collectors.toList()),
+ contains(resultPartitionIDS.toArray()));
+
+ reportEmpty(tracker, TASK_EXECUTOR_ID_1);
+ reportEmpty(tracker, TASK_EXECUTOR_ID_2);
+ assertThat(tracker.areAllMapsEmpty(), is(true));
+ }
+
private static void reportEmpty(
ResourceManagerPartitionTracker tracker, ResourceID...
taskExecutorIds) {
for (ResourceID taskExecutorId : taskExecutorIds) {
@@ -210,12 +247,34 @@ public class ResourceManagerPartitionTrackerImplTest
extends TestLogger {
IntermediateDataSetID dataSetId,
int numTotalPartitions,
ResultPartitionID... partitionId) {
+ final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors =
+ Arrays.stream(partitionId)
+ .map(TestShuffleDescriptor::new)
+ .collect(
+ Collectors.toMap(
+
TestShuffleDescriptor::getResultPartitionID, d -> d));
return new ClusterPartitionReport(
Collections.singletonList(
new ClusterPartitionReport.ClusterPartitionReportEntry(
- dataSetId,
- new HashSet<>(Arrays.asList(partitionId)),
- numTotalPartitions)));
+ dataSetId, numTotalPartitions,
shuffleDescriptors)));
+ }
+
+ private static class TestShuffleDescriptor implements ShuffleDescriptor {
+ private final ResultPartitionID resultPartitionID;
+
+ TestShuffleDescriptor(ResultPartitionID resultPartitionID) {
+ this.resultPartitionID = resultPartitionID;
+ }
+
+ @Override
+ public ResultPartitionID getResultPartitionID() {
+ return resultPartitionID;
+ }
+
+ @Override
+ public Optional<ResourceID> storesLocalResourcesOn() {
+ return Optional.empty();
+ }
}
private static class TestClusterPartitionReleaser
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java
index a8272949996..800a638176a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
@@ -39,6 +41,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static org.hamcrest.CoreMatchers.not;
@@ -66,10 +69,16 @@ public class TaskExecutorPartitionTrackerImplTest extends
TestLogger {
partitionTracker.startTrackingPartition(
jobId,
- new TaskExecutorPartitionInfo(clusterPartitionId, dataSetId,
numberOfPartitions));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(clusterPartitionId),
+ dataSetId,
+ numberOfPartitions));
partitionTracker.startTrackingPartition(
jobId,
- new TaskExecutorPartitionInfo(jobPartitionId, dataSetId,
numberOfPartitions + 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(jobPartitionId),
+ dataSetId,
+ numberOfPartitions + 1));
partitionTracker.promoteJobPartitions(Collections.singleton(clusterPartitionId));
@@ -97,10 +106,16 @@ public class TaskExecutorPartitionTrackerImplTest extends
TestLogger {
new
TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
partitionTracker.startTrackingPartition(
new JobID(),
- new TaskExecutorPartitionInfo(resultPartitionId1, new
IntermediateDataSetID(), 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId1),
+ new IntermediateDataSetID(),
+ 1));
partitionTracker.startTrackingPartition(
new JobID(),
- new TaskExecutorPartitionInfo(resultPartitionId2, new
IntermediateDataSetID(), 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId2),
+ new IntermediateDataSetID(),
+ 1));
partitionTracker.stopTrackingAndReleaseJobPartitions(
Collections.singleton(resultPartitionId1));
@@ -123,10 +138,16 @@ public class TaskExecutorPartitionTrackerImplTest extends
TestLogger {
new
TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
partitionTracker.startTrackingPartition(
jobId1,
- new TaskExecutorPartitionInfo(resultPartitionId1, new
IntermediateDataSetID(), 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId1),
+ new IntermediateDataSetID(),
+ 1));
partitionTracker.startTrackingPartition(
jobId2,
- new TaskExecutorPartitionInfo(resultPartitionId2, new
IntermediateDataSetID(), 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId2),
+ new IntermediateDataSetID(),
+ 1));
partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId1);
assertThat(shuffleReleaseFuture.get(), hasItem(resultPartitionId1));
@@ -147,10 +168,16 @@ public class TaskExecutorPartitionTrackerImplTest extends
TestLogger {
new
TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
partitionTracker.startTrackingPartition(
jobId,
- new TaskExecutorPartitionInfo(resultPartitionId1, new
IntermediateDataSetID(), 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId1),
+ new IntermediateDataSetID(),
+ 1));
partitionTracker.startTrackingPartition(
jobId,
- new TaskExecutorPartitionInfo(resultPartitionId2, new
IntermediateDataSetID(), 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId2),
+ new IntermediateDataSetID(),
+ 1));
partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1));
partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId);
@@ -171,10 +198,16 @@ public class TaskExecutorPartitionTrackerImplTest extends
TestLogger {
new
TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
partitionTracker.startTrackingPartition(
new JobID(),
- new TaskExecutorPartitionInfo(resultPartitionId1, new
IntermediateDataSetID(), 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId1),
+ new IntermediateDataSetID(),
+ 1));
partitionTracker.startTrackingPartition(
new JobID(),
- new TaskExecutorPartitionInfo(resultPartitionId2, new
IntermediateDataSetID(), 1));
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId2),
+ new IntermediateDataSetID(),
+ 1));
partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1));
partitionTracker.stopTrackingAndReleaseAllClusterPartitions();
@@ -197,9 +230,13 @@ public class TaskExecutorPartitionTrackerImplTest extends
TestLogger {
final TaskExecutorPartitionTracker partitionTracker =
new
TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
partitionTracker.startTrackingPartition(
- new JobID(), new TaskExecutorPartitionInfo(resultPartitionId1,
dataSetId1, 1));
+ new JobID(),
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId1),
dataSetId1, 1));
partitionTracker.startTrackingPartition(
- new JobID(), new TaskExecutorPartitionInfo(resultPartitionId2,
dataSetId2, 1));
+ new JobID(),
+ new TaskExecutorPartitionInfo(
+ new TestingShuffleDescriptor(resultPartitionId2),
dataSetId2, 1));
partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1));
partitionTracker.stopTrackingAndReleaseClusterPartitions(Collections.singleton(dataSetId1));
@@ -268,4 +305,23 @@ public class TaskExecutorPartitionTrackerImplTest extends
TestLogger {
backingShuffleEnvironment.close();
}
}
+
+ private static class TestingShuffleDescriptor implements ShuffleDescriptor
{
+ private final ResultPartitionID resultPartitionID;
+
+ private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) {
+
+ this.resultPartitionID = resultPartitionID;
+ }
+
+ @Override
+ public ResultPartitionID getResultPartitionID() {
+ return resultPartitionID;
+ }
+
+ @Override
+ public Optional<ResourceID> storesLocalResourcesOn() {
+ return Optional.empty();
+ }
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
index 8aecfd88180..8378befcfe0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
@@ -19,9 +19,13 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -126,6 +130,15 @@ public class TestingJobMasterPartitionTracker implements
JobMasterPartitionTrack
return getAllTrackedPartitionsSupplier.get();
}
+ @Override
+ public void connectToResourceManager(ResourceManagerGateway
resourceManagerGateway) {}
+
+ @Override
+ public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID) {
+ return Collections.emptyList();
+ }
+
@Override
public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId)
{
return isTrackingPartitionsForFunction.apply(producingTaskExecutorId);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java
new file mode 100644
index 00000000000..907bef037ae
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import
org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException;
+import org.apache.flink.types.IntValue;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for reusing persisted intermediate datasets. */
+public class JobIntermediateDatasetReuseTest {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);
+
+ @Test
+ public void testClusterPartitionReuse() throws Exception {
+ internalTestClusterPartitionReuse(1, 1, jobResult ->
assertTrue(jobResult.isSuccess()));
+ }
+
+ @Test
+ public void testClusterPartitionReuseMultipleParallelism() throws
Exception {
+ internalTestClusterPartitionReuse(64, 64, jobResult ->
assertTrue(jobResult.isSuccess()));
+ }
+
+ @Test
+ public void
testClusterPartitionReuseWithMoreConsumerParallelismThrowException()
+ throws Exception {
+ internalTestClusterPartitionReuse(
+ 1,
+ 2,
+ jobResult -> {
+ assertFalse(jobResult.isSuccess());
+
assertNotNull(getCachedIntermediateDataSetCorruptedException(jobResult));
+ });
+ }
+
+ @Test
+ public void
testClusterPartitionReuseWithLessConsumerParallelismThrowException()
+ throws Exception {
+ internalTestClusterPartitionReuse(
+ 2,
+ 1,
+ jobResult -> {
+ assertFalse(jobResult.isSuccess());
+
assertNotNull(getCachedIntermediateDataSetCorruptedException(jobResult));
+ });
+ }
+
+ private void internalTestClusterPartitionReuse(
+ int producerParallelism,
+ int consumerParallelism,
+ Consumer<JobResult> jobResultVerification)
+ throws Exception {
+ final TestingMiniClusterConfiguration miniClusterConfiguration =
+ TestingMiniClusterConfiguration.newBuilder().build();
+
+ try (TestingMiniCluster miniCluster =
+
TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+ miniCluster.start();
+
+ IntermediateDataSetID intermediateDataSetID = new
IntermediateDataSetID();
+ final JobGraph firstJobGraph =
+ createFirstJobGraph(producerParallelism,
intermediateDataSetID);
+ miniCluster.submitJob(firstJobGraph).get();
+ CompletableFuture<JobResult> jobResultFuture =
+ miniCluster.requestJobResult(firstJobGraph.getJobID());
+ JobResult jobResult = jobResultFuture.get();
+ assertTrue(jobResult.isSuccess());
+
+ final JobGraph secondJobGraph =
+ createSecondJobGraph(consumerParallelism,
intermediateDataSetID);
+ miniCluster.submitJob(secondJobGraph).get();
+ jobResultFuture =
miniCluster.requestJobResult(secondJobGraph.getJobID());
+ jobResult = jobResultFuture.get();
+ jobResultVerification.accept(jobResult);
+ }
+ }
+
+ @Test
+ public void testClusterPartitionReuseWithTMFail() throws Exception {
+ final TestingMiniClusterConfiguration miniClusterConfiguration =
+ TestingMiniClusterConfiguration.newBuilder().build();
+
+ try (TestingMiniCluster miniCluster =
+
TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+ miniCluster.start();
+
+ IntermediateDataSetID intermediateDataSetID = new
IntermediateDataSetID();
+ final JobGraph firstJobGraph = createFirstJobGraph(1,
intermediateDataSetID);
+ miniCluster.submitJob(firstJobGraph).get();
+ CompletableFuture<JobResult> jobResultFuture =
+ miniCluster.requestJobResult(firstJobGraph.getJobID());
+ JobResult jobResult = jobResultFuture.get();
+ assertTrue(jobResult.isSuccess());
+
+ miniCluster.terminateTaskManager(0);
+ miniCluster.startTaskManager();
+
+ final JobGraph secondJobGraph = createSecondJobGraph(1,
intermediateDataSetID);
+ final ExecutionConfig executionConfig = new ExecutionConfig();
+
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1024,
1000));
+ secondJobGraph.setExecutionConfig(executionConfig);
+ miniCluster.submitJob(secondJobGraph).get();
+ jobResultFuture =
miniCluster.requestJobResult(secondJobGraph.getJobID());
+ jobResult = jobResultFuture.get();
+ assertFalse(jobResult.isSuccess());
+ final CachedIntermediateDataSetCorruptedException exception =
+ getCachedIntermediateDataSetCorruptedException(jobResult);
+ assertNotNull(exception);
+ assertEquals(
+ intermediateDataSetID,
exception.getCorruptedIntermediateDataSetID().get(0));
+
+ firstJobGraph.setJobID(new JobID());
+ miniCluster.submitJob(firstJobGraph).get();
+ jobResultFuture =
miniCluster.requestJobResult(firstJobGraph.getJobID());
+ jobResult = jobResultFuture.get();
+ assertTrue(jobResult.isSuccess());
+
+ secondJobGraph.setJobID(new JobID());
+ miniCluster.submitJob(secondJobGraph).get();
+ jobResultFuture =
miniCluster.requestJobResult(secondJobGraph.getJobID());
+ jobResult = jobResultFuture.get();
+ assertTrue(jobResult.isSuccess());
+ }
+ }
+
+ private CachedIntermediateDataSetCorruptedException
+ getCachedIntermediateDataSetCorruptedException(JobResult
jobResult) {
+ assertTrue(jobResult.getSerializedThrowable().isPresent());
+ Throwable throwable =
+ jobResult
+ .getSerializedThrowable()
+ .get()
+
.deserializeError(Thread.currentThread().getContextClassLoader());
+ while (throwable != null) {
+ if (throwable instanceof
CachedIntermediateDataSetCorruptedException) {
+ return (CachedIntermediateDataSetCorruptedException) throwable;
+ }
+ throwable = throwable.getCause();
+ }
+ return null;
+ }
+
+ private JobGraph createSecondJobGraph(
+ int parallelism, IntermediateDataSetID intermediateDataSetID) {
+ final JobVertex receiver = new JobVertex("Receiver 2", null);
+ receiver.setParallelism(parallelism);
+ receiver.setInvokableClass(Receiver.class);
+ receiver.addIntermediateDataSetIdToConsume(intermediateDataSetID);
+
+ return new JobGraph(null, "Second Job", receiver);
+ }
+
+ private JobGraph createFirstJobGraph(
+ int parallelism, IntermediateDataSetID intermediateDataSetID) {
+ final JobVertex sender = new JobVertex("Sender");
+ sender.setParallelism(parallelism);
+ sender.setInvokableClass(Sender.class);
+
+ final JobVertex receiver = new JobVertex("Receiver");
+ receiver.setParallelism(parallelism);
+ receiver.setInvokableClass(Receiver.class);
+
+ receiver.connectNewDataSetAsInput(
+ sender,
+ DistributionPattern.POINTWISE,
+ ResultPartitionType.BLOCKING_PERSISTENT,
+ intermediateDataSetID);
+
+ return new JobGraph(null, "First Job", sender, receiver);
+ }
+
+ /**
+     * Basic sender {@link AbstractInvokable} which sends 100 records, based on
+     * its index, to the downstream.
+ */
+ public static class Sender extends AbstractInvokable {
+
+ public Sender(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ int index = getIndexInSubtaskGroup();
+ final RecordWriter<IntValue> writer =
+ new
RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
+
+ try {
+ for (int i = index; i < index + 100; ++i) {
+ writer.emit(new IntValue(i));
+ LOG.debug("Sender({}) emit {}", index, i);
+ }
+ writer.flushAll();
+ } finally {
+ writer.close();
+ }
+ }
+ }
+
+ /**
+ * Basic receiver {@link AbstractInvokable} which verifies the sent
elements from the {@link
+ * Sender}.
+ */
+ public static class Receiver extends AbstractInvokable {
+
+ public Receiver(Environment environment) {
+ super(environment);
+ }
+
+ @Override
+ public void invoke() throws Exception {
+ int index = getIndexInSubtaskGroup();
+ final RecordReader<IntValue> reader =
+ new RecordReader<>(
+ getEnvironment().getInputGate(0),
+ IntValue.class,
+
getEnvironment().getTaskManagerInfo().getTmpDirectories());
+ for (int i = index; i < index + 100; ++i) {
+ final int value = reader.next().getValue();
+ LOG.debug("Receiver({}) received {}", index, value);
+ assertEquals(i, value);
+ }
+
+ assertNull(reader.next());
+ }
+ }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index 997a3ba59a8..c527fdeca31 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
@@ -45,10 +46,12 @@ import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.instanceOf;
@@ -226,14 +229,20 @@ public class ResourceManagerPartitionLifecycleTest
extends TestLogger {
IntermediateDataSetID dataSetId,
int numTotalPartitions,
ResultPartitionID... partitionIds) {
+
+ final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors =
+ Arrays.stream(partitionIds)
+ .map(TestingShuffleDescriptor::new)
+ .collect(
+ Collectors.toMap(
+
TestingShuffleDescriptor::getResultPartitionID, d -> d));
+
return new TaskExecutorHeartbeatPayload(
new SlotReport(),
new ClusterPartitionReport(
Collections.singletonList(
new
ClusterPartitionReport.ClusterPartitionReportEntry(
- dataSetId,
- new
HashSet<>(Arrays.asList(partitionIds)),
- numTotalPartitions))));
+ dataSetId, numTotalPartitions,
shuffleDescriptors))));
}
@FunctionalInterface
@@ -249,4 +258,23 @@ public class ResourceManagerPartitionLifecycleTest extends
TestLogger {
ResourceID taskExecutorId2)
throws Exception;
}
+
+ private static class TestingShuffleDescriptor implements ShuffleDescriptor
{
+
+ private final ResultPartitionID resultPartitionID;
+
+ private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) {
+ this.resultPartitionID = resultPartitionID;
+ }
+
+ @Override
+ public ResultPartitionID getResultPartitionID() {
+ return resultPartitionID;
+ }
+
+ @Override
+ public Optional<ResourceID> storesLocalResourcesOn() {
+ return Optional.empty();
+ }
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index b3a38bd8140..9e9740245e5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -46,18 +46,21 @@ import
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorEx
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
+import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.QuadFunction;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -492,4 +495,16 @@ public class TestingResourceManagerGateway implements
ResourceManagerGateway {
IntermediateDataSetID dataSetToRelease) {
return CompletableFuture.completedFuture(null);
}
+
+ @Override
+ public CompletableFuture<Void> reportClusterPartitions(
+ ResourceID taskExecutorId, ClusterPartitionReport
clusterPartitionReport) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<List<ShuffleDescriptor>>
getClusterPartitionsShuffleDescriptors(
+ IntermediateDataSetID intermediateDataSetID) {
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
index 491740f9bd6..f97116e58b2 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
@@ -81,7 +81,9 @@ public class DefaultExecutionVertexTest extends TestLogger {
List<ConsumedPartitionGroup> consumedPartitionGroups =
Collections.singletonList(
-
ConsumedPartitionGroup.fromSinglePartition(intermediateResultPartitionId));
+ ConsumedPartitionGroup.fromSinglePartition(
+ intermediateResultPartitionId,
+ schedulingResultPartition.getResultType()));
Map<IntermediateResultPartitionID, DefaultResultPartition>
resultPartitionById =
Collections.singletonMap(intermediateResultPartitionId,
schedulingResultPartition);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index ce7936757b8..c5131d7513e 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -52,6 +52,7 @@ import
org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import
org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -63,6 +64,7 @@ import
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry
import
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import
org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.testutils.TestingUtils;
@@ -1021,5 +1023,12 @@ public class ExecutingTest extends TestLogger {
public ExecutionGraphID getExecutionGraphID() {
return new ExecutionGraphID();
}
+
+ @Override
+ public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+ IntermediateDataSetID intermediateResultPartition) {
+ throw new UnsupportedOperationException(
+ "This method is not supported by the
MockInternalExecutionGraphAccessor.");
+ }
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
index 32c81b6768b..4621680d360 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler.strategy;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.IterableUtils;
@@ -91,7 +92,8 @@ public class TestingSchedulingExecutionVertex implements
SchedulingExecutionVert
void addConsumedPartition(TestingSchedulingResultPartition
consumedPartition) {
final ConsumedPartitionGroup consumedPartitionGroup =
-
ConsumedPartitionGroup.fromSinglePartition(consumedPartition.getId());
+ ConsumedPartitionGroup.fromSinglePartition(
+ consumedPartition.getId(),
consumedPartition.getResultType());
consumedPartition.registerConsumedPartitionGroup(consumedPartitionGroup);
if (consumedPartition.getState() == ResultPartitionState.CONSUMABLE) {
@@ -143,6 +145,8 @@ public class TestingSchedulingExecutionVertex implements
SchedulingExecutionVert
Map<IntermediateResultPartitionID,
TestingSchedulingResultPartition>
resultPartitionsById) {
this.resultPartitionsById.putAll(resultPartitionsById);
+ final ResultPartitionType resultType =
+
resultPartitionsById.values().iterator().next().getResultType();
for (ConsumedPartitionGroup partitionGroup :
consumedPartitionGroups) {
List<IntermediateResultPartitionID> partitionIds =
@@ -151,7 +155,7 @@ public class TestingSchedulingExecutionVertex implements
SchedulingExecutionVert
partitionIds.add(partitionId);
}
this.consumedPartitionGroups.add(
-
ConsumedPartitionGroup.fromMultiplePartitions(partitionIds));
+
ConsumedPartitionGroup.fromMultiplePartitions(partitionIds, resultType));
}
return this;
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index 82cf47fe01b..82463a1a2cb 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -352,7 +352,8 @@ public class TestingSchedulingTopology implements
SchedulingTopology {
ConsumedPartitionGroup.fromMultiplePartitions(
resultPartitions.stream()
.map(TestingSchedulingResultPartition::getId)
- .collect(Collectors.toList()));
+ .collect(Collectors.toList()),
+ resultPartitions.get(0).getResultType());
Map<IntermediateResultPartitionID,
TestingSchedulingResultPartition>
consumedPartitionById =
resultPartitions.stream()
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b35de9f7f88..ea2fec1b5ff 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -78,6 +78,7 @@ import
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
@@ -138,6 +139,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
@@ -696,11 +698,24 @@ public class TaskExecutorTest extends TestLogger {
private static TaskExecutorPartitionTracker
createPartitionTrackerWithFixedPartitionReport(
ShuffleEnvironment<?, ?> shuffleEnvironment) {
+ ResultPartitionID resultPartitionID = new ResultPartitionID();
final ClusterPartitionReport.ClusterPartitionReportEntry
clusterPartitionReportEntry =
new ClusterPartitionReport.ClusterPartitionReportEntry(
new IntermediateDataSetID(),
- Collections.singleton(new ResultPartitionID()),
- 4);
+ 4,
+ Collections.singletonMap(
+ resultPartitionID,
+ new ShuffleDescriptor() {
+ @Override
+ public ResultPartitionID
getResultPartitionID() {
+ return resultPartitionID;
+ }
+
+ @Override
+ public Optional<ResourceID>
storesLocalResourcesOn() {
+ return Optional.empty();
+ }
+ }));
final ClusterPartitionReport clusterPartitionReport =
new
ClusterPartitionReport(Collections.singletonList(clusterPartitionReportEntry));