This relied on a wrong functionality as described in BEAM-1444 and should be 
revisited there.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d25b9cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d25b9cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d25b9cc

Branch: refs/heads/master
Commit: 3d25b9cc794ce39a0ac3051eacf28127da138449
Parents: add8716
Author: Sela <[email protected]>
Authored: Sun Feb 12 19:47:00 2017 +0200
Committer: Sela <[email protected]>
Committed: Mon Feb 20 11:30:33 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/TestSparkRunner.java     | 11 +++++-----
 .../translation/streaming/UnboundedDataset.java |  1 -
 .../beam/runners/spark/WatermarkTest.java       | 19 +++++++++++++++--
 .../streaming/FlattenStreamingTest.java         | 22 --------------------
 .../streaming/TrackStreamingSourcesTest.java    | 18 +++++++++++++++-
 .../apache/beam/sdk/testing/RegexMatcher.java   | 17 +++++++++++++++
 6 files changed, 57 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index a634dd4..8b8f9ba 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -23,8 +23,8 @@ import static org.hamcrest.Matchers.is;
 
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
-import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
@@ -108,14 +108,15 @@ public final class TestSparkRunner extends 
PipelineRunner<SparkPipelineResult> {
 
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
+    // clear state of Aggregators, Metrics and Watermarks if exists.
+    AggregatorsAccumulator.clear();
+    SparkMetricsContainer.clear();
+    GlobalWatermarkHolder.clear();
+
     TestPipelineOptions testPipelineOptions = 
pipeline.getOptions().as(TestPipelineOptions.class);
     SparkPipelineResult result = delegate.run(pipeline);
     result.waitUntilFinish();
 
-    // clear state of Aggregators, Metrics and Watermarks.
-    AggregatorsAccumulator.clear();
-    SparkMetricsContainer.clear();
-    GlobalWatermarkHolder.clear();
 
     // make sure the test pipeline finished successfully.
     State resultState = result.getState();

http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
index 08d1ab6..8b65dca 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.translation.streaming;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;

http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
index 0b56403..e7a5481 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/WatermarkTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -172,8 +189,6 @@ public class WatermarkTest {
 
     
p.run().waitUntilFinish(Duration.millis(options.getBatchIntervalMillis()).multipliedBy(3));
 
-    System.out.println(WatermarksDoFn.strings);
-
     // this is a hacky way to assert but it will do until triggers are 
supported.
     assertThat(
         WatermarksDoFn.strings,

http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 530721b..fc40bbd 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.spark.translation.streaming;
 
-import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -27,7 +26,6 @@ import 
org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreamin
 import 
org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -81,24 +79,4 @@ public class FlattenStreamingTest {
     PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, 
Duration.standardSeconds(1L));
   }
 
-  @Test
-  public void testFlattenBoundedUnbounded() throws Exception {
-    SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
-    options.setStreaming(true);
-
-    Pipeline p = Pipeline.create(options);
-    PCollection<String> w1 =
-        
p.apply(CreateStream.fromQueue(WORDS_QUEUE_1)).setCoder(StringUtf8Coder.of());
-    PCollection<String> windowedW1 =
-        
w1.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollection<String> w2 =
-        
p.apply(Create.of(ImmutableList.copyOf(WORDS_ARRAY_2)).withCoder(StringUtf8Coder.of()));
-    PCollection<String> windowedW2 =
-        
w2.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))));
-    PCollectionList<String> list = 
PCollectionList.of(windowedW1).and(windowedW2);
-    PCollection<String> union = list.apply(Flatten.<String>pCollections());
-
-    PAssertStreaming.runAndAssertContents(p, union, EXPECTED_UNION, 
Duration.standardSeconds(1L));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
index f102ac8..fbe5777 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark.translation.streaming;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -26,7 +43,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.SparkStatusTracker;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;

http://git-wip-us.apache.org/repos/asf/beam/blob/3d25b9cc/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java
index a4b14fe..aab4a09 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/RegexMatcher.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.sdk.testing;
 
 import java.util.regex.Pattern;

Reply via email to