reswqa commented on code in PR #24900:
URL: https://github.com/apache/flink/pull/24900#discussion_r1634238153
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -148,12 +162,52 @@ public void close() throws IOException {
// --------------------------------------------------------------------------------------------
private List<TierConsumerAgent> createTierConsumerAgents(
- List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) {
+ List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+ List<List<TierShuffleDescriptor>> shuffleDescriptors) {
ArrayList<TierConsumerAgent> tierConsumerAgents = new ArrayList<>();
+
+ List<List<TierShuffleDescriptor>> transformedTierShuffleDescriptors =
+ transformTierShuffleDescriptors(shuffleDescriptors);
+ // Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size
+ // of transformedTierShuffleDescriptors and the size of tierFactories are the same.
+ checkState(transformedTierShuffleDescriptors.size() == tierFactories.size());
+ int index = 0;
Review Comment:
Why not use a `for-i` loop if we do need the iteration index?
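A minimal sketch of the suggested `for-i` loop, assuming `tierFactories` is a `List` and therefore supports indexed access. `SketchTierFactory` and `ForILoopSketch` are hypothetical stand-ins: the real `TierFactory.createConsumerAgent` takes the consumer specs, the per-tier descriptor list and the Netty service, and returns a `TierConsumerAgent`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's TierFactory; the real interface has a richer signature.
interface SketchTierFactory {
    String createConsumerAgent(List<String> tierDescriptors);
}

final class ForILoopSketch {
    // Pairs the i-th factory with the i-th descriptor list through an explicit index,
    // instead of a foreach loop plus a separately incremented counter.
    static List<String> createAgents(
            List<SketchTierFactory> tierFactories, List<List<String>> perTierDescriptors) {
        if (perTierDescriptors.size() != tierFactories.size()) {
            throw new IllegalStateException("one descriptor list per tier is required");
        }
        List<String> agents = new ArrayList<>();
        for (int i = 0; i < tierFactories.size(); i++) {
            agents.add(tierFactories.get(i).createConsumerAgent(perTierDescriptors.get(i)));
        }
        return agents;
    }
}
```

With an explicit index, the pairing between the i-th factory and the i-th descriptor list is visible at the call site, and there is no mutable counter to keep in sync with the loop.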
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##########
@@ -148,12 +162,52 @@ public void close() throws IOException {
// --------------------------------------------------------------------------------------------
private List<TierConsumerAgent> createTierConsumerAgents(
- List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) {
+ List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+ List<List<TierShuffleDescriptor>> shuffleDescriptors) {
ArrayList<TierConsumerAgent> tierConsumerAgents = new ArrayList<>();
+
+ List<List<TierShuffleDescriptor>> transformedTierShuffleDescriptors =
+ transformTierShuffleDescriptors(shuffleDescriptors);
+ // Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size
+ // of transformedTierShuffleDescriptors and the size of tierFactories are the same.
+ checkState(transformedTierShuffleDescriptors.size() == tierFactories.size());
+ int index = 0;
for (TierFactory tierFactory : tierFactories) {
tierConsumerAgents.add(
- tierFactory.createConsumerAgent(tieredStorageConsumerSpecs, nettyService));
+ tierFactory.createConsumerAgent(
+ tieredStorageConsumerSpecs,
+ transformedTierShuffleDescriptors.get(index++),
+ nettyService));
}
return tierConsumerAgents;
}
+
+ /**
+ * Before transforming the shuffle descriptors, the number of tier shuffle descriptors is
+ * numPartitions * numTiers (That means shuffleDescriptors.size() is numPartitions, while the
+ * shuffleDescriptors.get(0).size() is numTiers). After transforming, the number of tier shuffle
+ * descriptors is numTiers * numPartitions (That means transformedList.size() is numTiers, while
+ * transformedList.get(0).size() is numPartitions).
+ */
+ private static List<List<TierShuffleDescriptor>> transformTierShuffleDescriptors(
+ List<List<TierShuffleDescriptor>> shuffleDescriptors) {
+ int numTiers = 0;
+ int numDescriptors = shuffleDescriptors.size();
Review Comment:
This could be named `numPartitions`, according to the Javadoc.
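The Javadoc describes a transpose from a `numPartitions x numTiers` nested list to a `numTiers x numPartitions` one. A minimal generic sketch of that transform, using the `numPartitions` name suggested in the comment; `TransposeSketch` is hypothetical, the type parameter `T` stands in for `TierShuffleDescriptor`, and the sketch assumes every partition carries exactly one descriptor per tier.

```java
import java.util.ArrayList;
import java.util.List;

final class TransposeSketch {
    // Converts a numPartitions x numTiers nested list into numTiers x numPartitions.
    // Throws if the partitions disagree on the number of tiers.
    static <T> List<List<T>> transpose(List<List<T>> perPartition) {
        int numPartitions = perPartition.size();
        if (numPartitions == 0) {
            return new ArrayList<>();
        }
        int numTiers = perPartition.get(0).size();
        List<List<T>> perTier = new ArrayList<>(numTiers);
        for (int tier = 0; tier < numTiers; tier++) {
            perTier.add(new ArrayList<>(numPartitions));
        }
        for (List<T> descriptors : perPartition) {
            if (descriptors.size() != numTiers) {
                throw new IllegalArgumentException("inconsistent number of tiers");
            }
            // Descriptor of tier `tier` in this partition goes to the inner list of that tier.
            for (int tier = 0; tier < numTiers; tier++) {
                perTier.get(tier).add(descriptors.get(tier));
            }
        }
        return perTier;
    }
}
```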
##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -490,6 +490,24 @@ public enum CompressionCodec {
+ " is configured. The new mode is currently in an experimental phase. It can be set to false to fallback to the legacy mode "
+ " if something unexpected. Once the new mode reaches a stable state, the legacy mode as well as the option will be removed.");
+ /** The option to configure the tiered factory creator remote class name for hybrid shuffle. */
+ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+ @Experimental
+ public static final ConfigOption<String>
+ NETWORK_HYBRID_SHUFFLE_EXTERNAL_REMOTE_TIER_FACTORY_CLASS_NAME =
+ key("taskmanager.network.hybrid-shuffle.external-remote-tier-factory.class")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The option configures the class that is responsible for creating an "
+ + "external remote tier factory for hybrid shuffle. Note that "
+ + "only Celeborn can be accepted as the remote shuffle tier "
Review Comment:
only Celeborn -> only Apache Celeborn
How do we make sure that Apache Celeborn is the only valid option? I didn't notice any corresponding validation.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java:
##########
@@ -80,8 +83,14 @@ public NettyShuffleDescriptor buildRemote() {
}
public NettyShuffleDescriptor buildLocal() {
+ List<TierShuffleDescriptor> tierShuffleDescriptors = new ArrayList<>();
+ tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE);
+ tierShuffleDescriptors.add(NoOpTierShuffleDescriptor.INSTANCE);
Review Comment:
Why do we need this redundancy?
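If the two entries are meant as one no-op descriptor per tier (an assumption; the test builder does not say how many tiers it models), stating the tier count explicitly removes the repeated `add` calls. `NoOpDescriptorsSketch` and `NO_OP_DESCRIPTOR` are hypothetical; the latter stands in for `NoOpTierShuffleDescriptor.INSTANCE`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NoOpDescriptorsSketch {
    // Hypothetical stand-in for NoOpTierShuffleDescriptor.INSTANCE.
    static final Object NO_OP_DESCRIPTOR = new Object();

    // Builds a mutable list holding one shared no-op descriptor per tier.
    static List<Object> noOpDescriptorsFor(int numTiers) {
        return new ArrayList<>(Collections.nCopies(numTiers, NO_OP_DESCRIPTOR));
    }
}
```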
--
This is an automated message from the Apache Git Service.