This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 06d045f77cfb67a08d7e397d3c06ef8189bbe68a Author: JunRuiLee <[email protected]> AuthorDate: Sun Sep 15 00:41:25 2024 +0800 [FLINK-36249][streaming-java] Align DataSet API with DataStream API to support job configuration passing. --- .../src/main/java/org/apache/flink/api/common/Plan.java | 11 +++++++++++ .../org/apache/flink/api/java/ExecutionEnvironment.java | 6 ++++-- .../java/org/apache/flink/api/java/utils/PlanGenerator.java | 8 +++++++- .../flink/optimizer/plantranslate/JobGraphGenerator.java | 2 ++ .../org/apache/flink/runtime/jobgraph/JobGraphBuilder.java | 12 ++++++++++++ .../flink/test/example/failing/TaskFailureITCase.java | 13 +++++++++++-- 6 files changed, 47 insertions(+), 5 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java index a5e3845e351..a2aa52b32fd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry; import org.apache.flink.api.common.operators.GenericDataSinkBase; import org.apache.flink.api.common.operators.Operator; import org.apache.flink.api.dag.Pipeline; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Visitable; import org.apache.flink.util.Visitor; @@ -70,6 +71,8 @@ public class Plan implements Visitable<Operator<?>>, Pipeline { /** The ID of the Job that this dataflow plan belongs to. 
*/ private JobID jobId; + private Configuration jobConfiguration = new Configuration(); + // ------------------------------------------------------------------------ /** @@ -355,6 +358,14 @@ public class Plan implements Visitable<Operator<?>>, Pipeline { return Math.max(visitor.maxDop, this.defaultParallelism); } + public void setJobConfiguration(Configuration jobConfiguration) { + this.jobConfiguration = jobConfiguration; + } + + public Configuration getJobConfiguration() { + return jobConfiguration; + } + // -------------------------------------------------------------------------------------------- private static final class MaxDopVisitor implements Visitor<Operator<?>> { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index e60ad9b5f27..94e0a50bd26 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -133,7 +133,7 @@ public class ExecutionEnvironment { private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new ArrayList<>(); - private final ExecutionConfig config = new ExecutionConfig(); + private final ExecutionConfig config; /** * Result from the latest execution, to make it retrievable when using eager execution methods. @@ -186,6 +186,7 @@ public class ExecutionEnvironment { final ClassLoader userClassloader) { this.executorServiceLoader = checkNotNull(executorServiceLoader); this.configuration = new Configuration(checkNotNull(configuration)); + this.config = new ExecutionConfig(this.configuration); this.userClassloader = userClassloader == null ? 
getClass().getClassLoader() : userClassloader; @@ -1183,7 +1184,8 @@ public class ExecutionEnvironment { } final PlanGenerator generator = - new PlanGenerator(sinks, config, getParallelism(), cacheFile, jobName); + new PlanGenerator( + sinks, config, getParallelism(), cacheFile, jobName, configuration); final Plan plan = generator.generate(); // clear all the sinks such that the next execution does not redo everything diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java index 9d476739e24..97f9a07b20f 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java @@ -28,6 +28,7 @@ import org.apache.flink.api.java.operators.DataSink; import org.apache.flink.api.java.operators.OperatorTranslation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Visitor; import org.slf4j.Logger; @@ -52,17 +53,21 @@ public class PlanGenerator { private final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile; private final String jobName; + private final Configuration jobConfiguration; + public PlanGenerator( List<DataSink<?>> sinks, ExecutionConfig config, int defaultParallelism, List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile, - String jobName) { + String jobName, + Configuration jobConfiguration) { this.sinks = checkNotNull(sinks); this.config = checkNotNull(config); this.cacheFile = checkNotNull(cacheFile); this.jobName = checkNotNull(jobName); this.defaultParallelism = defaultParallelism; + this.jobConfiguration = checkNotNull(jobConfiguration); } public Plan generate() { @@ -87,6 +92,7 @@ public class PlanGenerator { plan.setDefaultParallelism(defaultParallelism); } 
plan.setExecutionConfig(config); + plan.setJobConfiguration(jobConfiguration); return plan; } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index 9f674c80478..9b3fb1c2bf5 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -211,6 +211,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { this.sharingGroup = new SlotSharingGroup(); ExecutionConfig executionConfig = program.getOriginalPlan().getExecutionConfig(); + Configuration jobConfiguration = program.getOriginalPlan().getJobConfiguration(); // this starts the traversal that generates the job graph program.accept(this); @@ -263,6 +264,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> { .setJobId(jobId) .setJobName(program.getJobName()) .setExecutionConfig(executionConfig) + .setJobConfiguration(jobConfiguration) .addJobVertices(vertices.values()) .addJobVertices(auxVertices) .addUserArtifacts(userArtifacts) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraphBuilder.java index b34a5c4e8fc..eb0519c9f13 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraphBuilder.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.util.SerializedValue; 
@@ -57,6 +58,8 @@ public class JobGraphBuilder { @Nullable private SavepointRestoreSettings savepointRestoreSettings = null; + @Nullable private Configuration jobConfiguration = null; + private JobGraphBuilder(JobType jobType) { this.jobType = jobType; } @@ -109,6 +112,11 @@ public class JobGraphBuilder { return this; } + public JobGraphBuilder setJobConfiguration(Configuration jobConfiguration) { + this.jobConfiguration = jobConfiguration; + return this; + } + public JobGraph build() { final JobGraph jobGraph = new JobGraph(jobId, jobName, jobVertices.toArray(new JobVertex[0])); @@ -136,6 +144,10 @@ public class JobGraphBuilder { jobGraph.setClasspaths(classpaths); } + if (jobConfiguration != null) { + jobGraph.setJobConfiguration(jobConfiguration); + } + return jobGraph; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java index 1c7a591d39b..e40c911f80f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java @@ -21,13 +21,15 @@ package org.apache.flink.test.example.failing; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestartStrategyOptions; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.test.util.JavaProgramTestBaseJUnit4; import org.junit.Assert; +import java.time.Duration; import java.util.List; import static org.apache.flink.test.util.TestBaseUtils.compareResultAsText; @@ -66,9 +68,16 @@ public class TaskFailureITCase extends JavaProgramTestBaseJUnit4 { } private 
void executeTask(MapFunction<Long, Long> mapper, int retries) throws Exception { + Configuration configuration = new Configuration(); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, retries); + configuration.set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(0)); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retries, 0)); + env.configure(configuration, Thread.currentThread().getContextClassLoader()); + List<Long> result = env.generateSequence(1, 9).map(mapper).collect(); compareResultAsText(result, "1\n2\n3\n4\n5\n6\n7\n8\n9"); }
