[BEAM-151] Break out Dataflow runner dependency to separate test file. This allows for moving the Dataflow-specific portion of the test to the Dataflow runner Maven module.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a18ab871 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a18ab871 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a18ab871 Branch: refs/heads/master Commit: a18ab871f8f1b9a3219cee2ecd5d519af23efa87 Parents: 7f0a2f7 Author: Luke Cwik <[email protected]> Authored: Fri Mar 25 16:09:18 2016 -0700 Committer: Luke Cwik <[email protected]> Committed: Thu Apr 7 11:18:32 2016 -0700 ---------------------------------------------------------------------- .../sdk/transforms/DataflowGroupByKeyTest.java | 110 +++++++++++++++++++ .../dataflow/sdk/transforms/GroupByKeyTest.java | 62 ----------- 2 files changed, 110 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a18ab871/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java new file mode 100644 index 0000000..b05e7a2 --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/DataflowGroupByKeyTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2016 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; +import com.google.cloud.dataflow.sdk.coders.KvCoder; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.util.NoopPathValidator; +import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PBegin; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.joda.time.Duration; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.List; + +/** Tests for {@link GroupByKey} for the {@link DataflowPipelineRunner}. */ +@RunWith(JUnit4.class) +public class DataflowGroupByKeyTest { + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey} + * is not expanded. 
This is used for verifying that even without expansion the proper errors show + * up. + */ + private Pipeline createTestServiceRunner() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setRunner(DataflowPipelineRunner.class); + options.setProject("someproject"); + options.setStagingLocation("gs://staging"); + options.setPathValidatorClass(NoopPathValidator.class); + options.setDataflowClient(null); + return Pipeline.create(options); + } + + @Test + public void testInvalidWindowsService() { + Pipeline p = createTestServiceRunner(); + + List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); + + PCollection<KV<String, Integer>> input = + p.apply(Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) + .apply(Window.<KV<String, Integer>>into( + Sessions.withGapDuration(Duration.standardMinutes(1)))); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("GroupByKey must have a valid Window merge function"); + input + .apply("GroupByKey", GroupByKey.<String, Integer>create()) + .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create()); + } + + @Test + public void testGroupByKeyServiceUnbounded() { + Pipeline p = createTestServiceRunner(); + + PCollection<KV<String, Integer>> input = + p.apply( + new PTransform<PBegin, PCollection<KV<String, Integer>>>() { + @Override + public PCollection<KV<String, Integer>> apply(PBegin input) { + return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal( + input.getPipeline(), + WindowingStrategy.globalDefault(), + PCollection.IsBounded.UNBOUNDED) + .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {}); + } + }); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage( + "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without " + + "a trigger. 
Use a Window.into or Window.triggering transform prior to GroupByKey."); + + input.apply("GroupByKey", GroupByKey.<String, Integer>create()); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a18ab871/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java index bb64f60..6fb811e 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java @@ -28,10 +28,8 @@ import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.MapCoder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; @@ -41,7 +39,6 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFns; import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.util.NoopPathValidator; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.values.KV; import 
com.google.cloud.dataflow.sdk.values.PBegin; @@ -241,21 +238,6 @@ public class GroupByKeyTest { Duration.standardMinutes(1))))); } - /** - * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey} - * is not expanded. This is used for verifying that even without expansion the proper errors show - * up. - */ - private Pipeline createTestServiceRunner() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setRunner(DataflowPipelineRunner.class); - options.setProject("someproject"); - options.setStagingLocation("gs://staging"); - options.setPathValidatorClass(NoopPathValidator.class); - options.setDataflowClient(null); - return Pipeline.create(options); - } - private Pipeline createTestDirectRunner() { DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class); options.setRunner(DirectPipelineRunner.class); @@ -282,25 +264,6 @@ public class GroupByKeyTest { } @Test - public void testInvalidWindowsService() { - Pipeline p = createTestServiceRunner(); - - List<KV<String, Integer>> ungroupedPairs = Arrays.asList(); - - PCollection<KV<String, Integer>> input = - p.apply(Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))) - .apply(Window.<KV<String, Integer>>into( - Sessions.withGapDuration(Duration.standardMinutes(1)))); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("GroupByKey must have a valid Window merge function"); - input - .apply("GroupByKey", GroupByKey.<String, Integer>create()) - .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create()); - } - - @Test public void testRemerge() { Pipeline p = TestPipeline.create(); @@ -350,31 +313,6 @@ public class GroupByKeyTest { input.apply("GroupByKey", GroupByKey.<String, Integer>create()); } - @Test - public void testGroupByKeyServiceUnbounded() { - Pipeline p = createTestServiceRunner(); - - PCollection<KV<String, 
Integer>> input = - p.apply( - new PTransform<PBegin, PCollection<KV<String, Integer>>>() { - @Override - public PCollection<KV<String, Integer>> apply(PBegin input) { - return PCollection.<KV<String, Integer>>createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - PCollection.IsBounded.UNBOUNDED) - .setTypeDescriptorInternal(new TypeDescriptor<KV<String, Integer>>() {}); - } - }); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without " - + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey."); - - input.apply("GroupByKey", GroupByKey.<String, Integer>create()); - } - /** * Tests that when two elements are combined via a GroupByKey their output timestamp agrees * with the windowing function customized to actually be the same as the default, the earlier of
