This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d55cf588e58 Flink 2 support prerequisites (#37133)
d55cf588e58 is described below

commit d55cf588e586e0b04716c9dbca52874c54899a3c
Author: Yi Hu <[email protected]>
AuthorDate: Tue Jan 6 20:41:40 2026 -0500

    Flink 2 support prerequisites (#37133)
    
    * Honor getUseDataStreamForBatch pipeline option for Flink portable runner
    
    * Refactor gradle scripts in preparation for Flink 2 support
    
    * Create a PostCommit that runs ValidatesRunner tests on the legacy DataSet API
---
 .github/workflows/README.md                        |   1 +
 .../beam_PostCommit_Java_PVR_Flink_Batch.yml       | 106 +++++++++++++++++
 runners/flink/flink_runner.gradle                  | 125 ++++++++++++++-------
 .../flink_job_server_container.gradle              |  10 +-
 runners/flink/job-server/flink_job_server.gradle   |  31 +++--
 .../beam/runners/flink/FlinkPipelineRunner.java    |   4 +-
 .../wrappers/streaming/DoFnOperator.java           |   0
 .../flink/streaming/MemoryStateBackendWrapper.java |   0
 .../runners/flink/streaming/StreamSources.java     |   0
 9 files changed, 222 insertions(+), 55 deletions(-)

diff --git a/.github/workflows/README.md b/.github/workflows/README.md
index f01d2a1257b..283be9c2b1f 100644
--- a/.github/workflows/README.md
+++ b/.github/workflows/README.md
@@ -344,6 +344,7 @@ PostCommit Jobs run in a schedule against master branch and generally do not get
 | [ PostCommit Java Nexmark Direct ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml) | N/A |`beam_PostCommit_Java_Nexmark_Direct.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Direct.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Direct.yml?query=event%3Aschedule) |
 | [ PostCommit Java Nexmark Flink ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml) | N/A |`beam_PostCommit_Java_Nexmark_Flink.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Flink.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Flink.yml?query=event%3Aschedule) |
 | [ PostCommit Java Nexmark Spark ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml) | N/A |`beam_PostCommit_Java_Nexmark_Spark.json`| [![.github/workflows/beam_PostCommit_Java_Nexmark_Spark.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_Nexmark_Spark.yml?query=event%3Aschedule) |
+| [ PostCommit Java PVR Flink Batch ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml) | N/A |`beam_PostCommit_Java_PVR_Flink_Batch.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml?query=event%3Aschedule) |
 | [ PostCommit Java PVR Flink Streaming ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml) | N/A |`beam_PostCommit_Java_PVR_Flink_Streaming.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml?query=event%3Asch [...]
 | [ PostCommit Java PVR Samza ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml) | N/A |`beam_PostCommit_Java_PVR_Samza.json`| [![.github/workflows/beam_PostCommit_Java_PVR_Samza.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_PVR_Samza.yml?query=event%3Aschedule) |
 | [ PostCommit Java SingleStoreIO IT ](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml) | N/A |`beam_PostCommit_Java_SingleStoreIO_IT.json`| [![.github/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml/badge.svg?event=schedule)](https://github.com/apache/beam/actions/workflows/beam_PostCommit_Java_SingleStoreIO_IT.yml?query=event%3Aschedule) |
diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml
new file mode 100644
index 00000000000..0a808f2f861
--- /dev/null
+++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: PostCommit Java PVR Flink Batch
+
+on:
+  push:
+    tags: ['v*']
+    branches: ['master', 'release-*']
+    paths:
+      - 'runners/flink/**'
+      - 'runners/java-fn-execution/**'
+      - 'sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/**'
+      - '.github/workflows/beam_PostCommit_Java_PVR_Flink_Batch.yml'
+  pull_request_target:
+    branches: ['master', 'release-*']
+    paths:
+      - 'release/trigger_all_tests.json'
+      - '.github/trigger_files/beam_PostCommit_Java_PVR_Flink_Batch.json'
+  schedule:
+    - cron: '15 2/6 * * *'
+  workflow_dispatch:
+
+# This allows a subsequently queued workflow run to interrupt previous runs
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.issue.number || github.event.pull_request.head.label || github.sha || github.head_ref || github.ref }}-${{ github.event.schedule || github.event.comment.id || github.event.sender.login }}'
+  cancel-in-progress: true
+
+#Setting explicit permissions for the action to avoid the default permissions which are `write-all` in case of pull_request_target event
+permissions:
+  actions: write
+  pull-requests: write
+  checks: write
+  contents: read
+  deployments: read
+  id-token: none
+  issues: write
+  discussions: read
+  packages: read
+  pages: read
+  repository-projects: read
+  security-events: read
+  statuses: read
+
+env:
+  DEVELOCITY_ACCESS_KEY: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+  GRADLE_ENTERPRISE_CACHE_USERNAME: ${{ secrets.GE_CACHE_USERNAME }}
+  GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }}
+
+jobs:
+  beam_PostCommit_Java_PVR_Flink_Batch:
+    name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+    strategy:
+      matrix:
+        job_name: ["beam_PostCommit_Java_PVR_Flink_Batch"]
+        job_phrase: ["Run Java_PVR_Flink_Batch PostCommit"]
+    timeout-minutes: 240
+    runs-on: [self-hosted, ubuntu-20.04, highmem]
+    if: |
+      github.event_name == 'push' ||
+      github.event_name == 'pull_request_target' ||
+      (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
+      github.event_name == 'workflow_dispatch' ||
+      github.event.comment.body == 'Run Java_PVR_Flink_Batch PostCommit'
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup repository
+        uses: ./.github/actions/setup-action
+        with:
+          comment_phrase: ${{ matrix.job_phrase }}
+          github_token: ${{ secrets.GITHUB_TOKEN }}
+          github_job: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
+      - name: Setup environment
+        uses: ./.github/actions/setup-environment-action
+      - name: run validatesPortableRunnerBatch script
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        with:
+          gradle-command: :runners:flink:1.20:job-server:validatesPortableRunnerBatchDataSet
+        env:
+          CLOUDSDK_CONFIG: ${{ env.KUBELET_GCLOUD_CONFIG_PATH }}
+      - name: Archive JUnit Test Results
+        uses: actions/upload-artifact@v4
+        if: ${{ !success() }}
+        with:
+          name: JUnit Test Results
+          path: "**/build/reports/tests/"
+      - name: Upload test report
+        uses: actions/upload-artifact@v4
+        with:
+          name: java-code-coverage-report
+          path: "**/build/test-results/**/*.xml"
+# TODO: Investigate 'Max retries exceeded' issue with EnricoMi/publish-unit-test-result-action@v2.
diff --git a/runners/flink/flink_runner.gradle b/runners/flink/flink_runner.gradle
index 52f9631f455..af90c22cfb0 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -28,7 +28,8 @@ import groovy.json.JsonOutput
 def base_path = ".."
 
 def overrides(versions, type, base_path) {
-  versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
+  // order is important
+  ["${base_path}/src/${type}/java"] + versions.collect { "${base_path}/${it}/src/${type}/java" } + ["./src/${type}/java"]
 }
 
 def all_versions = flink_versions.split(",")
@@ -49,7 +50,8 @@ applyJavaNature(
     automaticModuleName: 'org.apache.beam.runners.flink',
     archivesBaseName: archivesBaseName,
     // flink runner jars are in same package name. Publish javadoc once.
-    exportJavadoc: project.ext.flink_version.startsWith(all_versions.first())
+    exportJavadoc: project.ext.flink_version.startsWith(all_versions.first()),
+    requireJavaVersion: project.ext.flink_major.startsWith('2') ? JavaVersion.VERSION_11 : null
 )
 
 description = "Apache Beam :: Runners :: Flink $flink_version"
@@ -68,10 +70,16 @@ evaluationDependsOn(":examples:java")
  */
 def sourceOverridesBase = project.layout.buildDirectory.dir('source-overrides/src').get()
 
-def copySourceOverrides = tasks.register('copySourceOverrides', Copy) {
-  it.from main_source_overrides
-  it.into "${sourceOverridesBase}/main/java"
-  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
+def copySourceOverrides = tasks.register('copySourceOverrides', Copy) { copyTask ->
+  copyTask.from main_source_overrides
+  copyTask.into "${sourceOverridesBase}/main/java"
+  copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
+
+  if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('main')) {
+    project.ext.excluded_files.main.each { file ->
+      copyTask.exclude "**/${file}"
+    }
+  }
 }
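The exclusion hook added to the copy tasks above can be sketched in plain Java. This is a hypothetical mirror of the Gradle `copyTask.exclude "**/${file}"` calls, not Beam or Gradle API; the helper name and file names are illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

public class CopyExcludes {
    // Hypothetical mirror of the Copy task's exclude logic: drop any file
    // whose path ends with an entry of the per-sourceSet excluded_files
    // list (e.g. a source that a given Flink version must not inherit).
    static List<String> applyExcludes(List<String> files, List<String> excluded) {
        return files.stream()
                .filter(f -> excluded.stream().noneMatch(f::endsWith))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> files = List.of(
                "src/main/java/DoFnOperator.java",
                "src/main/java/FlinkRunner.java");
        // DoFnOperator.java is filtered out, FlinkRunner.java survives
        System.out.println(applyExcludes(files, List.of("DoFnOperator.java")));
    }
}
```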
 
 def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
@@ -80,10 +88,16 @@ def copyResourcesOverrides = tasks.register('copyResourcesOverrides', Copy) {
   it.duplicatesStrategy DuplicatesStrategy.INCLUDE
 }
 
-def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) {
-  it.from test_source_overrides
-  it.into "${sourceOverridesBase}/test/java"
-  it.duplicatesStrategy DuplicatesStrategy.INCLUDE
+def copyTestSourceOverrides = tasks.register('copyTestSourceOverrides', Copy) { copyTask ->
+  copyTask.from test_source_overrides
+  copyTask.into "${sourceOverridesBase}/test/java"
+  copyTask.duplicatesStrategy DuplicatesStrategy.INCLUDE
+
+  if (project.ext.has('excluded_files') && project.ext.excluded_files.containsKey('test')) {
+    project.ext.excluded_files.test.each { file ->
+      copyTask.exclude "**/${file}"
+    }
+  }
 }
 
 def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Copy) {
@@ -92,45 +106,69 @@ def copyTestResourcesOverrides = tasks.register('copyTestResourcesOverrides', Co
   it.duplicatesStrategy DuplicatesStrategy.INCLUDE
 }
 
-// add dependency to gradle Java plugin defined tasks
-compileJava.dependsOn copySourceOverrides
-processResources.dependsOn copyResourcesOverrides
-compileTestJava.dependsOn copyTestSourceOverrides
-processTestResources.dependsOn copyTestResourcesOverrides
-
-// add dependency BeamModulePlugin defined custom tasks
-// they are defined only when certain flags are provided (e.g. -Prelease; -Ppublishing, etc)
-def sourcesJar = project.tasks.findByName('sourcesJar')
-if (sourcesJar != null) {
-  sourcesJar.dependsOn copySourceOverrides
-  sourcesJar.dependsOn copyResourcesOverrides
-}
-def testSourcesJar = project.tasks.findByName('testSourcesJar')
-if (testSourcesJar != null) {
-  testSourcesJar.dependsOn copyTestSourceOverrides
-  testSourcesJar.dependsOn copyTestResourcesOverrides
-}
+def use_override = (flink_major != all_versions.first())
+def sourceBase = "${project.projectDir}/../src"
 
-/*
+if (use_override) {
+  // Copy original+version specific sources to a tmp dir and use it as sourceSet
+  // add dependency to gradle Java plugin defined tasks
+  compileJava.dependsOn copySourceOverrides
+  processResources.dependsOn copyResourcesOverrides
+  compileTestJava.dependsOn copyTestSourceOverrides
+  processTestResources.dependsOn copyTestResourcesOverrides
+
+  // add dependency BeamModulePlugin defined custom tasks
+  // they are defined only when certain flags are provided (e.g. -Prelease; -Ppublishing, etc)
+  def sourcesJar = project.tasks.findByName('sourcesJar')
+  if (sourcesJar != null) {
+    sourcesJar.dependsOn copySourceOverrides
+    sourcesJar.dependsOn copyResourcesOverrides
+  }
+  def testSourcesJar = project.tasks.findByName('testSourcesJar')
+  if (testSourcesJar != null) {
+    testSourcesJar.dependsOn copyTestSourceOverrides
+    testSourcesJar.dependsOn copyTestResourcesOverrides
+  }
+  /*
  * We have to explicitly set all directories here to make sure each
  * version of Flink has the correct overrides set.
  */
-def sourceBase = "${project.projectDir}/../src"
-sourceSets {
-  main {
-    java {
-      srcDirs = ["${sourceBase}/main/java", "${sourceOverridesBase}/main/java"]
+  sourceSets {
+    main {
+      java {
+        srcDirs = ["${sourceOverridesBase}/main/java"]
+      }
+      resources {
+        srcDirs = ["${sourceBase}/main/resources", "${sourceOverridesBase}/main/resources"]
+      }
     }
-    resources {
-      srcDirs = ["${sourceBase}/main/resources", 
"${sourceOverridesBase}/main/resources"]
+    test {
+      java {
+        srcDirs = ["${sourceOverridesBase}/test/java"]
+      }
+      resources {
+        srcDirs = ["${sourceBase}/test/resources", "${sourceOverridesBase}/test/resources"]
+      }
     }
   }
-  test {
-    java {
-      srcDirs = ["${sourceBase}/test/java", "${sourceOverridesBase}/test/java"]
+} else {
+  // Use the original sources directly for the lowest supported Flink version.
+  sourceSets {
+    main {
+      java {
+        srcDirs = ["${sourceBase}/main/java"]
+      }
+      resources {
+        srcDirs = ["${sourceBase}/main/resources"]
+      }
     }
-    resources {
-      srcDirs = ["${sourceBase}/test/resources", 
"${sourceOverridesBase}/test/resources"]
+    test {
+      java {
+        srcDirs = ["${sourceBase}/test/java"]
+      }
+      resources {
+        srcDirs = ["${sourceBase}/test/resources"]
+      }
     }
   }
 }
@@ -196,7 +234,10 @@ dependencies {
 
   implementation "org.apache.flink:flink-core:$flink_version"
   implementation "org.apache.flink:flink-metrics-core:$flink_version"
-  implementation "org.apache.flink:flink-java:$flink_version"
+  if (project.ext.flink_major.startsWith('1')) {
+    // FLINK-36336: dataset API removed in Flink 2
+    implementation "org.apache.flink:flink-java:$flink_version"
+  }
 
   implementation "org.apache.flink:flink-runtime:$flink_version"
   implementation "org.apache.flink:flink-metrics-core:$flink_version"
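The "order is important" comment in the reworked `overrides` helper above matters because the copy tasks use `DuplicatesStrategy.INCLUDE`, so a file copied later replaces an earlier copy. A plain-Java sketch of the ordering (a hypothetical mirror of the Gradle logic; the version strings are examples):

```java
import java.util.ArrayList;
import java.util.List;

public class SourceOverrides {
    // Hypothetical mirror of the Gradle `overrides` helper: common base
    // sources first, then per-version override dirs, then the current
    // module's own dir. Later entries win when files collide.
    static List<String> overrides(List<String> versions, String type, String basePath) {
        List<String> dirs = new ArrayList<>();
        dirs.add(basePath + "/src/" + type + "/java");               // common base sources
        for (String v : versions) {
            dirs.add(basePath + "/" + v + "/src/" + type + "/java"); // per-version overrides
        }
        dirs.add("./src/" + type + "/java");                         // this module, highest priority
        return dirs;
    }

    public static void main(String[] args) {
        System.out.println(overrides(List.of("1.18", "1.19"), "main", ".."));
    }
}
```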
diff --git a/runners/flink/job-server-container/flink_job_server_container.gradle b/runners/flink/job-server-container/flink_job_server_container.gradle
index 3f30a1aac1f..cf492b46929 100644
--- a/runners/flink/job-server-container/flink_job_server_container.gradle
+++ b/runners/flink/job-server-container/flink_job_server_container.gradle
@@ -53,15 +53,19 @@ task copyDockerfileDependencies(type: Copy) {
 }
 
 def pushContainers = project.rootProject.hasProperty(["isRelease"]) || project.rootProject.hasProperty("push-containers")
+def containerName = project.parent.name.startsWith("2") ? "flink_job_server" : "flink${project.parent.name}_job_server"
+def containerTag = project.rootProject.hasProperty(["docker-tag"]) ? project.rootProject["docker-tag"] : project.sdk_version
+if (project.parent.name.startsWith("2")) {
+  containerTag += "-flink${project.parent.name}"
+}
 
 docker {
   name containerImageName(
-          name: project.docker_image_default_repo_prefix + "flink${project.parent.name}_job_server",
+          name: project.docker_image_default_repo_prefix + containerName,
           root: project.rootProject.hasProperty(["docker-repository-root"]) ?
                   project.rootProject["docker-repository-root"] :
                   project.docker_image_default_repo_root,
-          tag: project.rootProject.hasProperty(["docker-tag"]) ?
-                  project.rootProject["docker-tag"] : project.sdk_version)
+          tag: containerTag)
   // tags used by dockerTag task
   tags containerImageTags()
   files "./build/"
diff --git a/runners/flink/job-server/flink_job_server.gradle b/runners/flink/job-server/flink_job_server.gradle
index d8a818ff84c..b85f8fc98aa 100644
--- a/runners/flink/job-server/flink_job_server.gradle
+++ b/runners/flink/job-server/flink_job_server.gradle
@@ -29,6 +29,11 @@ apply plugin: 'application'
 // we need to set mainClassName before applying shadow plugin
 mainClassName = "org.apache.beam.runners.flink.FlinkJobServerDriver"
 
+// Resolve the Flink project name (and version) the job-server is based on
+def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
+evaluationDependsOn(flinkRunnerProject)
+boolean isFlink2 = project(flinkRunnerProject).ext.flink_major.startsWith('2')
+
 applyJavaNature(
   automaticModuleName: 'org.apache.beam.runners.flink.jobserver',
   archivesBaseName: project.hasProperty('archives_base_name') ? archives_base_name : archivesBaseName,
@@ -37,11 +42,9 @@ applyJavaNature(
   shadowClosure: {
     append "reference.conf"
   },
+  requireJavaVersion: isFlink2 ? JavaVersion.VERSION_11 : null
 )
 
-// Resolve the Flink project name (and version) the job-server is based on
-def flinkRunnerProject = "${project.path.replace(":job-server", "")}"
-
 description = project(flinkRunnerProject).description + " :: Job Server"
 
 /*
@@ -126,11 +129,12 @@ runShadow {
     jvmArgs += ["-Dorg.slf4j.simpleLogger.defaultLogLevel=${project.property('logLevel')}"]
 }
 
-def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpointing, boolean docker) {
+def portableValidatesRunnerTask(String name, String mode, boolean checkpointing, boolean docker) {
   def pipelineOptions = [
       // Limit resource consumption via parallelism
       "--parallelism=2",
   ]
+  boolean streaming = (mode == "streaming")
   if (streaming) {
     pipelineOptions += "--streaming"
     if (checkpointing) {
@@ -138,6 +142,9 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi
       pipelineOptions += "--shutdownSourcesAfterIdleMs=60000"
     }
   }
+  if (mode == "batch") {
+    pipelineOptions += "--useDataStreamForBatch=true"
+  }
   createPortableValidatesRunnerTask(
     name: "validatesPortableRunner${name}",
     jobServerDriver: "org.apache.beam.runners.flink.FlinkJobServerDriver",
@@ -186,7 +193,9 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi
          excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
           return
         }
-
+        if (mode == "batch") {
+            excludeCategories 'org.apache.beam.sdk.testing.UsesTriggeredSideInputs'
+        }
 excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo'
 excludeCategories 'org.apache.beam.sdk.testing.UsesUnboundedPCollections'
         excludeCategories 'org.apache.beam.sdk.testing.UsesTestStream'
@@ -214,13 +223,17 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi
   )
 }
 
-project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", false, false, true)
-project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", false, false, false)
-project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", true, false, false)
-project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", true, true, false)
+project.ext.validatesPortableRunnerDocker = portableValidatesRunnerTask("Docker", "batch", false, true)
+project.ext.validatesPortableRunnerBatchDataSet = portableValidatesRunnerTask("BatchDataSet", "batch-dataset", false, false)
+project.ext.validatesPortableRunnerBatch = portableValidatesRunnerTask("Batch", "batch", false, false)
+project.ext.validatesPortableRunnerStreaming = portableValidatesRunnerTask("Streaming", "streaming", false, false)
+project.ext.validatesPortableRunnerStreamingCheckpoint = portableValidatesRunnerTask("StreamingCheckpointing", "streaming", true, false)
 
 tasks.register("validatesPortableRunner") {
   dependsOn validatesPortableRunnerDocker
+  if (!isFlink2) {
+    dependsOn validatesPortableRunnerBatchDataSet
+  }
   dependsOn validatesPortableRunnerBatch
   dependsOn validatesPortableRunnerStreaming
   dependsOn validatesPortableRunnerStreamingCheckpoint
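The change above replaces the boolean `streaming` parameter with a string `mode` so that two batch flavors can coexist: plain "batch" opts into the DataStream-based batch path via `--useDataStreamForBatch=true`, while "batch-dataset" keeps the legacy DataSet path by omitting the flag. A hypothetical plain-Java mirror of that option building (checkpointing options other than the one visible in the diff are left out):

```java
import java.util.ArrayList;
import java.util.List;

public class ValidatesRunnerOptions {
    // Hypothetical mirror of portableValidatesRunnerTask's pipeline-option
    // building; mode is "batch", "batch-dataset", or "streaming".
    static List<String> pipelineOptions(String mode, boolean checkpointing) {
        List<String> opts = new ArrayList<>();
        opts.add("--parallelism=2"); // limit resource consumption
        if (mode.equals("streaming")) {
            opts.add("--streaming");
            if (checkpointing) {
                // other checkpointing options omitted in this sketch
                opts.add("--shutdownSourcesAfterIdleMs=60000");
            }
        }
        if (mode.equals("batch")) {
            // only plain "batch" opts into the DataStream batch translator
            opts.add("--useDataStreamForBatch=true");
        }
        return opts;
    }

    public static void main(String[] args) {
        System.out.println(pipelineOptions("batch", false));
        System.out.println(pipelineOptions("batch-dataset", false));
    }
}
```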
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index c9559a39270..11175129d7e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -84,7 +84,9 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
     SdkHarnessOptions.getConfiguredLoggerFromOptions(pipelineOptions.as(SdkHarnessOptions.class));
 
     FlinkPortablePipelineTranslator<?> translator;
-    if (!pipelineOptions.isStreaming() && !hasUnboundedPCollections(pipeline)) {
+    if (!pipelineOptions.getUseDataStreamForBatch()
+        && !pipelineOptions.isStreaming()
+        && !hasUnboundedPCollections(pipeline)) {
       // TODO: Do we need to inspect for unbounded sources before fusing?
       translator = FlinkBatchPortablePipelineTranslator.createTranslator();
     } else {
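The updated condition keeps the legacy DataSet batch translator only when all three guards hold; if `useDataStreamForBatch` is set, a bounded pipeline now goes to the streaming (DataStream) translator instead. A standalone sketch of that predicate (illustrative only, not Beam API):

```java
public class TranslatorChoice {
    // Hypothetical mirror of the updated condition in FlinkPipelineRunner:
    // the legacy batch translator is chosen only when useDataStreamForBatch
    // is off, the pipeline is not streaming, and it has no unbounded
    // PCollections.
    static boolean useLegacyBatchTranslator(
            boolean useDataStreamForBatch, boolean streaming, boolean hasUnbounded) {
        return !useDataStreamForBatch && !streaming && !hasUnbounded;
    }

    public static void main(String[] args) {
        // Bounded pipeline with the option set: DataStream translator.
        System.out.println(useLegacyBatchTranslator(true, false, false));  // false
        // Bounded pipeline without the option: legacy DataSet translator.
        System.out.println(useLegacyBatchTranslator(false, false, false)); // true
    }
}
```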
diff --git a/runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
similarity index 100%
rename from runners/flink/1.17/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
diff --git a/runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java
similarity index 100%
rename from runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/MemoryStateBackendWrapper.java
diff --git a/runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java
similarity index 100%
rename from runners/flink/1.17/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamSources.java
