This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 81828fd2f0b [Dataflow] Added Portable Runner alias to java runners
(#38411)
81828fd2f0b is described below
commit 81828fd2f0b7aff2d6f6da9f5564394803531cd6
Author: TongruiLi <[email protected]>
AuthorDate: Fri May 8 16:05:44 2026 -0700
[Dataflow] Added Portable Runner alias to java runners (#38411)
* Added portable runner options to java runner
* spotless
* Added more tolerance to flaky test
* Removed unused experiments
* Added experiments back in
---
.../beam/runners/dataflow/DataflowRunner.java | 11 ++++++----
.../beam/runners/dataflow/DataflowRunnerTest.java | 24 ++++++++++++++++++----
.../UnboundedScheduledExecutorServiceTest.java | 2 +-
3 files changed, 28 insertions(+), 9 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index ecc231ab825..299e7fa21ed 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1244,8 +1244,8 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
// Multi-language pipelines and pipelines that include upgrades should
automatically be upgraded
// to Runner v2.
if (DataflowRunner.isMultiLanguagePipeline(pipeline) ||
includesTransformUpgrades(pipeline)) {
- List<String> experiments = firstNonNull(options.getExperiments(),
Collections.emptyList());
- if (!experiments.contains("use_runner_v2")) {
+ if (!useUnifiedWorker(options)) {
+ List<String> experiments = firstNonNull(options.getExperiments(),
Collections.emptyList());
LOG.info(
"Automatically enabling Dataflow Runner v2 since the pipeline used
cross-language"
+ " transforms or pipeline needed a transform upgrade.");
@@ -1256,7 +1256,9 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
if (useUnifiedWorker(options)) {
if (hasExperiment(options, "disable_runner_v2")
|| hasExperiment(options, "disable_runner_v2_until_2023")
- || hasExperiment(options, "disable_prime_runner_v2")) {
+ || hasExperiment(options, "disable_prime_runner_v2")
+ || hasExperiment(options, "disable_portable_runner")
+ || hasExperiment(options, "enable_streaming_java_runner")) {
throw new IllegalArgumentException(
"Runner V2 both disabled and enabled: at least one of
['beam_fn_api', 'use_unified_worker', 'use_runner_v2',
'use_portable_job_submission'] is set and also one of ['disable_runner_v2',
'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
}
@@ -2729,7 +2731,8 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
return hasExperiment(options, "beam_fn_api")
|| hasExperiment(options, "use_runner_v2")
|| hasExperiment(options, "use_unified_worker")
- || hasExperiment(options, "use_portable_job_submission");
+ || hasExperiment(options, "use_portable_job_submission")
+ || hasExperiment(options, "enable_portable_runner");
}
static void verifyDoFnSupported(
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 8c33123be6d..ab3b62a0aa1 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -1783,7 +1783,11 @@ public class DataflowRunnerTest implements Serializable {
public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws
Exception {
for (String experiment :
ImmutableList.of(
- "beam_fn_api", "use_runner_v2", "use_unified_worker",
"use_portable_job_submission")) {
+ "beam_fn_api",
+ "use_runner_v2",
+ "use_unified_worker",
+ "use_portable_job_submission",
+ "enable_portable_runner")) {
DataflowPipelineOptions options = buildPipelineOptions();
ExperimentalOptions.addExperiment(options, experiment);
Pipeline p = Pipeline.create(options);
@@ -1798,7 +1802,11 @@ public class DataflowRunnerTest implements Serializable {
for (String experiment :
ImmutableList.of(
- "beam_fn_api", "use_runner_v2", "use_unified_worker",
"use_portable_job_submission")) {
+ "beam_fn_api",
+ "use_runner_v2",
+ "use_unified_worker",
+ "use_portable_job_submission",
+ "enable_portable_runner")) {
DataflowPipelineOptions options = buildPipelineOptions();
options.setStreaming(true);
ExperimentalOptions.addExperiment(options, experiment);
@@ -1822,10 +1830,18 @@ public class DataflowRunnerTest implements Serializable
{
public void
testSettingConflictingEnableAndDisableExperimentsThrowsException() throws
Exception {
for (String experiment :
ImmutableList.of(
- "beam_fn_api", "use_runner_v2", "use_unified_worker",
"use_portable_job_submission")) {
+ "beam_fn_api",
+ "use_runner_v2",
+ "use_unified_worker",
+ "use_portable_job_submission",
+ "enable_portable_runner")) {
for (String disabledExperiment :
ImmutableList.of(
- "disable_runner_v2", "disable_runner_v2_until_2023",
"disable_prime_runner_v2")) {
+ "disable_runner_v2",
+ "disable_runner_v2_until_2023",
+ "disable_prime_runner_v2",
+ "enable_streaming_java_runner",
+ "disable_portable_runner")) {
DataflowPipelineOptions options = buildPipelineOptions();
ExperimentalOptions.addExperiment(options, experiment);
ExperimentalOptions.addExperiment(options, disabledExperiment);
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
index efbb0351978..bba10c84fab 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/UnboundedScheduledExecutorServiceTest.java
@@ -625,7 +625,7 @@ public class UnboundedScheduledExecutorServiceTest {
LOG.info("Created {} threads to execute at most 100 parallel tasks",
largestPool);
// Ideally we would never create more than 100, however with contention it
is still possible
// some extra threads will be created.
- assertTrue(largestPool <= 110);
+ assertTrue(largestPool <= 120);
executorService.shutdown();
}
}