Repository: incubator-beam Updated Branches: refs/heads/master 744b0474e -> 994febef4
Mark primitive display data tests RunnableOnService Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9e0b1b42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9e0b1b42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9e0b1b42 Branch: refs/heads/master Commit: 9e0b1b423ef722cdfbec0bde89d85faa760bf322 Parents: 744b047 Author: Scott Wegner <[email protected]> Authored: Thu Jul 7 13:49:30 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Fri Jul 8 12:51:39 2016 -0700 ---------------------------------------------------------------------- runners/google-cloud-dataflow-java/pom.xml | 10 --- .../runners/dataflow/io/DataflowAvroIOTest.java | 69 -------------- .../dataflow/io/DataflowBigQueryIOTest.java | 94 -------------------- .../dataflow/io/DataflowDatastoreIOTest.java | 66 -------------- .../dataflow/io/DataflowPubsubIOTest.java | 63 ------------- .../runners/dataflow/io/DataflowTextIOTest.java | 76 ---------------- .../transforms/DataflowCombineTest.java | 58 ------------ .../DataflowDisplayDataEvaluator.java | 72 --------------- .../transforms/DataflowMapElementsTest.java | 55 ------------ .../org/apache/beam/sdk/transforms/Combine.java | 5 ++ .../java/org/apache/beam/sdk/io/AvroIOTest.java | 34 +++++++ .../org/apache/beam/sdk/io/BigQueryIOTest.java | 75 +++++++++++++++- .../org/apache/beam/sdk/io/PubsubIOTest.java | 30 +++++++ .../java/org/apache/beam/sdk/io/TextIOTest.java | 32 +++++++ .../beam/sdk/io/datastore/V1Beta3Test.java | 33 +++++++ .../apache/beam/sdk/transforms/CombineTest.java | 19 ++++ .../beam/sdk/transforms/MapElementsTest.java | 22 +++++ .../display/DisplayDataEvaluator.java | 12 ++- 18 files changed, 258 insertions(+), 567 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/pom.xml ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml index 76e5f80..9cd1fb4 100644 --- a/runners/google-cloud-dataflow-java/pom.xml +++ b/runners/google-cloud-dataflow-java/pom.xml @@ -290,11 +290,6 @@ </dependency> <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro</artifactId> - </dependency> - - <dependency> <groupId>com.google.api-client</groupId> <artifactId>google-api-client</artifactId> </dependency> @@ -331,11 +326,6 @@ </dependency> <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-bigquery</artifactId> - </dependency> - - <dependency> <groupId>com.google.cloud.bigdataoss</groupId> <artifactId>util</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java deleted file mode 100644 index 006daa9..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowAvroIOTest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; - -import org.apache.avro.Schema; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Set; - -/** - * {@link DataflowRunner} specific tests for {@link AvroIO} transforms. - */ -@RunWith(JUnit4.class) -public class DataflowAvroIOTest { - @Test - public void testPrimitiveWriteDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - AvroIO.Write.Bound<?> write = AvroIO.Write - .to("foo") - .withSchema(Schema.create(Schema.Type.STRING)) - .withoutValidation(); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("AvroIO.Write should include the file pattern in its primitive transform", - displayData, hasItem(hasDisplayItem("fileNamePattern"))); - } - - @Test - public void testPrimitiveReadDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*") - .withSchema(Schema.create(Schema.Type.STRING)) - .withoutValidation(); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("AvroIO.Read should include the file pattern in its primitive transform", - displayData, hasItem(hasDisplayItem("filePattern"))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java deleted file mode 100644 index 2b13b9c..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.BigQueryIO; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; - -import com.google.api.services.bigquery.model.TableSchema; - -import org.junit.Test; - -import java.util.Set; - -/** - * Unit tests for Dataflow usage of {@link BigQueryIO} transforms. - */ -public class DataflowBigQueryIOTest { - @Test - public void testTableSourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - BigQueryIO.Read.Bound read = BigQueryIO.Read - .from("project:dataset.tableId") - .withoutValidation(); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("BigQueryIO.Read should include the table spec in its primitive display data", - displayData, hasItem(hasDisplayItem("table"))); - } - - @Test - public void testQuerySourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - BigQueryIO.Read.Bound read = BigQueryIO.Read - .fromQuery("foobar") - .withoutValidation(); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("BigQueryIO.Read should include the query in its primitive display data", - displayData, hasItem(hasDisplayItem("query"))); - } - - @Test - public void testBatchSinkPrimitiveDisplayData() { - DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions(); - options.setStreaming(false); - testSinkPrimitiveDisplayData(options); - } - - @Test - public void testStreamingSinkPrimitiveDisplayData() { - DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions(); - options.setStreaming(true); - testSinkPrimitiveDisplayData(options); - } - - private void testSinkPrimitiveDisplayData(DataflowPipelineOptions options) { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(options); - - BigQueryIO.Write.Bound write = BigQueryIO.Write - .to("project:dataset.table") - .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) - .withoutValidation(); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("BigQueryIO.Write should include the table spec in its primitive display data", - displayData, hasItem(hasDisplayItem("tableSpec"))); - - assertThat("BigQueryIO.Write should include the table schema in its primitive display data", - displayData, hasItem(hasDisplayItem("schema"))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java deleted file mode 100644 index 8cdf611..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.datastore.DatastoreIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; - -import com.google.datastore.v1beta3.Entity; -import com.google.datastore.v1beta3.Query; - -import org.junit.Test; - -import java.util.Set; - -/** - * Unit tests for Dataflow usage of {@link DatastoreIO} transforms. - */ -public class DataflowDatastoreIOTest { - @Test - public void testSourcePrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId( - "myProject").withQuery(Query.newBuilder().build()); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); - assertThat("DatastoreIO read should include the project in its primitive display data", - displayData, hasItem(hasDisplayItem("projectId"))); - } - - @Test - public void testSinkPrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PTransform<PCollection<Entity>, ?> write = - DatastoreIO.v1beta3().write().withProjectId("myProject"); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("DatastoreIO write should include the project in its primitive display data", - displayData, hasItem(hasDisplayItem("projectId"))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java deleted file mode 100644 index 27bc2d9..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowPubsubIOTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.PubsubIO; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Set; - -/** - * {@link DataflowRunner} specific tests for {@link PubsubIO} transforms. - */ -@RunWith(JUnit4.class) -public class DataflowPubsubIOTest { - @Test - public void testPrimitiveWriteDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PubsubIO.Write.Bound<?> write = PubsubIO.Write.topic("projects/project/topics/topic"); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("PubsubIO.Write should include the topic in its primitive display data", - displayData, hasItem(hasDisplayItem("topic"))); - } - - @Test - public void testPrimitiveReadDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - PubsubIO.Read.Bound<String> read = - PubsubIO.Read.subscription("projects/project/subscriptions/subscription") - .maxNumRecords(1); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("PubsubIO.Read should include the subscription in its primitive display data", - displayData, hasItem(hasDisplayItem("subscription"))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java deleted file mode 100644 index 727ffdc..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowTextIOTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.io; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.startsWith; -import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.testing.TestDataflowPipelineOptions; -import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.util.TestCredential; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Set; - -/** - * {@link DataflowRunner} specific tests for TextIO Read and Write transforms. - */ -@RunWith(JUnit4.class) -public class DataflowTextIOTest { - private TestDataflowPipelineOptions buildTestPipelineOptions() { - TestDataflowPipelineOptions options = - TestPipeline.testingPipelineOptions().as(TestDataflowPipelineOptions.class); - options.setGcpCredential(new TestCredential()); - return options; - } - - @Test - public void testPrimitiveWriteDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - TextIO.Write.Bound<?> write = TextIO.Write.to("foobar"); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); - assertThat("TextIO.Write should include the file prefix in its primitive display data", - displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); - } - - @Test - public void testPrimitiveReadDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - TextIO.Read.Bound<String> read = TextIO.Read - .from("foobar") - .withoutValidation(); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); - assertThat("TextIO.Read should include the file prefix in its primitive display data", - displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java deleted file mode 100644 index 3af0cae..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowCombineTest.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.transforms; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.CombineTest; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.POutput; - -import org.junit.Test; - -import java.util.Set; - -/** - * Unit tests for Dataflow usage of {@link Combine} transforms. - */ -public class DataflowCombineTest { - @Test - public void testCombinePerKeyPrimitiveDisplayData() { - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); - PTransform<PCollection<KV<Integer, Integer>>, ? extends POutput> combine = - Combine.perKey(combineFn); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine, - KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); - - assertThat("Combine.perKey should include the combineFn in its primitive transform", - displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java deleted file mode 100644 index d809cc6..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowDisplayDataEvaluator.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.transforms; - -import org.apache.beam.runners.dataflow.DataflowRunner; -import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions; -import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; -import org.apache.beam.sdk.options.GcpOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.util.NoopCredentialFactory; -import org.apache.beam.sdk.util.NoopPathValidator; - -import com.google.common.collect.Lists; - -/** - * Factory methods for creating {@link DisplayDataEvaluator} instances against the - * {@link DataflowRunner}. - */ -public final class DataflowDisplayDataEvaluator { - /** Do not instantiate. */ - private DataflowDisplayDataEvaluator() {} - - /** - * Retrieve a set of default {@link DataflowPipelineOptions} which can be used to build - * dataflow pipelines for evaluating display data. - */ - public static DataflowPipelineOptions getDefaultOptions() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - - options.setRunner(DataflowRunner.class); - options.setProject("foobar"); - options.setTempLocation("gs://bucket/tmpLocation"); - options.setFilesToStage(Lists.<String>newArrayList()); - - options.as(DataflowPipelineDebugOptions.class).setPathValidatorClass(NoopPathValidator.class); - options.as(GcpOptions.class).setCredentialFactoryClass(NoopCredentialFactory.class); - - return options; - } - - /** - * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against - * the {@link DataflowRunner}. - */ - public static DisplayDataEvaluator create() { - return create(getDefaultOptions()); - } - - /** - * Create a {@link DisplayDataEvaluator} instance to evaluate pipeline display data against - * the {@link DataflowRunner} with the specified {@code options}. - */ - public static DisplayDataEvaluator create(DataflowPipelineOptions options) { - return DisplayDataEvaluator.create(options); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java deleted file mode 100644 index 8a5e67d..0000000 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.transforms; - -import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - -import static org.hamcrest.Matchers.hasItem; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; - -import org.junit.Test; - -import java.io.Serializable; -import java.util.Set; - -/** - * Unit tests for Dataflow usage of {@link MapElements} transforms. - */ -public class DataflowMapElementsTest implements Serializable { - @Test - public void testPrimitiveDisplayData() { - SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() { - @Override - public Integer apply(Integer input) { - return input; - } - }; - - MapElements<?, ?> map = MapElements.via(mapFn); - DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(); - - Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(map); - assertThat("MapElements should include the mapFn in its primitive display data", - displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass()))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 5faf4e3..9a87b36 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2295,6 +2295,11 @@ public class Combine { c.output(KV.of(key, combineFnRunner.apply(key, c.element().getValue(), c))); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + Combine.GroupedValues.this.populateDisplayData(builder); + } }).withSideInputs(sideInputs)); try { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 8625b10..047e7d0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -30,9 +31,11 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.io.AvroIO.Write.Bound; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.values.PCollection; @@ -40,6 +43,7 @@ import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; +import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.reflect.Nullable; @@ -55,6 +59,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; /** * Tests for AvroIO Read and Write transforms. @@ -273,6 +278,20 @@ public class AvroIOTest { } @Test + @Category(RunnableOnService.class) + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + AvroIO.Read.Bound<?> read = AvroIO.Read.from("foo.*") + .withSchema(Schema.create(Schema.Type.STRING)) + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("AvroIO.Read should include the file pattern in its primitive transform", + displayData, hasItem(hasDisplayItem("filePattern"))); + } + + @Test public void testWriteDisplayData() { AvroIO.Write.Bound<?> write = AvroIO.Write .to("foo") @@ -291,4 +310,19 @@ public class AvroIOTest { assertThat(displayData, hasDisplayItem("numShards", 100)); assertThat(displayData, hasDisplayItem("validation", false)); } + + @Test + @Category(RunnableOnService.class) + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + AvroIO.Write.Bound<?> write = AvroIO.Write + .to("foo") + .withSchema(Schema.create(Schema.Type.STRING)) + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("AvroIO.Write should include the file pattern in its primitive transform", + displayData, hasItem(hasDisplayItem("fileNamePattern"))); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 43bf314..78d950e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -23,6 +23,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static com.google.common.base.Preconditions.checkArgument; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; @@ -45,6 +46,7 @@ import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -57,6 +59,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.BigQueryServices; import org.apache.beam.sdk.util.BigQueryServices.DatasetService; @@ -109,6 +112,7 @@ import java.io.Serializable; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Set; import javax.annotation.Nullable; @@ -329,9 +333,10 @@ public class BigQueryIOTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); - @Mock public transient BigQueryServices.JobService mockJobService; + @Mock(extraInterfaces = Serializable.class) + public transient BigQueryServices.JobService mockJobService; @Mock private transient IOChannelFactory mockIOChannelFactory; - @Mock private transient DatasetService mockDatasetService; + @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService; private transient BigQueryOptions bqOptions; @@ -637,6 +642,39 @@ public class BigQueryIOTest implements Serializable { } @Test + @Category(RunnableOnService.class) + public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions); + BigQueryIO.Read.Bound read = BigQueryIO.Read + .from("project:dataset.tableId") + .withTestServices(new FakeBigQueryServices() + .withDatasetService(mockDatasetService) + .withJobService(mockJobService)) + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("BigQueryIO.Read should include the table spec in its primitive display data", + displayData, hasItem(hasDisplayItem("table"))); + } + + @Test + @Category(RunnableOnService.class) + public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions); + BigQueryIO.Read.Bound read = BigQueryIO.Read + .fromQuery("foobar") + .withTestServices(new FakeBigQueryServices() + .withDatasetService(mockDatasetService) + .withJobService(mockJobService)) + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("BigQueryIO.Read should include the query in its primitive display data", + displayData, hasItem(hasDisplayItem("query"))); + } + + + @Test public void testBuildSink() { BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable"); checkWriteObject( @@ -645,6 +683,39 @@ public class BigQueryIOTest implements Serializable { } @Test + @Category(RunnableOnService.class) + public void testBatchSinkPrimitiveDisplayData() throws IOException, InterruptedException { + testSinkPrimitiveDisplayData(/* streaming: */ false); + } + + @Test + @Category(RunnableOnService.class) + public void testStreamingSinkPrimitiveDisplayData() throws IOException, InterruptedException { + testSinkPrimitiveDisplayData(/* streaming: */ true); + } + + private void testSinkPrimitiveDisplayData(boolean streaming) throws IOException, + InterruptedException { + bqOptions.as(StreamingOptions.class).setStreaming(streaming); + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(bqOptions); + + BigQueryIO.Write.Bound write = BigQueryIO.Write + .to("project:dataset.table") + .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2")) + .withTestServices(new FakeBigQueryServices() + .withDatasetService(mockDatasetService) + .withJobService(mockJobService)) + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("BigQueryIO.Write should include the table spec in its primitive display data", + displayData, hasItem(hasDisplayItem("tableSpec"))); + + assertThat("BigQueryIO.Write should include the table schema in its primitive display data", + displayData, hasItem(hasDisplayItem("schema"))); + } + + @Test public void testBuildSinkwithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index efa1cd2..1e9ebf2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -19,18 +19,24 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.util.Set; + /** * Tests for PubsubIO Read and Write transforms. */ @@ -101,6 +107,19 @@ public class PubsubIOTest { } @Test + @Category(RunnableOnService.class) + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PubsubIO.Read.Bound<String> read = + PubsubIO.Read.subscription("projects/project/subscriptions/subscription") + .maxNumRecords(1); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("PubsubIO.Read should include the subscription in its primitive display data", + displayData, hasItem(hasDisplayItem("subscription"))); + } + + @Test public void testWriteDisplayData() { String topic = "projects/project/topics/topic"; PubsubIO.Write.Bound<?> write = PubsubIO.Write @@ -114,4 +133,15 @@ public class PubsubIOTest { assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); assertThat(displayData, hasDisplayItem("idLabel", "myId")); } + + @Test + @Category(RunnableOnService.class) + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PubsubIO.Write.Bound<?> write = PubsubIO.Write.topic("projects/project/topics/topic"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("PubsubIO.Write should include the topic in its primitive display data", + displayData, hasItem(hasDisplayItem("topic"))); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index df598c8..28e9ea4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -22,8 +22,11 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -42,10 +45,12 @@ import org.apache.beam.sdk.options.GcsOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.IOChannelUtils; @@ -79,6 +84,7 @@ import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; @@ -185,6 +191,20 @@ public class TextIOTest { assertThat(displayData, hasDisplayItem("validation", false)); } + @Test + @Category(RunnableOnService.class) + public void testPrimitiveReadDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + TextIO.Read.Bound<String> read = TextIO.Read + .from("foobar") + .withoutValidation(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read); + assertThat("TextIO.Read should include the file prefix in its primitive display data", + displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); + } + <T> void runTestWrite(T[] elems, Coder<T> coder) throws Exception { runTestWrite(elems, coder, 1); } @@ -315,6 +335,18 @@ public class TextIOTest { } @Test + @Category(RunnableOnService.class) + public void testPrimitiveWriteDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + TextIO.Write.Bound<?> write = TextIO.Write.to("foobar"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("TextIO.Write should include the file prefix in its primitive display data", + displayData, hasItem(hasDisplayItem(hasValue(startsWith("foobar"))))); + } + + @Test public void testUnsupportedFilePattern() throws IOException { File outFolder = tmpFolder.newFolder(); // Windows doesn't like resolving paths with * in them. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java index 9a87ed3..dd22289 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/datastore/V1Beta3Test.java @@ -23,6 +23,7 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -45,8 +46,14 @@ import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.util.TestCredential; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; import com.google.common.collect.Lists; import com.google.datastore.v1beta3.Entity; @@ -68,6 +75,7 @@ import com.google.protobuf.Int32Value; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -80,6 +88,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; +import java.util.Set; /** * Tests for {@link V1Beta3}. @@ -199,6 +208,18 @@ public class V1Beta3Test { } @Test + @Category(RunnableOnService.class) + public void testSourcePrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId( + "myProject").withQuery(Query.newBuilder().build()); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read); + assertThat("DatastoreIO read should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + } + + @Test public void testWriteDoesNotAllowNullProject() throws Exception { thrown.expect(NullPointerException.class); thrown.expectMessage("projectId"); @@ -233,6 +254,18 @@ public class V1Beta3Test { } @Test + @Category(RunnableOnService.class) + public void testSinkPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + PTransform<PCollection<Entity>, ?> write = + DatastoreIO.v1beta3().write().withProjectId("myProject"); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write); + assertThat("DatastoreIO write should include the project in its primitive display data", + displayData, hasItem(hasDisplayItem("projectId"))); + } + + @Test public void testQuerySplitBasic() throws Exception { KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); Query query = Query.newBuilder().addKind(mykind).build(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 6f6c4a1..b453089 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; @@ -50,6 +51,7 @@ import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.AfterPane; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -63,6 +65,7 @@ import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.POutput; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableSet; @@ -711,6 +714,22 @@ public class CombineTest implements Serializable { assertThat(displayData, hasDisplayItem(hasNamespace(combineFn.getClass()))); } + @Test + @Category(RunnableOnService.class) + public void testCombinePerKeyPrimitiveDisplayData() { + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); + PTransform<PCollection<KV<Integer, Integer>>, ? extends POutput> combine = + Combine.perKey(combineFn); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine, + KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + + assertThat("Combine.perKey should include the combineFn in its primitive transform", + displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); + } + //////////////////////////////////////////////////////////////////////////// // Test classes, for different kinds of combining fns. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index e6694d2..f18504c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -19,13 +19,16 @@ package org.apache.beam.sdk.transforms; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.assertThat; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -38,6 +41,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.Serializable; +import java.util.Set; /** * Tests for {@link MapElements}. @@ -155,6 +159,24 @@ public class MapElementsTest implements Serializable { assertThat(DisplayData.from(simpleMap), hasDisplayItem("mapFn", simpleFn.getClass())); } + @Test + @Category(RunnableOnService.class) + public void testPrimitiveDisplayData() { + SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) { + return input; + } + }; + + MapElements<?, ?> map = MapElements.via(mapFn); + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(map); + assertThat("MapElements should include the mapFn in its primitive display data", + displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass()))); + } + static class VoidValues<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9e0b1b42/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java index a78a4ad..dc8c1e9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java @@ -41,10 +41,11 @@ public class DisplayDataEvaluator { private final PipelineOptions options; /** - * Create a new {@link DisplayDataEvaluator} using {@link TestPipeline#testingPipelineOptions()}. + * Create a new {@link DisplayDataEvaluator} using options returned from + * {@link #getDefaultOptions()}. */ public static DisplayDataEvaluator create() { - return create(TestPipeline.testingPipelineOptions()); + return create(getDefaultOptions()); } /** @@ -54,6 +55,13 @@ public class DisplayDataEvaluator { return new DisplayDataEvaluator(pipelineOptions); } + /** + * The default {@link PipelineOptions} which will be used by {@link #create()}. + */ + public static PipelineOptions getDefaultOptions() { + return TestPipeline.testingPipelineOptions(); + } + private DisplayDataEvaluator(PipelineOptions options) { this.options = options; }
