Repository: incubator-beam Updated Branches: refs/heads/master 659f0b877 -> ac63fd6d4
Move Java 8 tests to their own module This allows easy setting of the compiler source and target version to 1.8 without any fragile plugin configuration. It also allows the import into Eclipse to easily set separate compliance levels, as this is done per-Eclipse-project. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e8b71008 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e8b71008 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e8b71008 Branch: refs/heads/master Commit: e8b71008c18439d4e30edf7e29bdaa58fa6e65f3 Parents: f06e2a9 Author: Kenneth Knowles <k...@google.com> Authored: Mon Feb 29 22:40:01 2016 -0800 Committer: bchambers <bchamb...@google.com> Committed: Thu Mar 17 11:12:36 2016 -0700 ---------------------------------------------------------------------- java8tests/pom.xml | 183 +++++++++++++++++++ .../sdk/transforms/CombineJava8Test.java | 133 ++++++++++++++ .../sdk/transforms/FilterJava8Test.java | 118 ++++++++++++ .../transforms/FlatMapElementsJava8Test.java | 84 +++++++++ .../sdk/transforms/MapElementsJava8Test.java | 77 ++++++++ .../sdk/transforms/PartitionJava8Test.java | 74 ++++++++ .../transforms/RemoveDuplicatesJava8Test.java | 98 ++++++++++ .../sdk/transforms/WithKeysJava8Test.java | 73 ++++++++ .../sdk/transforms/WithTimestampsJava8Test.java | 65 +++++++ pom.xml | 9 + sdk/pom.xml | 71 ------- .../sdk/transforms/CombineJava8Test.java | 133 -------------- .../sdk/transforms/FilterJava8Test.java | 118 ------------ .../transforms/FlatMapElementsJava8Test.java | 84 --------- .../sdk/transforms/MapElementsJava8Test.java | 77 -------- .../sdk/transforms/PartitionJava8Test.java | 74 -------- .../transforms/RemoveDuplicatesJava8Test.java | 99 ---------- .../sdk/transforms/WithKeysJava8Test.java | 74 -------- .../sdk/transforms/WithTimestampsJava8Test.java | 66 ------- 19 files changed, 914 insertions(+), 796 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/pom.xml ---------------------------------------------------------------------- diff --git a/java8tests/pom.xml b/java8tests/pom.xml new file mode 100644 index 0000000..de44ed4 --- /dev/null +++ b/java8tests/pom.xml @@ -0,0 +1,183 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!--~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + ~ Copyright (C) 2015 Google Inc. + ~ + ~ Licensed under the Apache License, Version 2.0 (the "License"); you may not + ~ use this file except in compliance with the License. You may obtain a copy of + ~ the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + ~ License for the specific language governing permissions and limitations under + ~ the License. + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId> + <version>1.6.0-SNAPSHOT</version> + </parent> + + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-java8tests-all</artifactId> + <name>Google Cloud Dataflow Java 8 Tests - All</name> + <description>Google Cloud Dataflow Java SDK provides a simple, Java-based + interface for processing virtually any size data using Google cloud + resources. 
This artifact includes tests of the SDK from a Java 8 + user.</description> + <url>http://cloud.google.com/dataflow</url> + + <packaging>jar</packaging> + + <profiles> + <profile> + <id>DataflowPipelineTests</id> + <properties> + <runIntegrationTestOnService>true</runIntegrationTestOnService> + <testGroups>com.google.cloud.dataflow.sdk.testing.RunnableOnService</testGroups> + <testParallelValue>both</testParallelValue> + </properties> + </profile> + </profiles> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <testSource>1.8</testSource> + <testTarget>1.8</testTarget> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <goals><goal>analyze-only</goal></goals> + <configuration> + <failOnWarning>true</failOnWarning> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.12</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>6.6</version> + </dependency> + </dependencies> + <configuration> + <configLocation>../checkstyle.xml</configLocation> + <consoleOutput>true</consoleOutput> + <failOnViolation>true</failOnViolation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <includeResources>false</includeResources> + </configuration> + <executions> + <execution> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- Source plugin for generating source and test-source JARs. 
--> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <id>attach-sources</id> + <phase>compile</phase> + <goals> + <goal>jar</goal> + </goals> + </execution> + <execution> + <id>attach-test-sources</id> + <phase>test-compile</phase> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <id>default-jar</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + <execution> + <id>default-test-jar</id> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + + <!-- Coverage analysis for unit tests. --> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>com.google.cloud.dataflow</groupId> + <artifactId>google-cloud-dataflow-java-sdk-all</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>${joda.version}</version> + </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>${hamcrest.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>${junit.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java 
---------------------------------------------------------------------- diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java new file mode 100644 index 0000000..b569e49 --- /dev/null +++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java @@ -0,0 +1,133 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Java 8 Tests for {@link Combine}. + */ +@RunWith(JUnit4.class) +@SuppressWarnings("serial") +public class CombineJava8Test implements Serializable { + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + /** + * Class for use in testing use of Java 8 method references. 
+ */ + private static class Summer implements Serializable { + public int sum(Iterable<Integer> integers) { + int sum = 0; + for (int i : integers) { + sum += i; + } + return sum; + } + } + + /** + * Tests creation of a global {@link Combine} via Java 8 lambda. + */ + @Test + public void testCombineGloballyLambda() { + Pipeline pipeline = TestPipeline.create(); + + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 3, 4)) + .apply(Combine.globally(integers -> { + int sum = 0; + for (int i : integers) { + sum += i; + } + return sum; + })); + + DataflowAssert.that(output).containsInAnyOrder(10); + pipeline.run(); + } + + /** + * Tests creation of a global {@link Combine} via a Java 8 method reference. + */ + @Test + public void testCombineGloballyInstanceMethodReference() { + Pipeline pipeline = TestPipeline.create(); + + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 3, 4)) + .apply(Combine.globally(new Summer()::sum)); + + DataflowAssert.that(output).containsInAnyOrder(10); + pipeline.run(); + } + + /** + * Tests creation of a per-key {@link Combine} via a Java 8 lambda. + */ + @Test + public void testCombinePerKeyLambda() { + Pipeline pipeline = TestPipeline.create(); + + PCollection<KV<String, Integer>> output = pipeline + .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) + .apply(Combine.perKey(integers -> { + int sum = 0; + for (int i : integers) { + sum += i; + } + return sum; + })); + + DataflowAssert.that(output).containsInAnyOrder( + KV.of("a", 4), + KV.of("b", 2), + KV.of("c", 4)); + pipeline.run(); + } + + /** + * Tests creation of a per-key {@link Combine} via a Java 8 method reference. 
+ */ + @Test + public void testCombinePerKeyInstanceMethodReference() { + Pipeline pipeline = TestPipeline.create(); + + PCollection<KV<String, Integer>> output = pipeline + .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) + .apply(Combine.perKey(new Summer()::sum)); + + DataflowAssert.that(output).containsInAnyOrder( + KV.of("a", 4), + KV.of("b", 2), + KV.of("c", 4)); + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java ---------------------------------------------------------------------- diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java new file mode 100644 index 0000000..db65932 --- /dev/null +++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Java 8 Tests for {@link Filter}. + */ +@RunWith(JUnit4.class) +@SuppressWarnings("serial") +public class FilterJava8Test implements Serializable { + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + @Test + @Category(RunnableOnService.class) + public void testIdentityFilterByPredicate() { + Pipeline pipeline = TestPipeline.create(); + + PCollection<Integer> output = pipeline + .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) + .apply(Filter.byPredicate(i -> true)); + + DataflowAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307); + pipeline.run(); + } + + @Test + public void testNoFilterByPredicate() { + Pipeline pipeline = TestPipeline.create(); + + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 4, 5)) + .apply(Filter.byPredicate(i -> false)); + + DataflowAssert.that(output).empty(); + pipeline.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testFilterByPredicate() { + Pipeline pipeline = TestPipeline.create(); + + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) + .apply(Filter.byPredicate(i -> i % 2 == 0)); + + DataflowAssert.that(output).containsInAnyOrder(2, 4, 6); + pipeline.run(); + } + + /** + * Confirms that in Java 8 style, where a lambda results in a 
rawtype, the output type token is + * not useful. If this test ever fails there may be simplifications available to us. + */ + @Test + public void testFilterParDoOutputTypeDescriptorRaw() throws Exception { + Pipeline pipeline = TestPipeline.create(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + PCollection<String> output = pipeline + .apply(Create.of("hello")) + .apply(Filter.by(s -> true)); + + thrown.expect(CannotProvideCoderException.class); + pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()); + } + + @Test + @Category(RunnableOnService.class) + public void testFilterByMethodReference() { + Pipeline pipeline = TestPipeline.create(); + + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) + .apply(Filter.byPredicate(new EvenFilter()::isEven)); + + DataflowAssert.that(output).containsInAnyOrder(2, 4, 6); + pipeline.run(); + } + + private static class EvenFilter implements Serializable { + public boolean isEven(int i) { + return i % 2 == 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java new file mode 100644 index 0000000..e0b946b --- /dev/null +++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java @@ -0,0 +1,84 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; +import com.google.common.collect.ImmutableList; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; +import java.util.List; + +/** + * Java 8 Tests for {@link FlatMapElements}. + */ +@RunWith(JUnit4.class) +public class FlatMapElementsJava8Test implements Serializable { + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + /** + * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a + * {@link SerializableFunction}). + */ + @Test + public void testFlatMapBasic() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 3)) + .apply(FlatMapElements + // Note that the input type annotation is required. + .via((Integer i) -> ImmutableList.of(i, -i)) + .withOutputType(new TypeDescriptor<Integer>() {})); + + DataflowAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); + pipeline.run(); + } + + /** + * Basic test of {@link FlatMapElements} with a method reference. 
+ */ + @Test + public void testFlatMapMethodReference() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 3)) + .apply(FlatMapElements + // Note that the input type annotation is required. + .via(new Negater()::numAndNegation) + .withOutputType(new TypeDescriptor<Integer>() {})); + + DataflowAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); + pipeline.run(); + } + + private static class Negater implements Serializable { + public List<Integer> numAndNegation(int input) { + return ImmutableList.of(input, -input); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java new file mode 100644 index 0000000..123e680 --- /dev/null +++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Java 8 tests for {@link MapElements}. + */ +@RunWith(JUnit4.class) +public class MapElementsJava8Test implements Serializable { + + /** + * Basic test of {@link MapElements} with a lambda (which is instantiated as a + * {@link SerializableFunction}). + */ + @Test + public void testMapBasic() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 3)) + .apply(MapElements + // Note that the type annotation is required (for Java, not for Dataflow) + .via((Integer i) -> i * 2) + .withOutputType(new TypeDescriptor<Integer>() {})); + + DataflowAssert.that(output).containsInAnyOrder(6, 2, 4); + pipeline.run(); + } + + /** + * Basic test of {@link MapElements} with a method reference. 
+ */ + @Test + public void testMapMethodReference() throws Exception { + Pipeline pipeline = TestPipeline.create(); + PCollection<Integer> output = pipeline + .apply(Create.of(1, 2, 3)) + .apply(MapElements + // Note that the type annotation is required (for Java, not for Dataflow) + .via(new Doubler()::doubleIt) + .withOutputType(new TypeDescriptor<Integer>() {})); + + DataflowAssert.that(output).containsInAnyOrder(6, 2, 4); + pipeline.run(); + } + + private static class Doubler implements Serializable { + public int doubleIt(int val) { + return val * 2; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java ---------------------------------------------------------------------- diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java new file mode 100644 index 0000000..c459ada --- /dev/null +++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package com.google.cloud.dataflow.sdk.transforms; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.values.PCollectionList; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Java 8 Tests for {@link Partition}. + */ +@RunWith(JUnit4.class) +@SuppressWarnings("serial") +public class PartitionJava8Test implements Serializable { + + @Rule + public transient ExpectedException thrown = ExpectedException.none(); + + /** + * Splits the elements 1, 2, 4, 5 into three partitions with a lambda computing + * {@code element % numPartitions}, then checks the contents of each partition. + */ + @Test + public void testModPartition() { + Pipeline pipeline = TestPipeline.create(); + + PCollectionList<Integer> outputs = pipeline + .apply(Create.of(1, 2, 4, 5)) + .apply(Partition.of(3, (element, numPartitions) -> element % numPartitions)); + assertEquals(3, outputs.size()); + DataflowAssert.that(outputs.get(0)).empty(); + DataflowAssert.that(outputs.get(1)).containsInAnyOrder(1, 4); + DataflowAssert.that(outputs.get(2)).containsInAnyOrder(2, 5); + pipeline.run(); + } + + /** + * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is + * not useful. If this test ever fails there may be simplifications available to us.
+ */ + @Test + public void testPartitionFnOutputTypeDescriptorRaw() throws Exception { + Pipeline pipeline = TestPipeline.create(); + + PCollectionList<String> output = pipeline + .apply(Create.of("hello")) + .apply(Partition.of(1, (element, numPartitions) -> 0)); + + thrown.expect(CannotProvideCoderException.class); + pipeline.getCoderRegistry().getDefaultCoder(output.get(0).getTypeDescriptor()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java ---------------------------------------------------------------------- diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java new file mode 100644 index 0000000..dfa1ca6 --- /dev/null +++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License.
+ */ +package com.google.cloud.dataflow.sdk.transforms; + +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.HashSet; +import java.util.Set; + +/** + * Java 8 tests for {@link RemoveDuplicates}. + */ +@RunWith(JUnit4.class) +public class RemoveDuplicatesJava8Test { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + /** + * Deduplicates strings by their length, supplied as a lambda together with an explicit + * representative type, then checks that every surviving string came from the input and that + * no two surviving strings share a length. + */ + @Test + public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() { + TestPipeline p = TestPipeline.create(); + + Multimap<Integer, String> predupedContents = HashMultimap.create(); + predupedContents.put(3, "foo"); + predupedContents.put(4, "foos"); + predupedContents.put(6, "barbaz"); + predupedContents.put(6, "bazbar"); + PCollection<String> dupes = + p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo")); + PCollection<String> deduped = + dupes.apply(RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length()) + .withRepresentativeType(TypeDescriptor.of(Integer.class))); + + DataflowAssert.that(deduped).satisfies((Iterable<String> strs) -> { + Set<Integer> seenLengths = new HashSet<>(); + for (String s : strs) { + assertThat(predupedContents.values(), hasItem(s)); + // hasItem tests membership; contains would only match a set holding exactly this length, + // so its negation would let a repeated length through once two lengths had been seen. + assertThat(seenLengths, not(hasItem(s.length()))); + seenLengths.add(s.length()); + } + return null; + }); + + p.run(); + } + + /** + * Applies the same lambda without a representative type descriptor and expects the apply call + * to fail because no coder can be provided for the erased key type. + */ + @Test + public void
withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> dupes = + p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo")); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes"); + thrown.expectMessage("Cannot provide a coder for type variable K"); + thrown.expectMessage("the actual type is unknown due to erasure."); + + // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is + // implemented with + dupes.apply("RemoveRepresentativeDupes", + RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java ---------------------------------------------------------------------- diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java new file mode 100644 index 0000000..3771f78 --- /dev/null +++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License.
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.Pipeline.PipelineExecutionException; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TypeDescriptor; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Java 8 Tests for {@link WithKeys}. 
+ */ +@RunWith(JUnit4.class) +public class WithKeysJava8Test { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + @Category(RunnableOnService.class) + public void withLambdaAndTypeDescriptorShouldSucceed() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12")); + PCollection<KV<Integer, String>> kvs = values.apply( + WithKeys.of((String s) -> Integer.valueOf(s)) + .withKeyType(TypeDescriptor.of(Integer.class))); + + DataflowAssert.that(kvs).containsInAnyOrder( + KV.of(1234, "1234"), KV.of(0, "0"), KV.of(-12, "-12"), KV.of(3210, "3210")); + + p.run(); + } + + @Test + public void withLambdaAndNoTypeDescriptorShouldThrow() { + TestPipeline p = TestPipeline.create(); + + PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12")); + + values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s))); + + thrown.expect(PipelineExecutionException.class); + thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys"); + thrown.expectMessage("Cannot provide a coder for type variable K"); + thrown.expectMessage("the actual type is unknown due to erasure."); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java ---------------------------------------------------------------------- diff --git a/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java new file mode 100644 index 0000000..b2b6dbc --- /dev/null +++ b/java8tests/src/test/java/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2015 Google Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.dataflow.sdk.transforms; + +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.Serializable; + +/** + * Java 8 tests for {@link WithTimestamps}. 
+ */ +@RunWith(JUnit4.class) +public class WithTimestampsJava8Test implements Serializable { + @Test + @Category(RunnableOnService.class) + public void withTimestampsLambdaShouldApplyTimestamps() { + TestPipeline p = TestPipeline.create(); + + String yearTwoThousand = "946684800000"; + PCollection<String> timestamped = + p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand)) + .apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(yearTwoThousand)))); + + PCollection<KV<String, Instant>> timestampedVals = + timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() { + @Override + public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c) + throws Exception { + c.output(KV.of(c.element(), c.timestamp())); + } + })); + + DataflowAssert.that(timestamped) + .containsInAnyOrder(yearTwoThousand, "0", "1234", Integer.toString(Integer.MAX_VALUE)); + DataflowAssert.that(timestampedVals) + .containsInAnyOrder( + KV.of("0", new Instant(0)), + KV.of("1234", new Instant("1234")), + KV.of(Integer.toString(Integer.MAX_VALUE), new Instant(Integer.MAX_VALUE)), + KV.of(yearTwoThousand, new Instant(Long.valueOf(yearTwoThousand)))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 6fb0b32..8a44ea9 100644 --- a/pom.xml +++ b/pom.xml @@ -100,6 +100,15 @@ <profiles> <profile> + <id>java8-tests</id> + <activation> + <jdk>[1.8,)</jdk> + </activation> + <modules> + <module>java8tests</module> + </modules> + </profile> + <profile> <id>doclint-java8-disable</id> <activation> <jdk>[1.8,)</jdk> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/pom.xml ---------------------------------------------------------------------- diff --git a/sdk/pom.xml b/sdk/pom.xml index d7e10a5..d652d80 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -54,77 +54,6 @@ 
<testParallelValue>both</testParallelValue> </properties> </profile> - - <profile> - <id>java8tests</id> - <activation> - <jdk>[1.8,)</jdk> - </activation> - - <build> - <plugins> - <!-- Tells Maven about the Java 8 test source root. --> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <executions> - <execution> - <id>add-java8-test-source</id> - <phase>initialize</phase> - <goals> - <goal>add-test-source</goal> - </goals> - <configuration> - <sources> - <source>${project.basedir}/src/test/java8</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - - <!-- Set `-source 1.8 -target 1.8` for Java 8 tests - and `-source 1.7 -target 1.7` for Java 7 tests --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <executions> - <execution> - <id>default-testCompile</id> - <phase>test-compile</phase> - <goals> - <goal>testCompile</goal> - </goals> - <configuration> - <testSource>1.7</testSource> - <testTarget>1.7</testTarget> - <testExcludes> - <!-- This pattern is brittle; we would prefer to filter on the directory - but that seems to be unavailable to us. 
--> - <exclude>**/*Java8Test.java</exclude> - </testExcludes> - </configuration> - </execution> - - <execution> - <id>java8-testCompile</id> - <phase>test-compile</phase> - <goals> - <goal>testCompile</goal> - </goals> - <configuration> - <testSource>1.8</testSource> - <testTarget>1.8</testTarget> - <testIncludes> - <include>**/*Java8Test.java</include> - </testIncludes> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> </profiles> <build> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java deleted file mode 100644 index b569e49..0000000 --- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/CombineJava8Test.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Java 8 Tests for {@link Combine}. - */ -@RunWith(JUnit4.class) -@SuppressWarnings("serial") -public class CombineJava8Test implements Serializable { - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - /** - * Class for use in testing use of Java 8 method references. - */ - private static class Summer implements Serializable { - public int sum(Iterable<Integer> integers) { - int sum = 0; - for (int i : integers) { - sum += i; - } - return sum; - } - } - - /** - * Tests creation of a global {@link Combine} via Java 8 lambda. - */ - @Test - public void testCombineGloballyLambda() { - Pipeline pipeline = TestPipeline.create(); - - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3, 4)) - .apply(Combine.globally(integers -> { - int sum = 0; - for (int i : integers) { - sum += i; - } - return sum; - })); - - DataflowAssert.that(output).containsInAnyOrder(10); - pipeline.run(); - } - - /** - * Tests creation of a global {@link Combine} via a Java 8 method reference. - */ - @Test - public void testCombineGloballyInstanceMethodReference() { - Pipeline pipeline = TestPipeline.create(); - - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3, 4)) - .apply(Combine.globally(new Summer()::sum)); - - DataflowAssert.that(output).containsInAnyOrder(10); - pipeline.run(); - } - - /** - * Tests creation of a per-key {@link Combine} via a Java 8 lambda. 
- */ - @Test - public void testCombinePerKeyLambda() { - Pipeline pipeline = TestPipeline.create(); - - PCollection<KV<String, Integer>> output = pipeline - .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) - .apply(Combine.perKey(integers -> { - int sum = 0; - for (int i : integers) { - sum += i; - } - return sum; - })); - - DataflowAssert.that(output).containsInAnyOrder( - KV.of("a", 4), - KV.of("b", 2), - KV.of("c", 4)); - pipeline.run(); - } - - /** - * Tests creation of a per-key {@link Combine} via a Java 8 method reference. - */ - @Test - public void testCombinePerKeyInstanceMethodReference() { - Pipeline pipeline = TestPipeline.create(); - - PCollection<KV<String, Integer>> output = pipeline - .apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3), KV.of("c", 4))) - .apply(Combine.perKey(new Summer()::sum)); - - DataflowAssert.that(output).containsInAnyOrder( - KV.of("a", 4), - KV.of("b", 2), - KV.of("c", 4)); - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java deleted file mode 100644 index db65932..0000000 --- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FilterJava8Test.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Java 8 Tests for {@link Filter}. 
- */ -@RunWith(JUnit4.class) -@SuppressWarnings("serial") -public class FilterJava8Test implements Serializable { - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - @Test - @Category(RunnableOnService.class) - public void testIdentityFilterByPredicate() { - Pipeline pipeline = TestPipeline.create(); - - PCollection<Integer> output = pipeline - .apply(Create.of(591, 11789, 1257, 24578, 24799, 307)) - .apply(Filter.byPredicate(i -> true)); - - DataflowAssert.that(output).containsInAnyOrder(591, 11789, 1257, 24578, 24799, 307); - pipeline.run(); - } - - @Test - public void testNoFilterByPredicate() { - Pipeline pipeline = TestPipeline.create(); - - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 4, 5)) - .apply(Filter.byPredicate(i -> false)); - - DataflowAssert.that(output).empty(); - pipeline.run(); - } - - @Test - @Category(RunnableOnService.class) - public void testFilterByPredicate() { - Pipeline pipeline = TestPipeline.create(); - - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.byPredicate(i -> i % 2 == 0)); - - DataflowAssert.that(output).containsInAnyOrder(2, 4, 6); - pipeline.run(); - } - - /** - * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is - * not useful. If this test ever fails there may be simplifications available to us. 
- */ - @Test - public void testFilterParDoOutputTypeDescriptorRaw() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - @SuppressWarnings({"unchecked", "rawtypes"}) - PCollection<String> output = pipeline - .apply(Create.of("hello")) - .apply(Filter.by(s -> true)); - - thrown.expect(CannotProvideCoderException.class); - pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()); - } - - @Test - @Category(RunnableOnService.class) - public void testFilterByMethodReference() { - Pipeline pipeline = TestPipeline.create(); - - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3, 4, 5, 6, 7)) - .apply(Filter.byPredicate(new EvenFilter()::isEven)); - - DataflowAssert.that(output).containsInAnyOrder(2, 4, 6); - pipeline.run(); - } - - private static class EvenFilter implements Serializable { - public boolean isEven(int i) { - return i % 2 == 0; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java deleted file mode 100644 index e0b946b..0000000 --- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/FlatMapElementsJava8Test.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.collect.ImmutableList; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; -import java.util.List; - -/** - * Java 8 Tests for {@link FlatMapElements}. - */ -@RunWith(JUnit4.class) -public class FlatMapElementsJava8Test implements Serializable { - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - /** - * Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a - * {@link SerializableFunction}). - */ - @Test - public void testFlatMapBasic() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(FlatMapElements - // Note that the input type annotation is required. - .via((Integer i) -> ImmutableList.of(i, -i)) - .withOutputType(new TypeDescriptor<Integer>() {})); - - DataflowAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); - pipeline.run(); - } - - /** - * Basic test of {@link FlatMapElements} with a method reference. - */ - @Test - public void testFlatMapMethodReference() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(FlatMapElements - // Note that the input type annotation is required. 
- .via(new Negater()::numAndNegation) - .withOutputType(new TypeDescriptor<Integer>() {})); - - DataflowAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); - pipeline.run(); - } - - private static class Negater implements Serializable { - public List<Integer> numAndNegation(int input) { - return ImmutableList.of(input, -input); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java deleted file mode 100644 index 123e680..0000000 --- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/MapElementsJava8Test.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Java 8 tests for {@link MapElements}. - */ -@RunWith(JUnit4.class) -public class MapElementsJava8Test implements Serializable { - - /** - * Basic test of {@link MapElements} with a lambda (which is instantiated as a - * {@link SerializableFunction}). - */ - @Test - public void testMapBasic() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(MapElements - // Note that the type annotation is required (for Java, not for Dataflow) - .via((Integer i) -> i * 2) - .withOutputType(new TypeDescriptor<Integer>() {})); - - DataflowAssert.that(output).containsInAnyOrder(6, 2, 4); - pipeline.run(); - } - - /** - * Basic test of {@link MapElements} with a method reference. 
- */ - @Test - public void testMapMethodReference() throws Exception { - Pipeline pipeline = TestPipeline.create(); - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(MapElements - // Note that the type annotation is required (for Java, not for Dataflow) - .via(new Doubler()::doubleIt) - .withOutputType(new TypeDescriptor<Integer>() {})); - - DataflowAssert.that(output).containsInAnyOrder(6, 2, 4); - pipeline.run(); - } - - private static class Doubler implements Serializable { - public int doubleIt(int val) { - return val * 2; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java deleted file mode 100644 index c459ada..0000000 --- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/PartitionJava8Test.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package com.google.cloud.dataflow.sdk.transforms; - -import static org.junit.Assert.assertEquals; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.values.PCollectionList; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Java 8 Tests for {@link Filter}. - */ -@RunWith(JUnit4.class) -@SuppressWarnings("serial") -public class PartitionJava8Test implements Serializable { - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - @Test - public void testModPartition() { - Pipeline pipeline = TestPipeline.create(); - - PCollectionList<Integer> outputs = pipeline - .apply(Create.of(1, 2, 4, 5)) - .apply(Partition.of(3, (element, numPartitions) -> element % numPartitions)); - assertEquals(3, outputs.size()); - DataflowAssert.that(outputs.get(0)).empty(); - DataflowAssert.that(outputs.get(1)).containsInAnyOrder(1, 4); - DataflowAssert.that(outputs.get(2)).containsInAnyOrder(2, 5); - pipeline.run(); - } - - /** - * Confirms that in Java 8 style, where a lambda results in a rawtype, the output type token is - * not useful. If this test ever fails there may be simplifications available to us. 
- */ - @Test - public void testPartitionFnOutputTypeDescriptorRaw() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - PCollectionList<String> output = pipeline - .apply(Create.of("hello")) - .apply(Partition.of(1, (element, numPartitions) -> 0)); - - thrown.expect(CannotProvideCoderException.class); - pipeline.getCoderRegistry().getDefaultCoder(output.get(0).getTypeDescriptor()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java deleted file mode 100644 index d9e2180..0000000 --- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/RemoveDuplicatesJava8Test.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package com.google.cloud.dataflow.sdk.transforms; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; - -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.HashSet; -import java.util.Set; - -/** - * Java 8 tests for {@link RemoveDuplicates}. - */ -@RunWith(JUnit4.class) -public class RemoveDuplicatesJava8Test { - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void withLambdaRepresentativeValuesFnAndTypeDescriptorShouldApplyFn() { - TestPipeline p = TestPipeline.create(); - - Multimap<Integer, String> predupedContents = HashMultimap.create(); - predupedContents.put(3, "foo"); - predupedContents.put(4, "foos"); - predupedContents.put(6, "barbaz"); - predupedContents.put(6, "bazbar"); - PCollection<String> dupes = - p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo")); - PCollection<String> deduped = - dupes.apply(RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length()) - .withRepresentativeType(TypeDescriptor.of(Integer.class))); - - DataflowAssert.that(deduped).satisfies((Iterable<String> strs) -> { - Set<Integer> seenLengths = new HashSet<>(); - for (String s : strs) { - assertThat(predupedContents.values(), hasItem(s)); - assertThat(seenLengths, not(contains(s.length()))); - seenLengths.add(s.length()); - } - return null; - }); - - p.run(); - } - - @Test - public void 
withLambdaRepresentativeValuesFnNoTypeDescriptorShouldThrow() { - TestPipeline p = TestPipeline.create(); - - Multimap<Integer, String> predupedContents = HashMultimap.create(); - predupedContents.put(3, "foo"); - predupedContents.put(4, "foos"); - predupedContents.put(6, "barbaz"); - predupedContents.put(6, "bazbar"); - PCollection<String> dupes = - p.apply(Create.of("foo", "foos", "barbaz", "barbaz", "bazbar", "foo")); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("Unable to return a default Coder for RemoveRepresentativeDupes"); - thrown.expectMessage("Cannot provide a coder for type variable K"); - thrown.expectMessage("the actual type is unknown due to erasure."); - - // Thrown when applying a transform to the internal WithKeys that withRepresentativeValueFn is - // implemented with - dupes.apply("RemoveRepresentativeDupes", - RemoveDuplicates.withRepresentativeValueFn((String s) -> s.length())); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java deleted file mode 100644 index c10af29..0000000 --- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithKeysJava8Test.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.Pipeline.PipelineExecutionException; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TypeDescriptor; - -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Java 8 Tests for {@link WithKeys}. 
- */ -@RunWith(JUnit4.class) -public class WithKeysJava8Test { - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - @Category(RunnableOnService.class) - public void withLambdaAndTypeDescriptorShouldSucceed() { - TestPipeline p = TestPipeline.create(); - - PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12")); - PCollection<KV<Integer, String>> kvs = values.apply( - WithKeys.of((String s) -> Integer.valueOf(s)) - .withKeyType(TypeDescriptor.of(Integer.class))); - - DataflowAssert.that(kvs).containsInAnyOrder( - KV.of(1234, "1234"), KV.of(0, "0"), KV.of(-12, "-12"), KV.of(3210, "3210")); - - p.run(); - } - - @Test - public void withLambdaAndNoTypeDescriptorShouldThrow() { - TestPipeline p = TestPipeline.create(); - - PCollection<String> values = p.apply(Create.of("1234", "3210", "0", "-12")); - - values.apply("ApplyKeysWithWithKeys", WithKeys.of((String s) -> Integer.valueOf(s))); - - thrown.expect(PipelineExecutionException.class); - thrown.expectMessage("Unable to return a default Coder for ApplyKeysWithWithKeys"); - thrown.expectMessage("Cannot provide a coder for type variable K"); - thrown.expectMessage("the actual type is unknown due to erasure."); - - p.run(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e8b71008/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java b/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java deleted file mode 100644 index 50b5ff7..0000000 --- a/sdk/src/test/java8/com/google/cloud/dataflow/sdk/transforms/WithTimestampsJava8Test.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (C) 2015 Google Inc. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.dataflow.sdk.transforms; - -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Java 8 tests for {@link WithTimestamps}. 
- */ -@RunWith(JUnit4.class) -public class WithTimestampsJava8Test implements Serializable { - @Test - @Category(RunnableOnService.class) - public void withTimestampsLambdaShouldApplyTimestamps() { - TestPipeline p = TestPipeline.create(); - - String yearTwoThousand = "946684800000"; - PCollection<String> timestamped = - p.apply(Create.of("1234", "0", Integer.toString(Integer.MAX_VALUE), yearTwoThousand)) - .apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(yearTwoThousand)))); - - PCollection<KV<String, Instant>> timestampedVals = - timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() { - @Override - public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c) - throws Exception { - c.output(KV.of(c.element(), c.timestamp())); - } - })); - - DataflowAssert.that(timestamped) - .containsInAnyOrder(yearTwoThousand, "0", "1234", Integer.toString(Integer.MAX_VALUE)); - DataflowAssert.that(timestampedVals) - .containsInAnyOrder( - KV.of("0", new Instant(0)), - KV.of("1234", new Instant("1234")), - KV.of(Integer.toString(Integer.MAX_VALUE), new Instant(Integer.MAX_VALUE)), - KV.of(yearTwoThousand, new Instant(Long.valueOf(yearTwoThousand)))); - } -} -