This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 06d045f77cfb67a08d7e397d3c06ef8189bbe68a
Author: JunRuiLee <[email protected]>
AuthorDate: Sun Sep 15 00:41:25 2024 +0800

    [FLINK-36249][streaming-java] Align DataSet API with DataStream API to support job configuration passing.
---
 .../src/main/java/org/apache/flink/api/common/Plan.java     | 11 +++++++++++
 .../org/apache/flink/api/java/ExecutionEnvironment.java     |  6 ++++--
 .../java/org/apache/flink/api/java/utils/PlanGenerator.java |  8 +++++++-
 .../flink/optimizer/plantranslate/JobGraphGenerator.java    |  2 ++
 .../org/apache/flink/runtime/jobgraph/JobGraphBuilder.java  | 12 ++++++++++++
 .../flink/test/example/failing/TaskFailureITCase.java       | 13 +++++++++++--
 6 files changed, 47 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index a5e3845e351..a2aa52b32fd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.operators.GenericDataSinkBase;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Visitable;
 import org.apache.flink.util.Visitor;
 
@@ -70,6 +71,8 @@ public class Plan implements Visitable<Operator<?>>, Pipeline 
{
     /** The ID of the Job that this dataflow plan belongs to. */
     private JobID jobId;
 
+    private Configuration jobConfiguration = new Configuration();
+
     // ------------------------------------------------------------------------
 
     /**
@@ -355,6 +358,14 @@ public class Plan implements Visitable<Operator<?>>, 
Pipeline {
         return Math.max(visitor.maxDop, this.defaultParallelism);
     }
 
+    public void setJobConfiguration(Configuration jobConfiguration) {
+        this.jobConfiguration = jobConfiguration;
+    }
+
+    public Configuration getJobConfiguration() {
+        return jobConfiguration;
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     private static final class MaxDopVisitor implements Visitor<Operator<?>> {
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index e60ad9b5f27..94e0a50bd26 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -133,7 +133,7 @@ public class ExecutionEnvironment {
 
     private final List<Tuple2<String, DistributedCacheEntry>> cacheFile = new 
ArrayList<>();
 
-    private final ExecutionConfig config = new ExecutionConfig();
+    private final ExecutionConfig config;
 
     /**
      * Result from the latest execution, to make it retrievable when using 
eager execution methods.
@@ -186,6 +186,7 @@ public class ExecutionEnvironment {
             final ClassLoader userClassloader) {
         this.executorServiceLoader = checkNotNull(executorServiceLoader);
         this.configuration = new Configuration(checkNotNull(configuration));
+        this.config = new ExecutionConfig(this.configuration);
         this.userClassloader =
                 userClassloader == null ? getClass().getClassLoader() : 
userClassloader;
 
@@ -1183,7 +1184,8 @@ public class ExecutionEnvironment {
         }
 
         final PlanGenerator generator =
-                new PlanGenerator(sinks, config, getParallelism(), cacheFile, 
jobName);
+                new PlanGenerator(
+                        sinks, config, getParallelism(), cacheFile, jobName, 
configuration);
         final Plan plan = generator.generate();
 
         // clear all the sinks such that the next execution does not redo 
everything
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java
index 9d476739e24..97f9a07b20f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/PlanGenerator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.operators.DataSink;
 import org.apache.flink.api.java.operators.OperatorTranslation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Visitor;
 
 import org.slf4j.Logger;
@@ -52,17 +53,21 @@ public class PlanGenerator {
     private final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
cacheFile;
     private final String jobName;
 
+    private final Configuration jobConfiguration;
+
     public PlanGenerator(
             List<DataSink<?>> sinks,
             ExecutionConfig config,
             int defaultParallelism,
             List<Tuple2<String, DistributedCache.DistributedCacheEntry>> 
cacheFile,
-            String jobName) {
+            String jobName,
+            Configuration jobConfiguration) {
         this.sinks = checkNotNull(sinks);
         this.config = checkNotNull(config);
         this.cacheFile = checkNotNull(cacheFile);
         this.jobName = checkNotNull(jobName);
         this.defaultParallelism = defaultParallelism;
+        this.jobConfiguration = checkNotNull(jobConfiguration);
     }
 
     public Plan generate() {
@@ -87,6 +92,7 @@ public class PlanGenerator {
             plan.setDefaultParallelism(defaultParallelism);
         }
         plan.setExecutionConfig(config);
+        plan.setJobConfiguration(jobConfiguration);
         return plan;
     }
 
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 9f674c80478..9b3fb1c2bf5 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -211,6 +211,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
         this.sharingGroup = new SlotSharingGroup();
 
         ExecutionConfig executionConfig = 
program.getOriginalPlan().getExecutionConfig();
+        Configuration jobConfiguration = 
program.getOriginalPlan().getJobConfiguration();
 
         // this starts the traversal that generates the job graph
         program.accept(this);
@@ -263,6 +264,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                             .setJobId(jobId)
                             .setJobName(program.getJobName())
                             .setExecutionConfig(executionConfig)
+                            .setJobConfiguration(jobConfiguration)
                             .addJobVertices(vertices.values())
                             .addJobVertices(auxVertices)
                             .addUserArtifacts(userArtifacts)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraphBuilder.java
index b34a5c4e8fc..eb0519c9f13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraphBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.util.SerializedValue;
 
@@ -57,6 +58,8 @@ public class JobGraphBuilder {
 
     @Nullable private SavepointRestoreSettings savepointRestoreSettings = null;
 
+    @Nullable private Configuration jobConfiguration = null;
+
     private JobGraphBuilder(JobType jobType) {
         this.jobType = jobType;
     }
@@ -109,6 +112,11 @@ public class JobGraphBuilder {
         return this;
     }
 
+    public JobGraphBuilder setJobConfiguration(Configuration jobConfiguration) 
{
+        this.jobConfiguration = jobConfiguration;
+        return this;
+    }
+
     public JobGraph build() {
         final JobGraph jobGraph =
                 new JobGraph(jobId, jobName, jobVertices.toArray(new 
JobVertex[0]));
@@ -136,6 +144,10 @@ public class JobGraphBuilder {
             jobGraph.setClasspaths(classpaths);
         }
 
+        if (jobConfiguration != null) {
+            jobGraph.setJobConfiguration(jobConfiguration);
+        }
+
         return jobGraph;
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
index 1c7a591d39b..e40c911f80f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/TaskFailureITCase.java
@@ -21,13 +21,15 @@ package org.apache.flink.test.example.failing;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.test.util.JavaProgramTestBaseJUnit4;
 
 import org.junit.Assert;
 
+import java.time.Duration;
 import java.util.List;
 
 import static org.apache.flink.test.util.TestBaseUtils.compareResultAsText;
@@ -66,9 +68,16 @@ public class TaskFailureITCase extends 
JavaProgramTestBaseJUnit4 {
     }
 
     private void executeTask(MapFunction<Long, Long> mapper, int retries) 
throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, 
"fixed-delay");
+        
configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 
retries);
+        configuration.set(
+                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, 
Duration.ofMillis(0));
+
         ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
-        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retries, 
0));
+        env.configure(configuration, 
Thread.currentThread().getContextClassLoader());
+
         List<Long> result = env.generateSequence(1, 9).map(mapper).collect();
         compareResultAsText(result, "1\n2\n3\n4\n5\n6\n7\n8\n9");
     }

Reply via email to