Repository: incubator-beam
Updated Branches:
  refs/heads/master acb040684 -> 9f796e22f


Fix BigQuery sink display data


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/489298fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/489298fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/489298fc

Branch: refs/heads/master
Commit: 489298fcd0371f70d0d8d20ece609618808a5f0c
Parents: d9355c2
Author: Scott Wegner <[email protected]>
Authored: Fri May 6 16:46:40 2016 -0700
Committer: bchambers <[email protected]>
Committed: Tue May 10 16:21:27 2016 -0700

----------------------------------------------------------------------
 .../dataflow/io/DataflowBigQueryIOTest.java     | 71 ++++++++++++++++++++
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 26 +++++++
 2 files changed, 97 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/489298fc/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
new file mode 100644
index 0000000..619da04
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowBigQueryIOTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.io;
+
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.runners.dataflow.transforms.DataflowDisplayDataEvaluator;
+import org.apache.beam.sdk.io.BigQueryIO;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+
+import com.google.api.services.bigquery.model.TableSchema;
+
+import org.junit.Test;
+
+import java.util.Set;
+
+/**
+ * Unit tests for Dataflow usage of {@link BigQueryIO} transforms.
+ */
+public class DataflowBigQueryIOTest {
+  @Test
+  public void testBatchSinkPrimitiveDisplayData() {
+    DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions();
+    options.setStreaming(false);
+    testSinkPrimitiveDisplayData(options);
+  }
+
+  @Test
+  public void testStreamingSinkPrimitiveDisplayData() {
+    DataflowPipelineOptions options = DataflowDisplayDataEvaluator.getDefaultOptions();
+    options.setStreaming(true);
+    testSinkPrimitiveDisplayData(options);
+  }
+
+  private void testSinkPrimitiveDisplayData(DataflowPipelineOptions options) {
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create(options);
+
+    BigQueryIO.Write.Bound write = BigQueryIO.Write
+        .to("project:dataset.table")
+        .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
+        displayData, hasItem(hasDisplayItem(hasKey("tableSpec"))));
+
+    assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
+        displayData, hasItem(hasDisplayItem(hasKey("schema"))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/489298fc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 7785298..15872e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -1166,6 +1166,15 @@ public class BigQueryIO {
       return new BigQueryWriteOperation(this);
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder
+          .addIfNotNull(DisplayData.item("schema", jsonSchema))
+          .addIfNotNull(DisplayData.item("tableSpec", jsonTable));
+    }
+
     private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
       // The maximum number of retry load jobs.
       private static final int MAX_RETRY_LOAD_JOBS = 3;
@@ -1396,6 +1405,13 @@ public class BigQueryIO {
       uniqueIdsForTableRows.clear();
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema));
+    }
+
     public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
         throws IOException {
       TableReference tableReference = parseTableSpec(tableSpec);
@@ -1603,6 +1619,16 @@ public class BigQueryIO {
           new TableRowInfo(context.element(), uniqueId)));
     }
 
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+
+      builder.addIfNotNull(DisplayData.item("tableSpec", tableSpec));
+      if (tableRefFunction != null) {
+        builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()));
+      }
+    }
+
     private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
       if (tableSpec != null) {
         return tableSpec;

Reply via email to