xintongsong commented on code in PR #22316:
URL: https://github.com/apache/flink/pull/22316#discussion_r1190584128
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -50,4 +70,11 @@ public static TieredStorageConfiguration fromConfiguration(Configuration conf) {
// reserved storage size, etc.), then set them to the builder.
return new TieredStorageConfiguration.Builder().build();
}
+
+ public static TieredStorageConfiguration fromConfiguration(
Review Comment:
If we have two `fromConfiguration()` methods, we need to keep them
consistent, which increases the maintenance complexity. Alternatively, we can
call `fromConfiguration(Configuration)` in
`NettyShuffleEnvironmentConfiguration#fromConfiguration`, and make
`TieredStorageConfiguration` a field of `NettyShuffleEnvironmentConfiguration`.
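
A minimal sketch of this suggestion, using simplified stand-in classes rather than the real Flink types. `SimpleConfig`, `TieredStorageConfig`, `ShuffleEnvironmentConfig`, and the option keys are all hypothetical names chosen for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's Configuration: a plain string map.
final class SimpleConfig {
    private final Map<String, String> values = new HashMap<>();

    SimpleConfig set(String key, String value) {
        values.put(key, value);
        return this;
    }

    int getInt(String key, int defaultValue) {
        String raw = values.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw);
    }
}

// Simplified model of TieredStorageConfiguration with exactly one parsing entry point.
final class TieredStorageConfig {
    final int bufferSizeBytes;

    private TieredStorageConfig(int bufferSizeBytes) {
        this.bufferSizeBytes = bufferSizeBytes;
    }

    // The only place where tiered storage options are read (key name is made up).
    static TieredStorageConfig fromConfiguration(SimpleConfig conf) {
        return new TieredStorageConfig(conf.getInt("tiered.buffer-size", 32768));
    }
}

// Simplified model of NettyShuffleEnvironmentConfiguration holding the tiered config as a field.
final class ShuffleEnvironmentConfig {
    final int numNetworkBuffers;
    final TieredStorageConfig tieredStorageConfig;

    private ShuffleEnvironmentConfig(int numNetworkBuffers, TieredStorageConfig tieredStorageConfig) {
        this.numNetworkBuffers = numNetworkBuffers;
        this.tieredStorageConfig = tieredStorageConfig;
    }

    // Delegates to the single TieredStorageConfig parser instead of duplicating it.
    static ShuffleEnvironmentConfig fromConfiguration(SimpleConfig conf) {
        return new ShuffleEnvironmentConfig(
                conf.getInt("network.buffers", 2048),
                TieredStorageConfig.fromConfiguration(conf));
    }
}
```

With this shape, callers that need tiered storage settings read them from the environment configuration, so there is no second overload to keep in sync.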
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##########
@@ -213,6 +214,8 @@ public class SingleInputGate extends IndexedInputGate {
private final BufferDebloater bufferDebloater;
private boolean shouldDrainOnEndOfData = true;
+ @Nullable private final TieredStorageConsumerClient tieredStorageConsumerClient;
Review Comment:
Should explain what the null value means and when the field can or cannot be
null.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java:
##########
@@ -30,18 +31,37 @@ public class TieredStorageConfiguration {
// TODO, after implementing the tier factory, add appreciate
implementations to the array.
private static final TierFactory[] DEFAULT_MEMORY_DISK_TIER_FACTORIES =
new TierFactory[0];
+ private static final boolean ENABLED_TIERED_STORAGE = false;
+ private final boolean enabledTieredStorage;
Review Comment:
This doesn't make sense. If tiered storage is disabled, this configuration
shouldn't exist at all.
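
One possible way to express "disabled means no configuration object", sketched with hypothetical simplified classes (`TieredStorageConfig` and `ShuffleEnvironmentConfig` here are illustrations, not the real Flink types). The enabled/disabled decision lives in the owner, and the tiered configuration itself carries no flag:

```java
import java.util.Optional;

// Simplified tiered storage settings; note there is no "enabled" flag inside.
final class TieredStorageConfig {
    final int numTiers;

    TieredStorageConfig(int numTiers) {
        this.numTiers = numTiers;
    }
}

// Hypothetical owner of the settings, standing in for the shuffle environment configuration.
final class ShuffleEnvironmentConfig {
    // Null if and only if tiered storage is disabled.
    private final TieredStorageConfig tieredStorageConfig;

    ShuffleEnvironmentConfig(boolean tieredStorageEnabled, int numTiers) {
        this.tieredStorageConfig =
                tieredStorageEnabled ? new TieredStorageConfig(numTiers) : null;
    }

    // Absence of a value is the single signal that the feature is off.
    Optional<TieredStorageConfig> getTieredStorageConfig() {
        return Optional.ofNullable(tieredStorageConfig);
    }
}
```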
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java:
##########
@@ -259,6 +264,12 @@ public SingleInputGate(
this.unpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
this.bufferDebloater = bufferDebloater;
this.throughputCalculator = checkNotNull(throughputCalculator);
+
+ this.tieredStorageConsumerClient =
+ tieredStorageConfiguration.enabledTieredStorage()
+ ? new TieredStorageConsumerClient(
+ tieredStorageConfiguration.getTierFactories())
+ : null;
Review Comment:
Should create this in `SingleInputGateFactory`.
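
A rough sketch of what moving the construction into the factory could look like, again with hypothetical simplified classes (`TierSettings`, `ConsumerClient`, `InputGate`, and `InputGateFactory` are stand-ins, not the actual Flink signatures). The gate only receives an already-built client and documents what null means:

```java
// Hypothetical tiered storage settings handed to the factory.
final class TierSettings {
    final int numTiers;

    TierSettings(int numTiers) {
        this.numTiers = numTiers;
    }
}

// Stand-in for TieredStorageConsumerClient.
final class ConsumerClient {
    final int numTiers;

    ConsumerClient(TierSettings settings) {
        this.numTiers = settings.numTiers;
    }
}

// Stand-in for SingleInputGate: it no longer inspects any configuration.
final class InputGate {
    /** Null if tiered storage is disabled; non-null for the gate's whole lifetime otherwise. */
    private final ConsumerClient consumerClient;

    InputGate(ConsumerClient consumerClient) {
        this.consumerClient = consumerClient;
    }

    boolean usesTieredStorage() {
        return consumerClient != null;
    }
}

// Stand-in for SingleInputGateFactory: the one place that decides whether a client exists.
final class InputGateFactory {
    // Null when tiered storage is disabled.
    private final TierSettings tierSettings;

    InputGateFactory(TierSettings tierSettings) {
        this.tierSettings = tierSettings;
    }

    InputGate create() {
        ConsumerClient client = tierSettings == null ? null : new ConsumerClient(tierSettings);
        return new InputGate(client);
    }
}
```

Keeping the decision in the factory keeps the gate constructor free of configuration logic and makes it straightforward to inject a client in tests.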