1996fanrui commented on code in PR #24025:
URL: https://github.com/apache/flink/pull/24025#discussion_r1442527329
##########
flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java:
##########
@@ -228,6 +229,21 @@ public static List<Tuple2<String, DistributedCacheEntry>> parseCachedFilesFromSt
.collect(Collectors.toList());
}
+ @Internal
+ public static List<String> parseStringFromCachedFiles(
Review Comment:
Would you mind adding a test that checks `parseStringFromCachedFiles` and
`parseCachedFilesFromString` round-trip, i.e. that the output of each can be
converted back by the other?
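
A round-trip check along those lines could look like the sketch below. It is deliberately standalone: `Entry`, `toStrings`, and `fromStrings` are hypothetical stand-ins for `DistributedCacheEntry`, `parseStringFromCachedFiles`, and `parseCachedFilesFromString`, and the `name:...,path:...,executable:...` encoding is an assumption for illustration, not necessarily the format used in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

public class CacheRoundTripSketch {

    /** Hypothetical stand-in for a cached-file entry. */
    public static final class Entry {
        final String name;
        final String path;
        final boolean executable;

        public Entry(String name, String path, boolean executable) {
            this.name = name;
            this.path = path;
            this.executable = executable;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Entry)) {
                return false;
            }
            Entry other = (Entry) o;
            return name.equals(other.name)
                    && path.equals(other.path)
                    && executable == other.executable;
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, path, executable);
        }
    }

    /** Assumed encoding: "name:<name>,path:<path>,executable:<flag>" (values contain no commas). */
    public static List<String> toStrings(List<Entry> entries) {
        List<String> out = new ArrayList<>();
        for (Entry e : entries) {
            out.add("name:" + e.name + ",path:" + e.path + ",executable:" + e.executable);
        }
        return out;
    }

    /** Inverse of toStrings under the same assumed encoding. */
    public static List<Entry> fromStrings(List<String> encoded) {
        List<Entry> out = new ArrayList<>();
        for (String s : encoded) {
            String[] parts = s.split(",");
            out.add(new Entry(
                    valueOf(parts[0]), valueOf(parts[1]), Boolean.parseBoolean(valueOf(parts[2]))));
        }
        return out;
    }

    // Split on the first ':' only, so values such as "hdfs:///tmp/a" survive.
    private static String valueOf(String keyValue) {
        return keyValue.substring(keyValue.indexOf(':') + 1);
    }

    public static void main(String[] args) {
        List<Entry> original = Arrays.asList(
                new Entry("a", "hdfs:///tmp/a.txt", true),
                new Entry("b", "/local/b.bin", false));
        // Both directions should be lossless.
        List<String> encoded = toStrings(original);
        if (!fromStrings(encoded).equals(original)) {
            throw new AssertionError("entries -> strings -> entries changed the data");
        }
        if (!toStrings(fromStrings(encoded)).equals(encoded)) {
            throw new AssertionError("strings -> entries -> strings changed the data");
        }
    }
}
```

The real test would call the two `DistributedCache` methods directly and compare the results in both directions in the same way.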
##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1201,6 +1212,9 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.getOptional(ExecutionOptions.SNAPSHOT_COMPRESSION)
.ifPresent(this::setUseSnapshotCompression);
RestartStrategies.fromConfiguration(configuration).ifPresent(this::setRestartStrategy);
+ configuration
+ .getOptional(RestartStrategyOptions.RESTART_STRATEGY)
+ .ifPresent(s -> this.setRestartStrategy(configuration));
Review Comment:
nit: Why not call
`this.setRestartStrategy(new RestartStrategies.FallbackRestartStrategyConfiguration())`
here?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java:
##########
@@ -614,7 +563,9 @@ private Collection<Integer> legacyTransform(Transformation<?> transform) {
if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
} else {
- streamGraph.setBufferTimeout(transform.getId(), defaultBufferTimeout);
+ streamGraph.setBufferTimeout(
+ transform.getId(),
+ configuration.get(ExecutionOptions.BUFFER_TIMEOUT).toMillis());
Review Comment:
Should we consider `BUFFER_TIMEOUT_ENABLED` here?
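
One way to read this: the effective timeout would be derived from both options, with the "disabled" flag taking precedence. A minimal standalone sketch, in which `effectiveBufferTimeoutMillis` is a hypothetical helper, plain parameters stand in for the values of `BUFFER_TIMEOUT_ENABLED` and `BUFFER_TIMEOUT`, and the local constant is assumed to mirror `ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT` (-1):

```java
import java.time.Duration;

public class BufferTimeoutSketch {

    /** Assumed to mirror ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT. */
    public static final long DISABLED_NETWORK_BUFFER_TIMEOUT = -1L;

    /**
     * Hypothetical helper: returns the timeout to hand to the stream graph.
     * If the timeout is disabled, the configured duration is ignored.
     */
    public static long effectiveBufferTimeoutMillis(boolean enabled, Duration timeout) {
        if (!enabled) {
            return DISABLED_NETWORK_BUFFER_TIMEOUT;
        }
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(effectiveBufferTimeoutMillis(true, Duration.ofMillis(100)));
        System.out.println(effectiveBufferTimeoutMillis(false, Duration.ofMillis(100)));
    }
}
```

With a helper like this, the `else` branch would not need to care whether the timeout was disabled via the flag or via a -1 value.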
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java:
##########
@@ -385,7 +335,9 @@ private void configureStreamGraph(final StreamGraph graph) {
if (shouldExecuteInBatchMode) {
configureStreamGraphBatch(graph);
- setDefaultBufferTimeout(-1);
+ configuration.set(
+ ExecutionOptions.BUFFER_TIMEOUT,
+ Duration.ofMillis(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT));
Review Comment:
If we take `BUFFER_TIMEOUT_ENABLED` into account when reading the buffer
timeout, it would be better to set `BUFFER_TIMEOUT_ENABLED` to false here.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java:
##########
@@ -453,7 +440,10 @@ public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
}
- this.bufferTimeout = timeoutMillis;
+ if (timeoutMillis == ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) {
+ this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false);
+ }
+ this.configuration.set(ExecutionOptions.BUFFER_TIMEOUT, Duration.ofMillis(timeoutMillis));
return this;
Review Comment:
I'm curious: could we deprecate `setBufferTimeout` and `getBufferTimeout` and
remove them in 2.0?
I don't see any benefit in keeping them, and `DISABLED_NETWORK_BUFFER_TIMEOUT`
is clearer for users than -1. WDYT?
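
For illustration only, a deprecation along these lines might look like the standalone sketch below. `DeprecatedSetterSketch` is a hypothetical class, its two fields stand in for the `BUFFER_TIMEOUT_ENABLED` and `BUFFER_TIMEOUT` options, and the 100 ms initial value is a placeholder rather than a statement about the actual default.

```java
import java.time.Duration;

public class DeprecatedSetterSketch {

    /** Assumed to mirror ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT. */
    public static final long DISABLED_NETWORK_BUFFER_TIMEOUT = -1L;

    // Stand-ins for the two configuration options (placeholder initial values).
    private boolean bufferTimeoutEnabled = true;
    private Duration bufferTimeout = Duration.ofMillis(100);

    /** @deprecated Hypothetical: set the two options on the configuration instead. */
    @Deprecated
    public DeprecatedSetterSketch setBufferTimeout(long timeoutMillis) {
        if (timeoutMillis < DISABLED_NETWORK_BUFFER_TIMEOUT) {
            throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
        }
        if (timeoutMillis == DISABLED_NETWORK_BUFFER_TIMEOUT) {
            // The magic value only flips the flag; the duration is left untouched.
            bufferTimeoutEnabled = false;
        } else {
            bufferTimeoutEnabled = true;
            bufferTimeout = Duration.ofMillis(timeoutMillis);
        }
        return this;
    }

    /** @deprecated Hypothetical: read the two options from the configuration instead. */
    @Deprecated
    public long getBufferTimeout() {
        return bufferTimeoutEnabled ? bufferTimeout.toMillis() : DISABLED_NETWORK_BUFFER_TIMEOUT;
    }
}
```

Existing callers would keep working during the deprecation period, while new code would use the options directly and never need to know about -1.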