gaoyunhaii commented on code in PR #20534:
URL: https://github.com/apache/flink/pull/20534#discussion_r964342154
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java:
##########
@@ -57,15 +59,32 @@ void stopTrackingAndReleasePartitions(
Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster);
/**
- * Releases the job partitions and promotes the cluster partitions, and stops the tracking of
- * partitions that were released/promoted.
+ * Promotes the given partitions, and stops the tracking of partitions that were promoted.
+ *
+ * @param resultPartitionIds ID of the partition containing both job partitions and cluster
+ * partitions.
+ * @return Future that will be completed if the partitions are promoted.
*/
- void stopTrackingAndReleaseOrPromotePartitions(
+ CompletableFuture<Void> stopTrackingAndPromotePartitions(
Collection<ResultPartitionID> resultPartitionIds);
/** Get all the partitions under tracking. */
Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions();
+ /** Get all the job partitions under tracking. */
Review Comment:
nit: Get -> `Gets`
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java:
##########
@@ -114,11 +119,16 @@ public void stopTrackingAndReleasePartitions(
}
@Override
- public void stopTrackingAndReleaseOrPromotePartitions(
+ public CompletableFuture<Void> stopTrackingAndPromotePartitions(
Collection<ResultPartitionID> resultPartitionIds) {
+ List<CompletableFuture<Acknowledge>> promoteFutures = new ArrayList<>();
stopTrackingAndHandlePartitions(
resultPartitionIds,
- (tmID, partitionDescs) -> internalReleaseOrPromotePartitions(tmID, partitionDescs));
+ (tmID, partitionDescs) -> {
Review Comment:
nit: `{}` not needed
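
To illustrate the nit: a lambda whose body is a single statement expression can drop the braces without changing behavior. The following is only a minimal sketch; the class name, the `String`/`List<String>` stand-in types, and the body of the lambda are assumptions for illustration, not the code in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

public class LambdaBracesSketch {

    /** Block-bodied lambda: braces around a single statement. */
    public static List<String> withBraces() {
        List<String> calls = new ArrayList<>();
        BiConsumer<String, List<String>> handler =
                (tmID, partitionDescs) -> {
                    calls.add(tmID + ":" + partitionDescs.size());
                };
        handler.accept("tm-1", Arrays.asList("p1", "p2"));
        return calls;
    }

    /** Expression-bodied lambda: same behavior, no braces. */
    public static List<String> withoutBraces() {
        List<String> calls = new ArrayList<>();
        // The boolean returned by add() is simply discarded for a void-compatible lambda.
        BiConsumer<String, List<String>> handler =
                (tmID, partitionDescs) -> calls.add(tmID + ":" + partitionDescs.size());
        handler.accept("tm-1", Arrays.asList("p1", "p2"));
        return calls;
    }
}
```

Both variants record the same call, so the choice is purely stylistic.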
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java:
##########
@@ -57,15 +59,32 @@ void stopTrackingAndReleasePartitions(
Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster);
/**
- * Releases the job partitions and promotes the cluster partitions, and stops the tracking of
- * partitions that were released/promoted.
+ * Promotes the given partitions, and stops the tracking of partitions that were promoted.
+ *
+ * @param resultPartitionIds ID of the partition containing both job partitions and cluster
+ * partitions.
+ * @return Future that will be completed if the partitions are promoted.
*/
- void stopTrackingAndReleaseOrPromotePartitions(
+ CompletableFuture<Void> stopTrackingAndPromotePartitions(
Collection<ResultPartitionID> resultPartitionIds);
/** Get all the partitions under tracking. */
Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions();
+ /** Get all the job partitions under tracking. */
+ default Collection<ResultPartitionDeploymentDescriptor> getAllTrackedJobPartitions() {
Review Comment:
I think we currently do not have a concept of a `job partition`. Could we rename this to `getAllTrackedNonClusterPartitions()`?
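
A rough sketch of how the suggested rename could look, assuming the filter on persistence stays as in the diff. `Descriptor` and `Tracker` are hypothetical stand-ins for `ResultPartitionDeploymentDescriptor` and `JobMasterPartitionTracker`, and `getAllTrackedClusterPartitions()` is an assumed name for the counterpart method; none of this is the actual Flink code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionNamingSketch {

    /** Hypothetical stand-in for a partition descriptor; only the persistence flag is modeled. */
    public static final class Descriptor {
        public final String id;
        public final boolean persistent;

        public Descriptor(String id, boolean persistent) {
            this.id = id;
            this.persistent = persistent;
        }
    }

    /** Hypothetical stand-in for the tracker interface. */
    public interface Tracker {
        /** Gets all the partitions under tracking. */
        Collection<Descriptor> getAllTrackedPartitions();

        /** Gets all the non-cluster (non-persistent) partitions under tracking. */
        default Collection<Descriptor> getAllTrackedNonClusterPartitions() {
            return getAllTrackedPartitions().stream()
                    .filter(descriptor -> !descriptor.persistent)
                    .collect(Collectors.toList());
        }

        /** Gets all the cluster (persistent) partitions under tracking. Assumed name. */
        default Collection<Descriptor> getAllTrackedClusterPartitions() {
            return getAllTrackedPartitions().stream()
                    .filter(descriptor -> descriptor.persistent)
                    .collect(Collectors.toList());
        }
    }

    /** Builds a tracker over a fixed set of descriptors. */
    public static Tracker of(Descriptor... descriptors) {
        List<Descriptor> tracked = Arrays.asList(descriptors);
        return () -> tracked;
    }

    /** Extracts the ids, in order, for easy comparison. */
    public static List<String> ids(Collection<Descriptor> descriptors) {
        return descriptors.stream().map(d -> d.id).collect(Collectors.toList());
    }
}
```

With this naming, the two default methods split the tracked partitions along the existing cluster/non-cluster distinction without introducing a new term.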
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java:
##########
@@ -57,15 +59,32 @@ void stopTrackingAndReleasePartitions(
Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster);
/**
- * Releases the job partitions and promotes the cluster partitions, and stops the tracking of
- * partitions that were released/promoted.
+ * Promotes the given partitions, and stops the tracking of partitions that were promoted.
+ *
+ * @param resultPartitionIds ID of the partition containing both job partitions and cluster
+ * partitions.
+ * @return Future that will be completed if the partitions are promoted.
*/
- void stopTrackingAndReleaseOrPromotePartitions(
+ CompletableFuture<Void> stopTrackingAndPromotePartitions(
Collection<ResultPartitionID> resultPartitionIds);
/** Get all the partitions under tracking. */
Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions();
+ /** Get all the job partitions under tracking. */
+ default Collection<ResultPartitionDeploymentDescriptor> getAllTrackedJobPartitions() {
+ return getAllTrackedPartitions().stream()
+ .filter(descriptor -> !descriptor.getPartitionType().isPersistent())
+ .collect(Collectors.toList());
+ }
+
+ /** Get all the cluster partitions under tracking. */
Review Comment:
nit: Get -> `Gets`