codenohup commented on code in PR #25842:
URL: https://github.com/apache/flink/pull/25842#discussion_r1896325844
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -48,17 +55,79 @@ public class TieredInternalShuffleMaster {
private final ShuffleMasterContext shuffleMasterContext;
- public TieredInternalShuffleMaster(ShuffleMasterContext
shuffleMasterContext) {
+ private final boolean useExternalTier;
+
+ public TieredInternalShuffleMaster(
+ ShuffleMasterContext shuffleMasterContext,
+ ShuffleDescriptorRetriever shuffleDescriptorRetriever) {
this.shuffleMasterContext = shuffleMasterContext;
Configuration conf = shuffleMasterContext.getConfiguration();
+ String externalTierFactoryClass =
+ conf.get(
+ NettyShuffleEnvironmentOptions
+
.NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME);
+ this.useExternalTier = externalTierFactoryClass != null;
TieredStorageConfiguration tieredStorageConfiguration =
TieredStorageConfiguration.fromConfiguration(conf);
TieredStorageResourceRegistry resourceRegistry = new
TieredStorageResourceRegistry();
- List<TierMasterAgent> tierFactories =
+ List<Tuple2<String, TierMasterAgent>> tierFactories =
tieredStorageConfiguration.getTierFactories().stream()
- .map(tierFactory ->
tierFactory.createMasterAgent(resourceRegistry))
+ .map(
+ tierFactory ->
+ Tuple2.of(
+ tierFactory.identifier(),
+
tierFactory.createMasterAgent(resourceRegistry)))
.collect(Collectors.toList());
- this.tieredStorageMasterClient = new
TieredStorageMasterClient(tierFactories);
+ this.tieredStorageMasterClient =
+ new TieredStorageMasterClient(tierFactories,
shuffleDescriptorRetriever);
+ }
+
+ public boolean supportsBatchSnapshot() {
+ return useExternalTier;
+ }
+
+ public void snapshotState(
+ CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture,
+ ShuffleMasterSnapshotContext context,
+ JobID jobId) {
+ // only external tier supports snapshot for now.
+ if (useExternalTier) {
+ tieredStorageMasterClient.snapshotState(snapshotFuture, context,
jobId);
+ } else {
+ snapshotFuture.complete(AllTieredShuffleMasterSnapshots.empty());
Review Comment:
If `useExternalTier` is false, `snapshotState` should never be invoked (since `supportsBatchSnapshot()` returns false),
making the `else` branch unnecessary.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredPartitionWithMetrics.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMetrics;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Partition with shuffle metrics for tiered storage. */
+public class TieredPartitionWithMetrics {
+ private final TierShuffleDescriptor shuffleDescriptor;
+ private final ShuffleMetrics partitionMetrics;
+
+ public TieredPartitionWithMetrics(
+ TierShuffleDescriptor shuffleDescriptor, ShuffleMetrics
partitionMetrics) {
+ this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
+ this.partitionMetrics = checkNotNull(partitionMetrics);
+ }
+
+ public ShuffleMetrics getPartitionMetrics() {
+ return partitionMetrics;
+ }
+
+ public TierShuffleDescriptor getPartition() {
Review Comment:
This method name is confusing: why does _getPartition_ return a
_TierShuffleDescriptor_?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java:
##########
@@ -65,12 +100,127 @@ public void releasePartition(ShuffleDescriptor
shuffleDescriptor) {
if (tierShuffleDescriptors != null &&
!tierShuffleDescriptors.isEmpty()) {
checkState(tierShuffleDescriptors.size() == tiers.size());
for (int i = 0; i < tierShuffleDescriptors.size(); i++) {
- tiers.get(i).releasePartition(tierShuffleDescriptors.get(i));
+
tiers.get(i).f1.releasePartition(tierShuffleDescriptors.get(i));
}
}
}
+ public void snapshotState(
+ CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture,
+ ShuffleMasterSnapshotContext context,
+ JobID jobId) {
+ snapshotStateInternal(
+ snapshotFuture, (agent, future) -> agent.snapshotState(future,
context, jobId));
+ }
+
+ public void
snapshotState(CompletableFuture<AllTieredShuffleMasterSnapshots>
snapshotFuture) {
+ snapshotStateInternal(snapshotFuture, TierMasterAgent::snapshotState);
+ }
+
+ private void snapshotStateInternal(
+ CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture,
+ BiConsumer<TierMasterAgent,
CompletableFuture<TieredShuffleMasterSnapshot>>
+ masterAgentConsumer) {
+ List<CompletableFuture<Tuple2<String, TieredShuffleMasterSnapshot>>>
futures =
+ new ArrayList<>(tiers.size());
+ for (Tuple2<String, TierMasterAgent> tier : tiers) {
+ CompletableFuture<TieredShuffleMasterSnapshot> future = new
CompletableFuture<>();
+ futures.add(future.thenApply(snap -> Tuple2.of(tier.f0, snap)));
+ masterAgentConsumer.accept(tier.f1, future);
+ }
+
+ FutureUtils.combineAll(futures)
+ .thenAccept(
+ snapshotWithIdentifiers ->
+ snapshotFuture.complete(
+ new AllTieredShuffleMasterSnapshots(
+ snapshotWithIdentifiers)));
+ }
+
+ public void restoreState(TieredInternalShuffleMasterSnapshot
clusterSnapshot) {
+ checkState(clusterSnapshot != null);
+ AllTieredShuffleMasterSnapshots allTierSnapshots =
clusterSnapshot.getAllTierSnapshots();
+ Collection<Tuple2<String, TieredShuffleMasterSnapshot>> snapshots =
+ allTierSnapshots.getSnapshots();
+ for (Tuple2<String, TieredShuffleMasterSnapshot> identifierWithSnap :
snapshots) {
+ String identifier = identifierWithSnap.f0;
+
tierMasterAgentMap.get(identifier).restoreState(identifierWithSnap.f1);
+ }
+ }
+
+ public void restoreState(List<TieredInternalShuffleMasterSnapshot>
snapshots, JobID jobId) {
+ for (TieredInternalShuffleMasterSnapshot internalSnapshot : snapshots)
{
+ checkState(internalSnapshot != null);
+ AllTieredShuffleMasterSnapshots allTierSnapshots =
+ internalSnapshot.getAllTierSnapshots();
+ Collection<Tuple2<String, TieredShuffleMasterSnapshot>>
tierSnapshots =
+ allTierSnapshots.getSnapshots();
+ for (Tuple2<String, TieredShuffleMasterSnapshot>
identifierWithSnap : tierSnapshots) {
+ String identifier = identifierWithSnap.f0;
+
tierMasterAgentMap.get(identifier).restoreState(identifierWithSnap.f1, jobId);
+ }
+ }
+ }
+
+ public CompletableFuture<Collection<PartitionWithMetrics>>
getPartitionWithMetrics(
+ JobShuffleContext jobShuffleContext,
+ Duration timeout,
+ Set<ResultPartitionID> expectedPartitions) {
+ JobID jobId = jobShuffleContext.getJobId();
+ if (!allPartitionInRemote) {
+ return jobShuffleContext.getPartitionWithMetrics(timeout,
expectedPartitions);
+ }
+
+ List<CompletableFuture<Map<ResultPartitionID,
TieredPartitionWithMetrics>>> futures =
+ new ArrayList<>(tiers.size());
+ for (Tuple2<String, TierMasterAgent> tier : tiers) {
+ CompletableFuture<Map<ResultPartitionID,
TieredPartitionWithMetrics>>
+ tierPartitionMapFuture =
+ tier.f1.getPartitionWithMetrics(jobId, timeout,
expectedPartitions);
+ futures.add(tierPartitionMapFuture);
+ }
+ return FutureUtils.combineAll(futures)
+ .thenApply(
+ allPartitions -> {
+ int partitionNums = allPartitions.size();
Review Comment:
```suggestion
int tierNumber = allPartitions.size();
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredPartitionWithMetrics.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
+
+import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMetrics;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Partition with shuffle metrics for tiered storage. */
+public class TieredPartitionWithMetrics {
Review Comment:
The _TieredPartitionWithMetrics_ class may be unnecessary, since its
_TierShuffleDescriptor_ field is never used.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -48,17 +55,79 @@ public class TieredInternalShuffleMaster {
private final ShuffleMasterContext shuffleMasterContext;
- public TieredInternalShuffleMaster(ShuffleMasterContext
shuffleMasterContext) {
+ private final boolean useExternalTier;
+
+ public TieredInternalShuffleMaster(
+ ShuffleMasterContext shuffleMasterContext,
+ ShuffleDescriptorRetriever shuffleDescriptorRetriever) {
this.shuffleMasterContext = shuffleMasterContext;
Configuration conf = shuffleMasterContext.getConfiguration();
+ String externalTierFactoryClass =
+ conf.get(
+ NettyShuffleEnvironmentOptions
+
.NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME);
+ this.useExternalTier = externalTierFactoryClass != null;
TieredStorageConfiguration tieredStorageConfiguration =
TieredStorageConfiguration.fromConfiguration(conf);
TieredStorageResourceRegistry resourceRegistry = new
TieredStorageResourceRegistry();
- List<TierMasterAgent> tierFactories =
+ List<Tuple2<String, TierMasterAgent>> tierFactories =
tieredStorageConfiguration.getTierFactories().stream()
- .map(tierFactory ->
tierFactory.createMasterAgent(resourceRegistry))
+ .map(
+ tierFactory ->
+ Tuple2.of(
+ tierFactory.identifier(),
+
tierFactory.createMasterAgent(resourceRegistry)))
.collect(Collectors.toList());
- this.tieredStorageMasterClient = new
TieredStorageMasterClient(tierFactories);
+ this.tieredStorageMasterClient =
+ new TieredStorageMasterClient(tierFactories,
shuffleDescriptorRetriever);
+ }
+
+ public boolean supportsBatchSnapshot() {
+ return useExternalTier;
+ }
+
+ public void snapshotState(
+ CompletableFuture<AllTieredShuffleMasterSnapshots> snapshotFuture,
+ ShuffleMasterSnapshotContext context,
+ JobID jobId) {
+ // only external tier supports snapshot for now.
+ if (useExternalTier) {
+ tieredStorageMasterClient.snapshotState(snapshotFuture, context,
jobId);
+ } else {
+ snapshotFuture.complete(AllTieredShuffleMasterSnapshots.empty());
+ }
+ }
+
+ public void
snapshotState(CompletableFuture<AllTieredShuffleMasterSnapshots>
snapshotFuture) {
+ if (useExternalTier) {
+ tieredStorageMasterClient.snapshotState(snapshotFuture);
+ } else {
+ snapshotFuture.complete(AllTieredShuffleMasterSnapshots.empty());
+ }
+ }
+
+ public void restoreState(List<TieredInternalShuffleMasterSnapshot>
snapshots, JobID jobId) {
+ if (useExternalTier) {
Review Comment:
If this function is invoked, `useExternalTier` must already be true, so this check is redundant.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java:
##########
@@ -48,17 +55,79 @@ public class TieredInternalShuffleMaster {
private final ShuffleMasterContext shuffleMasterContext;
- public TieredInternalShuffleMaster(ShuffleMasterContext
shuffleMasterContext) {
+ private final boolean useExternalTier;
+
+ public TieredInternalShuffleMaster(
+ ShuffleMasterContext shuffleMasterContext,
+ ShuffleDescriptorRetriever shuffleDescriptorRetriever) {
this.shuffleMasterContext = shuffleMasterContext;
Configuration conf = shuffleMasterContext.getConfiguration();
+ String externalTierFactoryClass =
+ conf.get(
+ NettyShuffleEnvironmentOptions
+
.NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME);
+ this.useExternalTier = externalTierFactoryClass != null;
TieredStorageConfiguration tieredStorageConfiguration =
TieredStorageConfiguration.fromConfiguration(conf);
TieredStorageResourceRegistry resourceRegistry = new
TieredStorageResourceRegistry();
- List<TierMasterAgent> tierFactories =
+ List<Tuple2<String, TierMasterAgent>> tierFactories =
tieredStorageConfiguration.getTierFactories().stream()
- .map(tierFactory ->
tierFactory.createMasterAgent(resourceRegistry))
+ .map(
+ tierFactory ->
+ Tuple2.of(
+ tierFactory.identifier(),
+
tierFactory.createMasterAgent(resourceRegistry)))
.collect(Collectors.toList());
- this.tieredStorageMasterClient = new
TieredStorageMasterClient(tierFactories);
+ this.tieredStorageMasterClient =
+ new TieredStorageMasterClient(tierFactories,
shuffleDescriptorRetriever);
+ }
+
+ public boolean supportsBatchSnapshot() {
+ return useExternalTier;
Review Comment:
Maybe rename this to `useOnlyExternalTier`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]