[https://issues.apache.org/jira/browse/BEAM-1620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16284596#comment-16284596]
ASF GitHub Bot commented on BEAM-1620:
--------------------------------------
kennknowles closed pull request #2947: [BEAM-1620] Run ValidatesRunner on
Dataflow streaming in postcommit
URL: https://github.com/apache/beam/pull/2947
This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff once such a PR is merged, the
diff is reproduced below for the sake of provenance:
diff --git
a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
index 2c739e359e3..c54b2e46b3e 100644
---
a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
+++
b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow.groovy
@@ -41,5 +41,20 @@ mavenJob('beam_PostCommit_Java_ValidatesRunner_Dataflow') {
'Run Dataflow ValidatesRunner')
// Maven goals for this job.
- goals('-B -e clean verify -am -pl runners/google-cloud-dataflow-java
-DforkCount=0 -DvalidatesRunnerPipelineOptions=\'[
"--runner=TestDataflowRunner", "--project=apache-beam-testing",
"--tempRoot=gs://temp-storage-for-validates-runner-tests/" ]\'')
+ goals([
+ 'clean',
+ 'verify',
+ '--projects runners/google-cloud-dataflow-java',
+ '--batch-mode',
+ '--errors',
+ '--also-make',
+ '-DforkCount=0',
+ '-Ddataflow.skipStreamingITs=true',
+ '-Ddataflow.skipBatchITs=false',
+ '''-DvalidatesRunnerPipelineOptions='[
+ "--runner=TestDataflowRunner",
+ "--project=apache-beam-testing",
+ "--tempRoot=gs://temp-storage-for-validates-runner-tests/"
+ ]' '''
+ ].join(' '))
}
diff --git
a/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.groovy
b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.groovy
new file mode 100644
index 00000000000..8e80ae16db3
--- /dev/null
+++
b/.test-infra/jenkins/job_beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.groovy
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import common_job_properties
+
+// This job runs the suite of ValidatesRunner tests against the Dataflow
+// runner.
+mavenJob('beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming') {
+ description('Runs the ValidatesRunner suite on the Dataflow runner in
streaming mode.')
+
+ // Set common parameters.
+ common_job_properties.setTopLevelMainJobProperties(delegate, 'master', 120)
+
+ // Set maven parameters.
+ common_job_properties.setMavenConfig(delegate)
+
+ // Sets that this is a PostCommit job.
+ common_job_properties.setPostCommit(delegate)
+
+ // Allows triggering this build against pull requests.
+ common_job_properties.enablePhraseTriggeringFromPullRequest(
+ delegate,
+ 'Google Cloud Dataflow Runner Streaming ValidatesRunner Tests',
+ 'Run Dataflow ValidatesRunner Streaming')
+
+ // Maven goals for this job.
+ goals([
+ 'clean',
+ 'verify',
+ '--projects runners/google-cloud-dataflow-java',
+ '--batch-mode',
+ '--errors',
+ '--also-make',
+ '-DforkCount=0',
+ '-Ddataflow.skipStreamingITs=false',
+ '-Ddataflow.skipBatchITs=true',
+ '''-DvalidatesRunnerPipelineOptions='[
+ "--runner=TestDataflowRunner",
+ "--project=apache-beam-testing",
+ "--tempRoot=gs://temp-storage-for-validates-runner-tests/"
+ ]' '''
+ ].join(' '))
+}
diff --git a/runners/google-cloud-dataflow-java/pom.xml
b/runners/google-cloud-dataflow-java/pom.xml
index 2e0818107e7..81283f1919c 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -36,6 +36,8 @@
<dataflow.container_version>beam-master-20170926</dataflow.container_version>
<dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
<dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
+ <dataflow.skipStreamingITs>true</dataflow.skipStreamingITs>
+ <dataflow.skipBatchITs>false</dataflow.skipBatchITs>
</properties>
<profiles>
@@ -54,6 +56,15 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
+ <!--
+ Runs all the ValidatesRunner tests on the DataflowRunner
using the provided
+ command line arguments, with an additional system property
that forces the
+ DataflowRunner into batch mode.
+
+ Enabled by default in the validates-runner-tests profile. Use
+ -Ddataflow.skipBatchITs=true to skip this (for example, to
run only
+ streaming tests)
+ -->
<execution>
<id>validates-runner-tests</id>
<phase>integration-test</phase>
@@ -61,7 +72,7 @@
<goal>test</goal>
</goals>
<configuration>
- <skip>false</skip>
+ <skip>${dataflow.skipBatchITs}</skip>
<groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
<parallel>all</parallel>
<threadCount>4</threadCount>
@@ -69,7 +80,41 @@
<dependency>org.apache.beam:beam-sdks-java-core</dependency>
</dependenciesToScan>
<systemPropertyVariables>
-
<beamTestPipelineOptions>${validatesRunnerPipelineOptions}</beamTestPipelineOptions>
+ <beamTestPipelineOptions>
+ ${validatesRunnerPipelineOptions}
+ </beamTestPipelineOptions>
+
<beam.dataflow.forceStreaming>false</beam.dataflow.forceStreaming>
+ </systemPropertyVariables>
+ </configuration>
+ </execution>
+
+ <!--
+ Runs all the ValidatesRunner tests on the DataflowRunner
using the provided
+ command line arguments, with an additional system property
that forces the
+ DataflowRunner into streaming mode.
+
+ Skipped by default, use -Ddataflow.skipStreamingITs=false to
enable. You may
+ also want to use -Ddataflow.skipBatchITs=true to run only
streaming tests.
+ -->
+ <execution>
+ <id>streaming-validates-runner-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <skip>${dataflow.skipStreamingITs}</skip>
+
<groups>org.apache.beam.sdk.testing.ValidatesRunner</groups>
+ <parallel>all</parallel>
+ <threadCount>4</threadCount>
+ <dependenciesToScan>
+
<dependency>org.apache.beam:beam-sdks-java-core</dependency>
+ </dependenciesToScan>
+ <systemPropertyVariables>
+ <beamTestPipelineOptions>
+ ${validatesRunnerPipelineOptions}
+ </beamTestPipelineOptions>
+
<beam.dataflow.forceStreaming>true</beam.dataflow.forceStreaming>
</systemPropertyVariables>
</configuration>
</execution>
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
index 1abea99fcb6..9077483bf89 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -52,6 +52,8 @@
* @see TestPipeline
*/
public class TestDataflowRunner extends PipelineRunner<DataflowPipelineJob> {
+ @VisibleForTesting
+ static final String DATAFLOW_FORCE_STREAMING_PROPERTY =
"beam.dataflow.forceStreaming";
private static final String TENTATIVE_COUNTER = "tentative";
private static final Logger LOG =
LoggerFactory.getLogger(TestDataflowRunner.class);
@@ -74,6 +76,11 @@ public static TestDataflowRunner fromOptions(PipelineOptions
options) {
.join(dataflowOptions.getTempRoot(), dataflowOptions.getJobName(),
"output", "results");
dataflowOptions.setTempLocation(tempLocation);
+ String forceStreaming =
System.getProperty(DATAFLOW_FORCE_STREAMING_PROPERTY);
+ if (forceStreaming != null) {
+ dataflowOptions.setStreaming(Boolean.parseBoolean(forceStreaming));
+ }
+
return new TestDataflowRunner(
dataflowOptions,
DataflowClient.create(options.as(DataflowPipelineOptions.class)));
}
@@ -84,6 +91,11 @@ static TestDataflowRunner fromOptionsAndClient(
return new TestDataflowRunner(options, client);
}
+ @VisibleForTesting
+ TestDataflowPipelineOptions getOptions() {
+ return options;
+ }
+
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
return run(pipeline, runner);
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
index 15faf2612c9..a1296e194f4 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
@@ -19,6 +19,7 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@@ -54,6 +55,7 @@
import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RestoreSystemProperties;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -80,6 +82,8 @@
@RunWith(JUnit4.class)
public class TestDataflowRunnerTest {
@Rule public ExpectedException expectedException = ExpectedException.none();
+ @Rule
+ public final RestoreSystemProperties restoreSystemProperties = new
RestoreSystemProperties();
@Mock private DataflowClient mockClient;
private TestDataflowPipelineOptions options;
@@ -98,6 +102,22 @@ public void setUp() throws Exception {
options.setPathValidatorClass(NoopPathValidator.class);
}
+ @Test
+ public void testForceStreamingTrue() {
+ options.setStreaming(false);
+ System.setProperty(TestDataflowRunner.DATAFLOW_FORCE_STREAMING_PROPERTY,
"true");
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+ assertThat(runner.getOptions().isStreaming(), is(true));
+ }
+
+ @Test
+ public void testForceStreamingFalse() {
+ options.setStreaming(false);
+ System.setProperty(TestDataflowRunner.DATAFLOW_FORCE_STREAMING_PROPERTY,
"false");
+ TestDataflowRunner runner = TestDataflowRunner.fromOptions(options);
+ assertThat(runner.getOptions().isStreaming(), is(false));
+ }
+
@Test
public void testToString() {
assertEquals("TestDataflowRunner#TestAppName",
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add streaming Dataflow ValidatesRunner coverage
> -----------------------------------------------
>
> Key: BEAM-1620
> URL: https://issues.apache.org/jira/browse/BEAM-1620
> Project: Beam
> Issue Type: Test
> Components: runner-dataflow, testing
> Reporter: Kenneth Knowles
> Assignee: Jason Kuster
>
> Currently, the runner validation test suite is not run on Dataflow in
> streaming mode. It should be able to run, since all the functionality is
> already in place. I think this is just a matter of Maven and Jenkins
> configuration, plus making sure not to leak a bunch of streaming jobs.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)