[ 
https://issues.apache.org/jira/browse/BEAM-3983?focusedWorklogId=102770&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102770
 ]

ASF GitHub Bot logged work on BEAM-3983:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/May/18 04:38
            Start Date: 17/May/18 04:38
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5290: [BEAM-3983] 
Restore BigQuery SQL Support with copied enums
URL: https://github.com/apache/beam/pull/5290
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
index d893205e96d..63f5022e964 100644
--- a/sdks/java/extensions/sql/pom.xml
+++ b/sdks/java/extensions/sql/pom.xml
@@ -402,6 +402,12 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- for tests  -->
     <dependency>
       <groupId>junit</groupId>
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
new file mode 100644
index 00000000000..6bfd839c018
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQueryTable.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * {@code BeamBigQueryTable} represents a BigQuery table as a target.
+ * This provider does not currently support being a source.
+ *
+ */
+@Experimental
+public class BeamBigQueryTable extends BaseBeamTable implements Serializable {
+  private String tableSpec;
+
+  public BeamBigQueryTable(Schema beamSchema, String tableSpec) {
+    super(beamSchema);
+    this.tableSpec = tableSpec;
+  }
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.BOUNDED;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(Pipeline pipeline) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
+    return new PTransform<PCollection<Row>, POutput>() {
+      @Override
+      public WriteResult expand(PCollection<Row> input) {
+        return input.apply(BigQueryIO.<Row>write()
+          .withSchema(BigQueryUtils.toTableSchema(getSchema()))
+          .withFormatFunction(BigQueryUtils.toTableRow())
+          .to(tableSpec));
+      }
+    };
+  }
+
+  public String getTableSpec() {
+    return tableSpec;
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
new file mode 100644
index 00000000000..4d1bd2d9155
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * BigQuery table provider.
+ *
+ * <p>A sample BigQuery table definition is:
+ * <pre>{@code
+ * CREATE TABLE ORDERS(
+ *   ID INT COMMENT 'this is the primary key',
+ *   NAME VARCHAR(127) COMMENT 'this is the name'
+ * )
+ * TYPE 'bigquery'
+ * COMMENT 'this is the table orders'
+ * LOCATION '[PROJECT_ID]:[DATASET].[TABLE]'
+ * }</pre>
+ */
+public class BigQueryTableProvider extends InMemoryMetaTableProvider {
+
+  @Override public String getTableType() {
+    return "bigquery";
+  }
+
+  @Override public BeamSqlTable buildBeamSqlTable(Table table) {
+    Schema schema = table.getSchema();
+    String filePattern = table.getLocation();
+
+    return new BeamBigQueryTable(schema, filePattern);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/package-info.java
new file mode 100644
index 00000000000..129714104b2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for BigQuery.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/package-info.java
index 4101da775d5..19e88601545 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/package-info.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/package-info.java
@@ -17,6 +17,6 @@
  */
 
 /**
- * table schema for KafkaIO.
+ * Table schema for KafkaIO.
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.kafka;
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
new file mode 100644
index 00000000000..7eea8776058
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTableProviderTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.stream.Stream;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.junit.Test;
+
+/**
+ * UnitTest for {@link BigQueryTableProvider}.
+ */
+public class BigQueryTableProviderTest {
+  private BigQueryTableProvider provider = new BigQueryTableProvider();
+
+  @Test
+  public void testGetTableType() throws Exception {
+    assertEquals("bigquery", provider.getTableType());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() throws Exception {
+    Table table = fakeTable("hello");
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+
+    assertNotNull(sqlTable);
+    assertTrue(sqlTable instanceof BeamBigQueryTable);
+
+    BeamBigQueryTable bqTable = (BeamBigQueryTable) sqlTable;
+    assertEquals("project:dataset.table", bqTable.getTableSpec());
+  }
+
+  private static Table fakeTable(String name) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .location("project:dataset.table")
+        .schema(
+            Stream.of(
+                Schema.Field.of("id", 
TypeName.INT32.type()).withNullable(true),
+                Schema.Field.of("name", 
RowSqlTypes.VARCHAR).withNullable(true))
+                  .collect(toSchema()))
+        .type("bigquery")
+        .build();
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
new file mode 100644
index 00000000000..8484097f7f4
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Utility methods for BigQuery related operations.
+ *
+ * <p><b>Example: Writing to BigQuery</b>
+ *
+ * <pre>{@code
+ * PCollection<Row> rows = ...;
+ *
+ * rows.apply(BigQueryIO.<Row>write()
+ *       .withSchema(BigQueryUtils.toTableSchema(rows))
+ *       .withFormatFunction(BigQueryUtils.toTableRow())
+ *       .to("my-project:my_dataset.my_table"));
+ * }</pre>
+ */
+public class BigQueryUtils {
+  private static final Map<TypeName, StandardSQLTypeName> 
BEAM_TO_BIGQUERY_TYPE_MAPPING =
+      ImmutableMap.<TypeName, StandardSQLTypeName>builder()
+          .put(TypeName.BYTE, StandardSQLTypeName.INT64)
+          .put(TypeName.INT16, StandardSQLTypeName.INT64)
+          .put(TypeName.INT32, StandardSQLTypeName.INT64)
+          .put(TypeName.INT64, StandardSQLTypeName.INT64)
+
+          .put(TypeName.FLOAT, StandardSQLTypeName.FLOAT64)
+          .put(TypeName.DOUBLE, StandardSQLTypeName.FLOAT64)
+
+          .put(TypeName.DECIMAL, StandardSQLTypeName.FLOAT64)
+
+          .put(TypeName.BOOLEAN, StandardSQLTypeName.BOOL)
+
+          .put(TypeName.ARRAY, StandardSQLTypeName.ARRAY)
+          .put(TypeName.ROW, StandardSQLTypeName.STRUCT)
+
+          .put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP)
+          .put(TypeName.STRING, StandardSQLTypeName.STRING)
+
+          .build();
+
+  private static final Map<byte[], StandardSQLTypeName> 
BEAM_TO_BIGQUERY_METADATA_MAPPING =
+      ImmutableMap.<byte[], StandardSQLTypeName>builder()
+          .put("DATE".getBytes(), StandardSQLTypeName.DATE)
+          .put("TIME".getBytes(), StandardSQLTypeName.TIME)
+          .put("TIME_WITH_LOCAL_TZ".getBytes(), StandardSQLTypeName.TIME)
+          .put("TS".getBytes(), StandardSQLTypeName.TIMESTAMP)
+          .put("TS_WITH_LOCAL_TZ".getBytes(), StandardSQLTypeName.TIMESTAMP)
+          .build();
+
+  /**
+   * Get the corresponding BigQuery {@link StandardSQLTypeName}
+   * for supported Beam {@link FieldType}.
+   */
+  private static StandardSQLTypeName toStandardSQLTypeName(FieldType 
fieldType) {
+    StandardSQLTypeName sqlType = 
BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
+
+    if (sqlType == StandardSQLTypeName.TIMESTAMP && fieldType.getMetadata() != 
null) {
+      sqlType = BEAM_TO_BIGQUERY_METADATA_MAPPING.get(fieldType.getMetadata());
+    }
+
+    return sqlType;
+  }
+
+  private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
+    List<TableFieldSchema> fields = new 
ArrayList<TableFieldSchema>(schema.getFieldCount());
+    for (Field schemaField : schema.getFields()) {
+      FieldType type = schemaField.getType();
+
+      TableFieldSchema field = new TableFieldSchema()
+        .setName(schemaField.getName());
+      if (schemaField.getDescription() != null && 
!"".equals(schemaField.getDescription())) {
+        field.setDescription(schemaField.getDescription());
+      }
+
+      if (!schemaField.getNullable()) {
+        field.setMode(Mode.REQUIRED.toString());
+      }
+      if (TypeName.ARRAY == type.getTypeName()) {
+        type = type.getCollectionElementType();
+        field.setMode(Mode.REPEATED.toString());
+      }
+      if (TypeName.ROW == type.getTypeName()) {
+        Schema subType = type.getRowSchema();
+        field.setFields(toTableFieldSchema(subType));
+      }
+      field.setType(toStandardSQLTypeName(type).toString());
+
+      fields.add(field);
+    }
+    return fields;
+  }
+
+  /**
+   * Convert a Beam {@link Schema} to a BigQuery {@link TableSchema}.
+   */
+  public static TableSchema toTableSchema(Schema schema) {
+    return new TableSchema().setFields(toTableFieldSchema(schema));
+  }
+
+  /**
+   * Convert a Beam {@link PCollection} to a BigQuery {@link TableSchema}.
+   */
+  public static TableSchema toTableSchema(PCollection<Row> rows) {
+    RowCoder coder = (RowCoder) rows.getCoder();
+    return toTableSchema(coder.getSchema());
+  }
+
+  private static final SerializableFunction<Row, TableRow> TO_TABLE_ROW = new 
ToTableRow();
+
+  /**
+   * Convert a Beam {@link Row} to a BigQuery {@link TableRow}.
+   */
+  public static SerializableFunction<Row, TableRow> toTableRow() {
+    return TO_TABLE_ROW;
+  }
+
+  /**
+   * Convert a Beam {@link Row} to a BigQuery {@link TableRow}.
+   */
+  private static class ToTableRow implements SerializableFunction<Row, 
TableRow> {
+    @Override
+    public TableRow apply(Row input) {
+      TableRow output = new TableRow();
+      for (int i = 0; i < input.getFieldCount(); i++) {
+        Object value = input.getValue(i);
+
+        Field schemaField = input.getSchema().getField(i);
+        TypeName type = schemaField.getType().getTypeName();
+        if (TypeName.ARRAY == type) {
+          type = 
schemaField.getType().getCollectionElementType().getTypeName();
+          if (TypeName.ROW == type) {
+            List<Row> rows = (List<Row>) value;
+            List<TableRow> tableRows = new ArrayList<TableRow>(rows.size());
+            for (int j = 0; j < rows.size(); j++) {
+              tableRows.add(apply(rows.get(j)));
+            }
+            value = tableRows;
+          }
+        } else if (TypeName.ROW == type) {
+          value = apply((Row) value);
+        }
+
+        output = output.set(
+            schemaField.getName(),
+            value);
+      }
+      return output;
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/Mode.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/Mode.java
new file mode 100644
index 00000000000..36fceee43c3
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/Mode.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2015 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Copied from package com.google.cloud.bigquery, see BEAM-4248.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+/**
+ * Mode for a BigQuery Table field. {@link Mode#NULLABLE} fields can be set to 
{@code null},
+ * {@link Mode#REQUIRED} fields must be provided. {@link Mode#REPEATED} fields 
can contain more
+ * than one value.
+ */
+enum Mode {
+  NULLABLE, REQUIRED, REPEATED
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StandardSQLTypeName.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StandardSQLTypeName.java
new file mode 100644
index 00000000000..8675f39b2c0
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StandardSQLTypeName.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2016 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Copied from package com.google.cloud.bigquery, see BEAM-4248.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+/**
+ * A type used in standard SQL contexts. For example, these types are used in 
queries
+ * with query parameters, which requires usage of standard SQL.
+ *
+ * @see <a 
href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types">https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types</a>
+ */
+enum StandardSQLTypeName {
+  /** A Boolean value (true or false). */
+  BOOL,
+  /** A 64-bit signed integer value. */
+  INT64,
+  /** A 64-bit IEEE binary floating-point value. */
+  FLOAT64,
+  /** Variable-length character (Unicode) data. */
+  STRING,
+  /** Variable-length binary data. */
+  BYTES,
+  /** Container of ordered fields each with a type (required) and field name 
(optional). */
+  STRUCT,
+  /** Ordered list of zero or more elements of any non-array type. */
+  ARRAY,
+  /**
+   * Represents an absolute point in time, with microsecond precision. Values 
range between the
+   * years 1 and 9999, inclusive.
+   */
+  TIMESTAMP,
+  /** Represents a logical calendar date. Values range between the years 1 and 
9999, inclusive. */
+  DATE,
+  /** Represents a time, independent of a specific date, to microsecond 
precision. */
+  TIME,
+  /** Represents a year, month, day, hour, minute, second, and subsecond 
(microsecond precision). */
+  DATETIME
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
new file mode 100644
index 00000000000..8b07fcc0a85
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableRow;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableSchema;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.junit.Assert.assertThat;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BigQueryUtils}.
+ */
+public class BigQueryUtilsTest {
+  private static final Schema FLAT_TYPE = Schema
+      .builder()
+      .addInt64Field("id", true)
+      .addDoubleField("value", true)
+      .addStringField("name", true)
+      .addDateTimeField("timestamp", true)
+      .addBooleanField("valid", true)
+      .build();
+
+  private static final Schema ARRAY_TYPE = Schema
+      .builder()
+      .addArrayField("ids", Schema.TypeName.INT64.type())
+      .build();
+
+  private static final Schema ROW_TYPE = Schema
+      .builder()
+      .addRowField("row", FLAT_TYPE, true)
+      .build();
+
+  private static final Schema ARRAY_ROW_TYPE = Schema
+      .builder()
+      .addArrayField("rows", Schema.FieldType
+          .of(Schema.TypeName.ROW)
+          .withRowSchema(FLAT_TYPE))
+      .build();
+
+  private static final TableFieldSchema ID =
+      new TableFieldSchema().setName("id")
+        .setType(StandardSQLTypeName.INT64.toString());
+
+  private static final TableFieldSchema VALUE =
+      new TableFieldSchema().setName("value")
+        .setType(StandardSQLTypeName.FLOAT64.toString());
+
+  private static final TableFieldSchema NAME =
+      new TableFieldSchema().setName("name")
+        .setType(StandardSQLTypeName.STRING.toString());
+
+  private static final TableFieldSchema TIMESTAMP =
+      new TableFieldSchema().setName("timestamp")
+        .setType(StandardSQLTypeName.TIMESTAMP.toString());
+
+  private static final TableFieldSchema VALID =
+      new TableFieldSchema().setName("valid")
+        .setType(StandardSQLTypeName.BOOL.toString());
+
+  private static final TableFieldSchema IDS =
+      new TableFieldSchema().setName("ids")
+        .setType(StandardSQLTypeName.INT64.toString())
+        .setMode(Mode.REPEATED.toString());
+
+  private static final Row FLAT_ROW =
+      Row
+        .withSchema(FLAT_TYPE)
+        .addValues(123L, 123.456, "test", new DateTime(123456), false)
+        .build();
+
+  private static final Row ARRAY_ROW =
+      Row
+        .withSchema(ARRAY_TYPE)
+        .addValues((Object) Arrays.asList(123L, 124L))
+        .build();
+
+  private static final Row ROW_ROW =
+      Row
+        .withSchema(ROW_TYPE)
+        .addValues(FLAT_ROW)
+        .build();
+
+  private static final Row ARRAY_ROW_ROW =
+      Row
+        .withSchema(ARRAY_ROW_TYPE)
+        .addValues((Object) Arrays.asList(FLAT_ROW))
+        .build();
+
+  @Test public void testToTableSchema_flat() {
+    TableSchema schema = toTableSchema(FLAT_TYPE);
+
+    assertThat(schema.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID));
+  }
+
+  @Test public void testToTableSchema_array() {
+    TableSchema schema = toTableSchema(ARRAY_TYPE);
+
+    assertThat(schema.getFields(), contains(IDS));
+  }
+
+  @Test public void testToTableSchema_row() {
+    TableSchema schema = toTableSchema(ROW_TYPE);
+
+    assertThat(schema.getFields().size(), equalTo(1));
+    TableFieldSchema field = schema.getFields().get(0);
+    assertThat(field.getName(), equalTo("row"));
+    assertThat(field.getType(), 
equalTo(StandardSQLTypeName.STRUCT.toString()));
+    assertThat(field.getMode(), nullValue());
+    assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID));
+  }
+
+  @Test public void testToTableSchema_array_row() {
+    TableSchema schema = toTableSchema(ARRAY_ROW_TYPE);
+
+    assertThat(schema.getFields().size(), equalTo(1));
+    TableFieldSchema field = schema.getFields().get(0);
+    assertThat(field.getName(), equalTo("rows"));
+    assertThat(field.getType(), 
equalTo(StandardSQLTypeName.STRUCT.toString()));
+    assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
+    assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID));
+  }
+
+  @Test public void testToTableRow_flat() {
+    TableRow row = toTableRow().apply(FLAT_ROW);
+
+    assertThat(row.size(), equalTo(5));
+    assertThat(row, hasEntry("id", 123L));
+    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("name", "test"));
+    assertThat(row, hasEntry("valid", false));
+  }
+
+  @Test public void testToTableRow_array() {
+    TableRow row = toTableRow().apply(ARRAY_ROW);
+
+    assertThat(row, hasEntry("ids", Arrays.asList(123L, 124L)));
+    assertThat(row.size(), equalTo(1));
+  }
+
+  @Test public void testToTableRow_row() {
+    TableRow row = toTableRow().apply(ROW_ROW);
+
+    assertThat(row.size(), equalTo(1));
+    row = (TableRow) row.get("row");
+    assertThat(row.size(), equalTo(5));
+    assertThat(row, hasEntry("id", 123L));
+    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("name", "test"));
+    assertThat(row, hasEntry("valid", false));
+  }
+
+  @Test public void testToTableRow_array_row() {
+    TableRow row = toTableRow().apply(ARRAY_ROW_ROW);
+
+    assertThat(row.size(), equalTo(1));
+    row = ((List<TableRow>) row.get("rows")).get(0);
+    assertThat(row.size(), equalTo(5));
+    assertThat(row, hasEntry("id", 123L));
+    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("name", "test"));
+    assertThat(row, hasEntry("valid", false));
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 102770)
    Time Spent: 33h 20m  (was: 33h 10m)

> BigQuery writes from pure SQL
> -----------------------------
>
>                 Key: BEAM-3983
>                 URL: https://issues.apache.org/jira/browse/BEAM-3983
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Andrew Pilloud
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 33h 20m
>  Remaining Estimate: 0h
>
> It would be nice if you could write to BigQuery in SQL without writing any 
> Java code. For example:
> {code:java}
> INSERT INTO bigquery SELECT * FROM PCOLLECTION{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to