[ https://issues.apache.org/jira/browse/BEAM-6225?focusedWorklogId=175594&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-175594 ]
ASF GitHub Bot logged work on BEAM-6225: ---------------------------------------- Author: ASF GitHub Bot Created on: 14/Dec/18 21:47 Start Date: 14/Dec/18 21:47 Worklog Time Spent: 10m Work Description: swegner closed pull request #7271: [BEAM-6225] Setup Jenkins Job to Run VR with ExecutableStage URL: https://github.com/apache/beam/pull/7271 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy new file mode 100644 index 000000000000..62e73617b74e --- /dev/null +++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage.groovy @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import CommonJobProperties as commonJobProperties +import PostcommitJobBuilder + + +// This job runs the suite of ValidatesRunner tests against the Dataflow +// runner. 
+PostcommitJobBuilder.postCommitJob('beam_PostCommit_Java_ValidatesRunner_DataflowPortabilityExecutableStage', + 'Run Dataflow Portability ExecutableStage ValidatesRunner', 'Google Cloud Dataflow Runner PortabilityApi ExecutableStage ValidatesRunner Tests', this) { + + description('Runs the ValidatesRunner suite on the Dataflow PortabilityApi runner with ExecutableStage code path enabled.') + + // Set common parameters. Sets a 3 hour timeout. + commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 400) + + // Publish all test results to Jenkins + publishers { + archiveJunit('**/build/test-results/**/*.xml') + } + + // Gradle goals for this job. + steps { + gradle { + rootBuildScriptDir(commonJobProperties.checkoutDir) + tasks(':beam-runners-google-cloud-dataflow-java:validatesRunnerFnApiWorkerExecutableStageTest') + // Increase parallel worker threads above processor limit since most time is + // spent waiting on Dataflow jobs. ValidatesRunner tests on Dataflow are slow + // because each one launches a Dataflow job with about 3 mins of overhead. + // 3 x num_cores strikes a good balance between maxing out parallelism without + // overloading the machines. + commonJobProperties.setGradleSwitches(delegate, 3 * Runtime.runtime.availableProcessors()) + } + } + + // [BEAM-6236] "use_executable_stage_bundle_execution" hasn't been rolled out. 
+ disabled() +} diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle index c0b831c6e547..9c6aaf4f773a 100644 --- a/runners/google-cloud-dataflow-java/build.gradle +++ b/runners/google-cloud-dataflow-java/build.gradle @@ -241,17 +241,55 @@ task validatesRunnerFnApiWorkerTest(type: Test) { } } +task validatesRunnerFnApiWorkerExecutableStageTest(type: Test) { + group = "Verification" + description "Validates Dataflow PortabilityApi runner" + dependsOn ":beam-runners-google-cloud-dataflow-java-fn-api-worker:shadowJar" + dependsOn buildAndPushDockerContainer + + systemProperty "beamTestPipelineOptions", JsonOutput.toJson([ + "--runner=TestDataflowRunner", + "--project=${dataflowProject}", + "--tempRoot=${dataflowPostCommitTempRoot}", + "--dataflowWorkerJar=${dataflowFnApiWorkerJar}", + "--workerHarnessContainerImage=${dockerImageContainer}:${dockerTag}", + "--experiments=beam_fn_api,use_executable_stage_bundle_execution"] + ) + + // Increase test parallelism up to the number of Gradle workers. By default this is equal + // to the number of CPU cores, but can be increased by setting --max-workers=N. + maxParallelForks Integer.MAX_VALUE + classpath = configurations.validatesRunner + testClassesDirs = files(project(":beam-sdks-java-core").sourceSets.test.output.classesDirs) + // TODO(BEAM-6232): ViewTest tests sideinputs, which are not supported by the current bundle execution. + exclude '**/ViewTest.class' + useJUnit { + includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner' + commonExcludeCategories.each { + excludeCategories it + } + fnApiWorkerExcludeCategories.each { + excludeCategories it + } + // TODO(BEAM-6232): Support sideinput. + excludeCategories 'org.apache.beam.sdk.testing.UsesSideInputs' + // TODO(BEAM-6233): Support timer and state. + excludeCategories 'org.apache.beam.sdk.testing.UsesStatefulParDo' + // TODO(BEAM-6231): Triage failures. 
+ excludeCategories 'org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported' + } +} + task validatesRunner { group = "Verification" description "Validates Dataflow runner" dependsOn validatesRunnerLegacyWorkerTest } -task validatesRunnerPortabilityApi { - group = "Verification" - description "Validates Dataflow PortabilityApi runner" - dependsOn validatesRunnerFnApiWorkerTest - dependsOn buildAndPushDockerContainer +task validatesRunnerPortabilityApiExecutableStage { + group = "Verification" + description "Validates Dataflow PortabilityApi runner" + dependsOn validatesRunnerFnApiWorkerExecutableStageTest } task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java new file mode 100644 index 000000000000..a14885f1e626 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/DataflowPortabilityExecutableStageUnsupported.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which are not supported by Dataflow portable worker with + * use_executable_stage_bundle_execution, and which need more investigation. + */ +// TODO(BEAM-6231): Triage test failures introduced by using ExecutableStage. +public interface DataflowPortabilityExecutableStageUnsupported {} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java new file mode 100644 index 000000000000..907c13ec5641 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputs.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which use sideinputs. Tests tagged with {@link UsesSideInputs} + * should be run for runners which support sideinputs. 
+ */ +public interface UsesSideInputs {} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index bf7ee6971ad8..160f709bd9fc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -297,7 +297,7 @@ public void testPAssertHashCodeIterableUnsupported() throws Exception { /** Basic test for {@code isEqualTo}. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testIsEqualTo() throws Exception { PCollection<Integer> pcollection = pipeline.apply(Create.of(43)); PAssert.thatSingleton(pcollection).isEqualTo(43); @@ -306,7 +306,7 @@ public void testIsEqualTo() throws Exception { /** Basic test for {@code isEqualTo}. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowedIsEqualTo() throws Exception { PCollection<Integer> pcollection = pipeline @@ -326,7 +326,7 @@ public void testWindowedIsEqualTo() throws Exception { /** Basic test for {@code notEqualTo}. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testNotEqualTo() throws Exception { PCollection<Integer> pcollection = pipeline.apply(Create.of(43)); PAssert.thatSingleton(pcollection).notEqualTo(42); @@ -335,7 +335,7 @@ public void testNotEqualTo() throws Exception { /** Test that we throw an error for false assertion on singleton. 
*/ @Test - @Category({ValidatesRunner.class, UsesFailureMessage.class}) + @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class}) public void testPAssertEqualsSingletonFalse() throws Exception { PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); PAssert.thatSingleton("The value was not equal to 44", pcollection).isEqualTo(44); @@ -351,7 +351,7 @@ public void testPAssertEqualsSingletonFalse() throws Exception { /** Test that we throw an error for false assertion on singleton. */ @Test - @Category({ValidatesRunner.class, UsesFailureMessage.class}) + @Category({ValidatesRunner.class, UsesFailureMessage.class, UsesSideInputs.class}) public void testPAssertEqualsSingletonFalseDefaultReasonString() throws Exception { PCollection<Integer> pcollection = pipeline.apply(Create.of(42)); PAssert.thatSingleton(pcollection).isEqualTo(44); @@ -385,7 +385,7 @@ public void testGlobalWindowContainsInAnyOrder() throws Exception { /** Tests that windowed {@code containsInAnyOrder} is actually order-independent. 
*/ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowedContainsInAnyOrder() throws Exception { PCollection<Integer> pcollection = pipeline diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 2d825dcef20a..21f7cdaac426 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -58,9 +58,11 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineTest.SharedTestBase.TestCombineFn.Accumulator; @@ -662,7 +664,7 @@ public void testSimpleCombineEmpty() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testBasicCombine() { runTestBasicCombine( Arrays.asList(KV.of("a", 1), KV.of("a", 1), KV.of("a", 4), KV.of("b", 1), KV.of("b", 13)), @@ -673,7 +675,7 @@ public void testBasicCombine() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testBasicCombineEmpty() { runTestBasicCombine(EMPTY_TABLE, ImmutableSet.of(), Collections.emptyList()); } @@ -950,7 +952,7 @@ public void testLambdaDisplayData() { @RunWith(JUnit4.class) public static class CombineWithContextTests extends 
SharedTestBase { @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombineWithContext() { runTestSimpleCombineWithContext( @@ -961,7 +963,7 @@ public void testSimpleCombineWithContext() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testSimpleCombineWithContextEmpty() { runTestSimpleCombineWithContext(EMPTY_TABLE, 0, Collections.emptyList(), new String[] {}); } @@ -1022,7 +1024,7 @@ public void testFixedWindowsCombine() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testFixedWindowsCombineWithContext() { PCollection<KV<String, Integer>> perKeyInput = pipeline @@ -1064,7 +1066,7 @@ public void testFixedWindowsCombineWithContext() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testSlidingWindowsCombine() { PCollection<String> input = pipeline @@ -1123,7 +1125,7 @@ public void testSlidingWindowsCombine() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testSlidingWindowsCombineWithContext() { // [a: 1, 1], [a: 4; b: 1], [b: 13] PCollection<KV<String, Integer>> perKeyInput = @@ -1174,7 +1176,7 @@ public void testSlidingWindowsCombineWithContext() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testGlobalCombineWithDefaultsAndTriggers() { PCollection<Integer> input = pipeline.apply(Create.of(1, 1)); @@ -1201,7 +1203,7 @@ public void testGlobalCombineWithDefaultsAndTriggers() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testSessionsCombine() { 
PCollection<KV<String, Integer>> input = pipeline @@ -1227,7 +1229,7 @@ public void testSessionsCombine() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testSessionsCombineWithContext() { PCollection<KV<String, Integer>> perKeyInput = pipeline.apply( @@ -1292,7 +1294,7 @@ public void testWindowedCombineEmpty() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCombineGloballyAsSingletonView() { final PCollectionView<Integer> view = pipeline @@ -1318,7 +1320,7 @@ public void processElement(ProcessContext c) { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowedCombineGloballyAsSingletonView() { FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(1)); final PCollectionView<Integer> view = @@ -1362,7 +1364,7 @@ public void processElement(ProcessContext c) { /** Tests creation of a global {@link Combine} via Java 8 lambda. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCombineGloballyLambda() { PCollection<Integer> output = @@ -1384,7 +1386,7 @@ public void testCombineGloballyLambda() { /** Tests creation of a global {@link Combine} via a Java 8 method reference. 
*/ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCombineGloballyInstanceMethodReference() { PCollection<Integer> output = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index aa25136fce5c..693598bf1f82 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Sessions; @@ -184,7 +185,7 @@ public void testFlattenMultipleCoders() throws CannotProvideCoderException { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testEmptyFlattenAsSideInput() { final PCollectionView<Iterable<String>> view = PCollectionList.<String>empty(p) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 2049e107744b..c6bd909130a8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.LargeKeys; import 
org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -254,7 +255,7 @@ public void testGroupByKeyDirectUnbounded() { * two values. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testTimestampCombinerEarliest() { p.apply( @@ -275,7 +276,7 @@ public void testTimestampCombinerEarliest() { * the windowing function customized to use the latest value. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testTimestampCombinerLatest() { p.apply( Create.timestamped( @@ -382,7 +383,7 @@ public void testLargeKeys100MB() throws Exception { @RunWith(JUnit4.class) public static class WindowTests extends SharedTestBase { @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testGroupByKeyAndWindows() { List<KV<String, Integer>> ungroupedPairs = Arrays.asList( @@ -423,7 +424,7 @@ public void testGroupByKeyAndWindows() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testGroupByKeyMultipleWindows() { PCollection<KV<String, Integer>> windowedInput = p.apply( @@ -453,7 +454,7 @@ public void testGroupByKeyMultipleWindows() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testGroupByKeyMergingWindows() { PCollection<KV<String, Integer>> windowedInput = p.apply( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index dab9f133c0f8..3264bdefee49 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -82,12 +82,14 @@ import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesMapState; import org.apache.beam.sdk.testing.UsesSetState; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.UsesStatefulParDo; import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.UsesTimersInParDo; @@ -342,7 +344,6 @@ public void testParDo() { pipeline.apply(Create.of(inputs)).apply(ParDo.of(new TestDoFn())); PAssert.that(output).satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - pipeline.run(); } @@ -706,7 +707,7 @@ public void testParDoWritingToUndeclaredTag() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testParDoWithSideInputs() { List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -738,7 +739,7 @@ public void testParDoWithSideInputs() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testParDoWithSideInputsIsCumulative() { List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -772,7 +773,7 @@ public void testParDoWithSideInputsIsCumulative() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testMultiOutputParDoWithSideInputs() { List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -810,7 +811,7 @@ public void testMultiOutputParDoWithSideInputs() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, 
UsesSideInputs.class}) public void testMultiOutputParDoWithSideInputsIsCumulative() { List<Integer> inputs = Arrays.asList(3, -42, 666); @@ -866,7 +867,7 @@ public void testParDoReadingFromUnknownSideInput() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testSideInputsWithMultipleWindows() { // Tests that the runner can safely run a DoFn that uses side inputs // on an input where the element is in multiple windows. The complication is @@ -1186,7 +1187,7 @@ public void testParDoWithErrorInFinishBatch() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowingInStartAndFinishBundle() { final FixedWindows windowFn = FixedWindows.of(Duration.millis(1)); @@ -1961,7 +1962,7 @@ public void processElement( } @Test - @Category({ValidatesRunner.class, UsesStatefulParDo.class}) + @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSideInputs.class}) public void testBagStateSideInput() { final PCollectionView<List<Integer>> listView = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java index 6c2c94b04b9e..5d9c7f258cb3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; @@ -149,7 +150,7 @@ public void testReshufflePreservesTimestamps() { } @Test - 
@Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterSessionsAndGroupByKey() { PCollection<KV<String, Iterable<Integer>>> input = @@ -170,7 +171,7 @@ public void testReshuffleAfterSessionsAndGroupByKey() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterFixedWindowsAndGroupByKey() { PCollection<KV<String, Iterable<Integer>>> input = @@ -191,7 +192,7 @@ public void testReshuffleAfterFixedWindowsAndGroupByKey() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterSlidingWindowsAndGroupByKey() { PCollection<KV<String, Iterable<Integer>>> input = @@ -212,7 +213,7 @@ public void testReshuffleAfterSlidingWindowsAndGroupByKey() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterFixedWindows() { PCollection<KV<String, Integer>> input = @@ -232,7 +233,7 @@ public void testReshuffleAfterFixedWindows() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testReshuffleAfterSlidingWindows() { PCollection<KV<String, Integer>> input = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 08a26cb90c45..01e73352fd1d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -34,12 +34,14 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.range.OffsetRange; import 
org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesBoundedSplittableParDo; import org.apache.beam.sdk.testing.UsesParDoLifecycle; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs; import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.UsesUnboundedSplittableParDo; @@ -152,7 +154,11 @@ private void testPairWithIndexBasic(IsBounded bounded) { } @Test - @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class}) + @Category({ + ValidatesRunner.class, + UsesBoundedSplittableParDo.class, + DataflowPortabilityExecutableStageUnsupported.class + }) public void testPairWithIndexWindowedTimestampedBounded() { testPairWithIndexWindowedTimestamped(IsBounded.BOUNDED); } @@ -351,7 +357,7 @@ private static SDFWithSideInputBase sdfWithSideInput( } @Test - @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class}) + @Category({ValidatesRunner.class, UsesBoundedSplittableParDo.class, UsesSideInputs.class}) public void testSideInputBounded() { testSideInput(IsBounded.BOUNDED); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index ff476e30a6df..002bc4bddd3e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -32,9 +32,11 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import 
org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesSideInputs; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; @@ -118,7 +120,7 @@ public void processElement(ProcessContext c) { @Rule public final transient TestPipeline p = TestPipeline.create(); @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCoGroupByKeyGetOnly() { final TupleTag<String> tag1 = new TupleTag<>(); final TupleTag<String> tag2 = new TupleTag<>(); @@ -248,7 +250,7 @@ public void testCoGroupByKeyGetOnly() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, UsesSideInputs.class}) public void testCoGroupByKey() { final TupleTag<String> namesTag = new TupleTag<>(); final TupleTag<String> addressesTag = new TupleTag<>(); @@ -468,7 +470,7 @@ public void testCoGroupByKeyHandleResults() { /** Tests the pipeline end-to-end with FixedWindows. 
*/ @SuppressWarnings("unchecked") @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testCoGroupByKeyWithWindowing() { TupleTag<String> clicksTag = new TupleTag<>(); TupleTag<String> purchasesTag = new TupleTag<>(); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 6b1be69af64a..5345213a6de5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesCustomWindowMerging; @@ -439,7 +440,7 @@ public Boolean apply(Long input) { * the windowing function default, the end of the window. */ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testTimestampCombinerDefault() { pipeline.enableAbandonedNodeEnforcement(true); @@ -473,7 +474,7 @@ public void processElement(ProcessContext c) throws Exception { * the windowing function customized to use the end of the window. 
*/ @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testTimestampCombinerEndOfWindow() { pipeline.enableAbandonedNodeEnforcement(true); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index 88bf613763c8..8b8faf8826e9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.testing.DataflowPortabilityExecutableStageUnsupported; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -103,7 +104,7 @@ private String output(String value, int count, int timestamp, int windowStart, i } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testPartitioningWindowing() { PCollection<String> input = p.apply( @@ -127,7 +128,7 @@ public void testPartitioningWindowing() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testNonPartitioningWindowing() { PCollection<String> input = p.apply( @@ -151,7 +152,7 @@ public void testNonPartitioningWindowing() { } @Test - @Category(ValidatesRunner.class) + @Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testMergingWindowing() { PCollection<String> input = p.apply( @@ -169,7 +170,7 @@ public void testMergingWindowing() { } @Test - @Category(ValidatesRunner.class) + 
@Category({ValidatesRunner.class, DataflowPortabilityExecutableStageUnsupported.class}) public void testWindowPreservation() { PCollection<String> input1 = p.apply( ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 175594) Time Spent: 3h 40m (was: 3.5h) > Setup Jenkins VR job for new bundle processing code > --------------------------------------------------- > > Key: BEAM-6225 > URL: https://issues.apache.org/jira/browse/BEAM-6225 > Project: Beam > Issue Type: Task > Components: runner-dataflow > Reporter: Boyuan Zhang > Assignee: Boyuan Zhang > Priority: Major > Time Spent: 3h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian JIRA (v7.6.3#76005)