This relied on incorrect behavior, as described in BEAM-1444, and should be revisited as part of that issue.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d25b9cc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d25b9cc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d25b9cc Branch: refs/heads/master Commit: 3d25b9cc794ce39a0ac3051eacf28127da138449 Parents: add8716 Author: Sela <[email protected]> Authored: Sun Feb 12 19:47:00 2017 +0200 Committer: Sela <[email protected]> Committed: Mon Feb 20 11:30:33 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/spark/TestSparkRunner.java | 11 +++++----- .../translation/streaming/UnboundedDataset.java | 1 - .../beam/runners/spark/WatermarkTest.java | 19 +++++++++++++++-- .../streaming/FlattenStreamingTest.java | 22 -------------------- .../streaming/TrackStreamingSourcesTest.java | 18 +++++++++++++++- .../apache/beam/sdk/testing/RegexMatcher.java | 17 +++++++++++++++ 6 files changed, 57 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index a634dd4..8b8f9ba 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -23,8 +23,8 @@ import static org.hamcrest.Matchers.is; import org.apache.beam.runners.core.UnboundedReadFromBoundedSource; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; -import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.runners.spark.metrics.SparkMetricsContainer; +import 
org.apache.beam.runners.spark.util.GlobalWatermarkHolder; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource; @@ -108,14 +108,15 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { @Override public SparkPipelineResult run(Pipeline pipeline) { + // clear state of Aggregators, Metrics and Watermarks if exists. + AggregatorsAccumulator.clear(); + SparkMetricsContainer.clear(); + GlobalWatermarkHolder.clear(); + TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); SparkPipelineResult result = delegate.run(pipeline); result.waitUntilFinish(); - // clear state of Aggregators, Metrics and Watermarks. - AggregatorsAccumulator.clear(); - SparkMetricsContainer.clear(); - GlobalWatermarkHolder.clear(); // make sure the test pipeline finished successfully. State resultState = result.getState(); http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java index 08d1ab6..8b65dca 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; - import java.util.ArrayList; import java.util.List; import java.util.Queue; 
http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java index 0b56403..e7a5481 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.spark; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -172,8 +189,6 @@ public class WatermarkTest { p.run().waitUntilFinish(Duration.millis(options.getBatchIntervalMillis()).multipliedBy(3)); - System.out.println(WatermarksDoFn.strings); - // this is a hacky way to assert but it will do until triggers are supported. 
assertThat( WatermarksDoFn.strings, http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index 530721b..fc40bbd 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.spark.translation.streaming; -import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -27,7 +26,6 @@ import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; @@ -81,24 +79,4 @@ public class FlattenStreamingTest { PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L)); } - @Test - public void testFlattenBoundedUnbounded() throws Exception { - SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); - options.setStreaming(true); - - Pipeline p = Pipeline.create(options); - PCollection<String> w1 = - p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of()); - PCollection<String> windowedW1 = - 
w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))); - PCollection<String> w2 = - p.apply(Create.of(ImmutableList.copyOf(WORDS_ARRAY_2)).withCoder(StringUtf8Coder.of())); - PCollection<String> windowedW2 = - w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))); - PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2); - PCollection<String> union = list.apply(Flatten.<String>pCollections()); - - PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, Duration.standardSeconds(1L)); - } - } http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index f102ac8..fbe5777 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.spark.translation.streaming; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -26,7 +43,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PValue; -import org.apache.spark.SparkStatusTracker; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java index a4b14fe..aab4a09 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.sdk.testing; import java.util.regex.Pattern;
