This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 81828fd2f0b [Dataflow] Added Portable Runner alias to java runners 
(#38411)
81828fd2f0b is described below

commit 81828fd2f0b7aff2d6f6da9f5564394803531cd6
Author: TongruiLi <[email protected]>
AuthorDate: Fri May 8 16:05:44 2026 -0700

    [Dataflow] Added Portable Runner alias to java runners (#38411)
    
    * Added portable runner options to java runner
    
    * spotless
    
    * Added more tolerance to flaky test
    
    * Removed unused experiments
    
    * Added experiments back in
---
 .../beam/runners/dataflow/DataflowRunner.java      | 11 ++++++----
 .../beam/runners/dataflow/DataflowRunnerTest.java  | 24 ++++++++++++++++++----
 .../UnboundedScheduledExecutorServiceTest.java     |  2 +-
 3 files changed, 28 insertions(+), 9 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ecc231ab825..299e7fa21ed 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1244,8 +1244,8 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     // Multi-language pipelines and pipelines that include upgrades should 
automatically be upgraded
     // to Runner v2.
     if (DataflowRunner.isMultiLanguagePipeline(pipeline) || 
includesTransformUpgrades(pipeline)) {
-      List<String> experiments = firstNonNull(options.getExperiments(), 
Collections.emptyList());
-      if (!experiments.contains("use_runner_v2")) {
+      if (!useUnifiedWorker(options)) {
+        List<String> experiments = firstNonNull(options.getExperiments(), 
Collections.emptyList());
         LOG.info(
             "Automatically enabling Dataflow Runner v2 since the pipeline used 
cross-language"
                 + " transforms or pipeline needed a transform upgrade.");
@@ -1256,7 +1256,9 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     if (useUnifiedWorker(options)) {
       if (hasExperiment(options, "disable_runner_v2")
           || hasExperiment(options, "disable_runner_v2_until_2023")
-          || hasExperiment(options, "disable_prime_runner_v2")) {
+          || hasExperiment(options, "disable_prime_runner_v2")
+          || hasExperiment(options, "disable_portable_runner")
+          || hasExperiment(options, "enable_streaming_java_runner")) {
         throw new IllegalArgumentException(
             "Runner V2 both disabled and enabled: at least one of 
['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 
'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 
'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
       }
@@ -2729,7 +2731,8 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     return hasExperiment(options, "beam_fn_api")
         || hasExperiment(options, "use_runner_v2")
         || hasExperiment(options, "use_unified_worker")
-        || hasExperiment(options, "use_portable_job_submission");
+        || hasExperiment(options, "use_portable_job_submission")
+        || hasExperiment(options, "enable_portable_runner");
   }
 
   static void verifyDoFnSupported(
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 8c33123be6d..ab3b62a0aa1 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1783,7 +1783,11 @@ public class DataflowRunnerTest implements Serializable {
   public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws 
Exception {
     for (String experiment :
         ImmutableList.of(
-            "beam_fn_api", "use_runner_v2", "use_unified_worker", 
"use_portable_job_submission")) {
+            "beam_fn_api",
+            "use_runner_v2",
+            "use_unified_worker",
+            "use_portable_job_submission",
+            "enable_portable_runner")) {
       DataflowPipelineOptions options = buildPipelineOptions();
       ExperimentalOptions.addExperiment(options, experiment);
       Pipeline p = Pipeline.create(options);
@@ -1798,7 +1802,11 @@ public class DataflowRunnerTest implements Serializable {
 
     for (String experiment :
         ImmutableList.of(
-            "beam_fn_api", "use_runner_v2", "use_unified_worker", 
"use_portable_job_submission")) {
+            "beam_fn_api",
+            "use_runner_v2",
+            "use_unified_worker",
+            "use_portable_job_submission",
+            "enable_portable_runner")) {
       DataflowPipelineOptions options = buildPipelineOptions();
       options.setStreaming(true);
       ExperimentalOptions.addExperiment(options, experiment);
@@ -1822,10 +1830,18 @@ public class DataflowRunnerTest implements Serializable 
{
   public void 
testSettingConflictingEnableAndDisableExperimentsThrowsException() throws 
Exception {
     for (String experiment :
         ImmutableList.of(
-            "beam_fn_api", "use_runner_v2", "use_unified_worker", 
"use_portable_job_submission")) {
+            "beam_fn_api",
+            "use_runner_v2",
+            "use_unified_worker",
+            "use_portable_job_submission",
+            "enable_portable_runner")) {
       for (String disabledExperiment :
           ImmutableList.of(
-              "disable_runner_v2", "disable_runner_v2_until_2023", 
"disable_prime_runner_v2")) {
+              "disable_runner_v2",
+              "disable_runner_v2_until_2023",
+              "disable_prime_runner_v2",
+              "enable_streaming_java_runner",
+              "disable_portable_runner")) {
         DataflowPipelineOptions options = buildPipelineOptions();
         ExperimentalOptions.addExperiment(options, experiment);
         ExperimentalOptions.addExperiment(options, disabledExperiment);
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
index efbb0351978..bba10c84fab 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
@@ -625,7 +625,7 @@ public class UnboundedScheduledExecutorServiceTest {
     LOG.info("Created {} threads to execute at most 100 parallel tasks", 
largestPool);
     // Ideally we would never create more than 100, however with contention it 
is still possible
     // some extra threads will be created.
-    assertTrue(largestPool <= 110);
+    assertTrue(largestPool <= 120);
     executorService.shutdown();
   }
 }

Reply via email to