[ 
https://issues.apache.org/jira/browse/BEAM-4162?focusedWorklogId=100972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100972
 ]

ASF GitHub Bot logged work on BEAM-4162:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/18 04:00
            Start Date: 11/May/18 04:00
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5253: [BEAM-4162][SQL] 
Wire up PubsubIO to SQL
URL: https://github.com/apache/beam/pull/5253
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
index 231b0c301c0..4c7fe0900e5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java
@@ -504,6 +504,13 @@ public int indexOf(String fieldName) {
     return index;
   }
 
+  /**
+   * Returns true if {@code fieldName} exists in the schema, false otherwise.
+   */
+  public boolean hasField(String fieldName) {
+    return fieldIndices.containsKey(fieldName);
+  }
+
   /**
    * Return the name of field by index.
    */
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
index eb3011df4d4..b962e33c9d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/JsonToRow.java
@@ -17,9 +17,10 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.util.JsonToRowUtils.jsonToRow;
+import static org.apache.beam.sdk.util.JsonToRowUtils.newObjectMapperWith;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import java.io.IOException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.schemas.Schema;
@@ -87,20 +88,12 @@ private JsonToRowFn(Schema schema) {
                   new DoFn<String, Row>() {
                     @ProcessElement
                     public void processElement(ProcessContext context) {
-                      context.output(jsonToRow(context.element()));
+                      context.output(jsonToRow(objectMapper(), 
context.element()));
                     }
                   }))
           .setCoder(schema.getRowCoder());
     }
 
-    private Row jsonToRow(String jsonString) {
-      try {
-        return objectMapper().readValue(jsonString, Row.class);
-      } catch (IOException e) {
-        throw new IllegalArgumentException("Unable to parse json object: " + 
jsonString, e);
-      }
-    }
-
     private ObjectMapper objectMapper() {
       if (this.objectMapper == null) {
         synchronized (this) {
@@ -112,15 +105,5 @@ private ObjectMapper objectMapper() {
 
       return this.objectMapper;
     }
-
-    private static ObjectMapper newObjectMapperWith(RowJsonDeserializer 
deserializer) {
-      SimpleModule module = new SimpleModule("rowDeserializationModule");
-      module.addDeserializer(Row.class, deserializer);
-
-      ObjectMapper objectMapper = new ObjectMapper();
-      objectMapper.registerModule(module);
-
-      return objectMapper;
-    }
   }
 }
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java
new file mode 100644
index 00000000000..6ef666cf4cb
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import java.io.IOException;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Utilities for parsing JSON strings into {@link Row}s with a Jackson {@link ObjectMapper}.
+ */
+@Internal
+public class JsonToRowUtils {
+
+  public static ObjectMapper newObjectMapperWith(RowJsonDeserializer 
deserializer) {
+    SimpleModule module = new SimpleModule("rowDeserializationModule");
+    module.addDeserializer(Row.class, deserializer);
+
+    ObjectMapper objectMapper = new ObjectMapper();
+    objectMapper.registerModule(module);
+
+    return objectMapper;
+  }
+
+  public static Row jsonToRow(ObjectMapper objectMapper, String jsonString) {
+    try {
+      return objectMapper.readValue(jsonString, Row.class);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Unable to parse json object: " + 
jsonString, e);
+    }
+  }
+}
diff --git a/sdks/java/extensions/sql/build.gradle 
b/sdks/java/extensions/sql/build.gradle
index babcda2c5d2..e269198904e 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -64,6 +64,7 @@ dependencies {
   shadow library.java.joda_time
   shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
   provided project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
+  provided project(path: ":beam-sdks-java-io-google-cloud-platform", 
configuration: "shadow")
   provided library.java.kafka_clients
   testCompile library.java.slf4j_jdk14
   testCompile library.java.junit
diff --git a/sdks/java/extensions/sql/pom.xml b/sdks/java/extensions/sql/pom.xml
index 94f3fe089de..d893205e96d 100644
--- a/sdks/java/extensions/sql/pom.xml
+++ b/sdks/java/extensions/sql/pom.xml
@@ -390,6 +390,12 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
index cf95ddc473f..645a6baaf88 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/Table.java
@@ -40,6 +40,8 @@
   @Nullable
   public abstract JSONObject getProperties();
 
+  public abstract Builder toBuilder();
+
   public static Builder builder() {
     return new 
org.apache.beam.sdk.extensions.sql.meta.AutoValue_Table.Builder();
   }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
new file mode 100644
index 00000000000..3a9902f06d2
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * <i>Experimental</i>
+ *
+ * <p>Wraps the {@link PubsubIO} with JSON messages into {@link BeamSqlTable}.
+ *
+ * <p>This enables {@link PubsubIO} registration in Beam SQL environment as a 
table, including DDL
+ * support.
+ *
+ * <p>Pubsub messages include metadata along with the payload, and it has to 
be explicitly
+ * specified in the schema to make sure it is available to the queries.
+ *
+ * <p>The fields included in the Pubsub message model are:
+ * 'event_timestamp', 'attributes', and 'payload'.
+ *
+ * <p>For example:
+ *
+ * <p>If the messages have JSON messages in the payload that look like this:
+ * <pre>
+ *  {
+ *    "id" : 5,
+ *    "name" : "foo"
+ *  }
+ * </pre>
+ *
+ * <p>Then SQL statements to declare and query such topic will look like this:
+ * <pre>
+ *  CREATE TABLE topic_table (
+ *        event_timestamp TIMESTAMP,
+ *        attributes MAP&lt;VARCHAR, VARCHAR&gt;,
+ *        payload ROW&lt;id INTEGER, name VARCHAR&gt;
+ *      )
+ *     TYPE 'pubsub'
+ *     LOCATION projects/&lt;GCP project id&gt;/topics/&lt;topic name&gt;
+ *     TBLPROPERTIES '{ \"timestampAttributeKey\" : &lt;timestamp 
attribute&gt; }';
+ *
+ *  SELECT event_timestamp, topic_table.payload.name FROM topic_table;
+ * </pre>
+ *
+ * <p>Note, 'payload' field is defined as ROW with schema matching the JSON 
payload of the message.
+ * If 'timestampAttributeKey' is specified in TBLPROPERTIES then 
'event_timestamp' will be set
+ * to the value of that attribute. If it is not specified, then message 
publish time will be used as
+ * event timestamp. 'attributes' map contains Pubsub message attributes map 
unchanged and can
+ * be referenced in the queries as well.
+ */
+@AutoValue
+@Internal
+@Experimental
+abstract class PubsubIOJsonTable implements BeamSqlTable, Serializable {
+
+  /**
+   * Optional attribute key of the Pubsub message from which to extract the 
event timestamp.
+   *
+   * <p>This attribute has to conform to the same requirements as in {@link
+   * PubsubIO.Read#withTimestampAttribute}.
+   *
+   * <p>Short version: it has to be either millis since epoch or string in RFC 
3339 format.
+   *
+   * <p>If the attribute is specified then event timestamps will be extracted 
from
+   * the specified attribute. If it is not specified then message publish 
timestamp will be used.
+   */
+  @Nullable abstract String getTimestampAttribute();
+
+  /**
+   * Pubsub topic name.
+   *
+   * <p>Topic is the only way to specify the Pubsub source. Explicitly 
specifying the subscription
+   * is not supported at the moment. Subscriptions are automatically created 
(but not deleted).
+   */
+  abstract String getTopic();
+
+  static Builder builder() {
+    return new AutoValue_PubsubIOJsonTable.Builder();
+  }
+
+  /**
+   * Table schema, describes Pubsub message schema.
+   *
+   * <p>Includes fields 'event_timestamp', 'attributes', and 'payload'.
+   * See {@link PubsubMessageToRow}.
+   */
+   public abstract Schema getSchema();
+
+  @Override
+  public BeamIOType getSourceType() {
+    return BeamIOType.UNBOUNDED;
+  }
+
+  @Override
+  public PCollection<Row> buildIOReader(Pipeline pipeline) {
+    return
+        PBegin
+            .in(pipeline)
+            .apply("readFromPubsub", readMessagesWithAttributes())
+            .apply("parseMessageToRow", 
PubsubMessageToRow.forSchema(getSchema()))
+            .setCoder(getSchema().getRowCoder());
+  }
+
+  private PubsubIO.Read<PubsubMessage> readMessagesWithAttributes() {
+    PubsubIO.Read<PubsubMessage> read = PubsubIO
+        .readMessagesWithAttributes()
+        .fromTopic(getTopic());
+
+    return (getTimestampAttribute() == null)
+        ? read
+        : read.withTimestampAttribute(getTimestampAttribute());
+  }
+
+  @Override
+  public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
+    throw new UnsupportedOperationException("Writing to a Pubsub topic is not 
supported");
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    abstract Builder setSchema(Schema schema);
+    abstract Builder setTimestampAttribute(String timestampAttribute);
+    abstract Builder setTopic(String topic);
+
+    abstract PubsubIOJsonTable build();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
new file mode 100644
index 00000000000..b814f9e5740
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static org.apache.beam.sdk.extensions.sql.RowSqlTypes.TIMESTAMP;
+import static org.apache.beam.sdk.extensions.sql.RowSqlTypes.VARCHAR;
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.TIMESTAMP_FIELD;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.MAP;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * {@link TableProvider} for {@link PubsubIOJsonTable} which wraps {@link 
PubsubIO}
+ * for consumption by Beam SQL.
+ */
+@Internal
+@Experimental
+public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
+
+  @Override
+  public String getTableType() {
+    return "pubsub";
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
+    validatePubsubMessageSchema(tableDefintion);
+
+    JSONObject tableProperties = tableDefintion.getProperties();
+    String timestampAttributeKey = 
tableProperties.getString("timestampAttributeKey");
+
+    return
+        PubsubIOJsonTable
+            .builder()
+            .setSchema(tableDefintion.getSchema())
+            .setTimestampAttribute(timestampAttributeKey)
+            .setTopic(tableDefintion.getLocation())
+            .build();
+  }
+
+  private void validatePubsubMessageSchema(Table tableDefinition) {
+    Schema schema = tableDefinition.getSchema();
+
+    if (schema.getFieldCount() != 3
+        || !fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)
+        || !fieldPresent(schema, ATTRIBUTES_FIELD, 
MAP.type().withMapType(VARCHAR, VARCHAR))
+        || !(schema.hasField(PAYLOAD_FIELD)
+             && 
ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()))) {
+
+      throw new IllegalArgumentException(
+          "Unsupported schema specified for Pubsub source in CREATE TABLE. "
+          + "CREATE TABLE for Pubsub topic should define exactly the following 
fields: "
+          + "'event_timestamp' field of type 'TIMESTAMP', 'attributes' field 
of type "
+          + "MAP<VARCHAR, VARCHAR>, and 'payload' field of type 'ROW<...>' 
which matches the "
+          + "payload JSON format.");
+    }
+  }
+
+  private boolean fieldPresent(Schema schema, String field, Schema.FieldType 
expectedType) {
+    return schema.hasField(field) && 
expectedType.equals(schema.getField(field).getType());
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
new file mode 100644
index 00000000000..760defc70e9
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.sdk.util.JsonToRowUtils.newObjectMapperWith;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.JsonToRowUtils;
+import org.apache.beam.sdk.util.RowJsonDeserializer;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+
+/**
+ * A {@link DoFn} to convert {@link PubsubMessage} with JSON payload to {@link 
Row}.
+ */
+@Internal
+@Experimental
+public class PubsubMessageToRow extends DoFn<PubsubMessage, Row> {
+  static final String TIMESTAMP_FIELD = "event_timestamp";
+  static final String ATTRIBUTES_FIELD = "attributes";
+  static final String PAYLOAD_FIELD = "payload";
+
+  private transient volatile @Nullable ObjectMapper objectMapper;
+  private Schema messageSchema;
+
+  /**
+   * Schema of the Pubsub message.
+   *
+   * <p>Required to have exactly 3 top level fields at the moment:
+   * <ul>
+   *   <li>'event_timestamp' of type {@link RowSqlTypes#TIMESTAMP}</li>
+   *   <li>'attributes' of type {@link TypeName#MAP 
MAP&lt;VARCHAR,VARCHAR&gt;}</li>
+   *   <li>'payload' of type {@link TypeName#ROW ROW&lt;...&gt;}</li>
+   * </ul>
+   *
+   * <p>Only UTF-8 JSON objects are supported.
+   */
+  public Schema getMessageSchema() {
+    return messageSchema;
+  }
+
+  public static ParDo.SingleOutput<PubsubMessage, Row> forSchema(Schema 
messageSchema) {
+    return ParDo.of(new PubsubMessageToRow(messageSchema));
+  }
+
+  private PubsubMessageToRow(Schema messageSchema) {
+    this.messageSchema = messageSchema;
+  }
+
+  @DoFn.ProcessElement
+  public void processElement(ProcessContext context) {
+    // get values for fields
+    // in the same order they're specified in schema
+    List<Object> values = getMessageSchema()
+        .getFields()
+        .stream()
+        .map(field ->
+                 getValueForField(
+                     field,
+                     context.timestamp(),
+                     context.element()))
+        .collect(toList());
+
+    context.output(
+        Row.withSchema(getMessageSchema()).addValues(values).build());
+  }
+
+  private Object getValueForField(
+      Schema.Field field,
+      Instant timestamp,
+      PubsubMessage pubsubMessage) {
+
+    switch (field.getName()) {
+      case TIMESTAMP_FIELD:
+        return timestamp;
+      case ATTRIBUTES_FIELD:
+        return pubsubMessage.getAttributeMap();
+      case PAYLOAD_FIELD:
+        return parsePayloadJsonRow(pubsubMessage);
+      default:
+        throw new IllegalArgumentException(
+            "Unexpected field '" + field.getName() + "' in top level schema"
+            + " for Pubsub message. Top level schema should only contain "
+            + "'timestamp', 'attributes', and 'payload' fields");
+    }
+  }
+
+  private Row parsePayloadJsonRow(PubsubMessage pubsubMessage) {
+    String payloadJson = new String(pubsubMessage.getPayload(), 
StandardCharsets.UTF_8);
+
+    if (objectMapper == null) {
+      objectMapper =
+          
newObjectMapperWith(RowJsonDeserializer.forSchema(getPayloadSchema()));
+    }
+
+    return JsonToRowUtils.jsonToRow(objectMapper, payloadJson);
+  }
+
+  private Schema getPayloadSchema() {
+    return getMessageSchema().getField(PAYLOAD_FIELD).getType().getRowSchema();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/package-info.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/package-info.java
new file mode 100644
index 00000000000..2b8ac1e3191
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Table schema for {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubIO}.
+ */
+@DefaultAnnotation(NonNull.class)
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliPubsubTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliPubsubTest.java
new file mode 100644
index 00000000000..c44ea63bc84
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliPubsubTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
+import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BeamSqlCli} Pubsub functionality.
+ */
+public class BeamSqlCliPubsubTest {
+
+  @Test
+  @Ignore("Something like this needs an emulator. TODO: BEAM-4195")
+  public void testPubsubTable() throws Exception {
+    String pubsubTopic = "projects/<probject>/topics/<topic>";
+    InMemoryMetaStore metaStore = new InMemoryMetaStore();
+    metaStore.registerProvider(new PubsubJsonTableProvider());
+
+    BeamSqlCli cli = new BeamSqlCli().metaStore(metaStore);
+
+    cli.execute(
+        "CREATE TABLE topic (\n"
+        + "event_timestamp TIMESTAMP, \n"
+        + "attributes MAP<VARCHAR, VARCHAR>, \n"
+        + "payload ROW< \n"
+        + "             `id` INTEGER, \n"
+        + "             `name` VARCHAR \n"
+        + "           > \n"
+        + ") \n"
+        + "TYPE 'pubsub' \n"
+        + "LOCATION '" + pubsubTopic + "' \n"
+        + "TBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'");
+
+    cli.execute("SELECT topic.payload.name from topic");
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProviderTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProviderTest.java
new file mode 100644
index 00000000000..e4ec5747feb
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProviderTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
+import static org.junit.Assert.assertEquals;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.schemas.Schema;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/**
+ * Unit tests for {@link PubsubJsonTableProvider}.
+ */
+public class PubsubJsonTableProviderTest {
+
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testTableTypePubsub() {
+    PubsubJsonTableProvider provider = new PubsubJsonTableProvider();
+    assertEquals("pubsub", provider.getTableType());
+  }
+
+  @Test
+  public void testCreatesTable() {
+    PubsubJsonTableProvider provider = new PubsubJsonTableProvider();
+    Schema messageSchema = RowSqlTypes
+        .builder()
+        .withTimestampField("event_timestamp")
+        .withMapField("attributes", VARCHAR, VARCHAR)
+        .withRowField("payload", Schema.builder().build())
+        .build();
+
+    Table tableDefinition = tableDefinition().schema(messageSchema).build();
+
+    BeamSqlTable pubsubTable = provider.buildBeamSqlTable(tableDefinition);
+
+    assertNotNull(pubsubTable);
+    assertEquals(messageSchema, pubsubTable.getSchema());
+  }
+
+  @Test
+  public void testThrowsIfTimestampFieldNotProvided() {
+    PubsubJsonTableProvider provider = new PubsubJsonTableProvider();
+    Schema messageSchema = RowSqlTypes
+        .builder()
+        .withMapField("attributes", VARCHAR, VARCHAR)
+        .withRowField("payload", Schema.builder().build())
+        .build();
+
+    Table tableDefinition = tableDefinition().schema(messageSchema).build();
+
+    thrown.expectMessage("Unsupported");
+    thrown.expectMessage("'event_timestamp'");
+    provider.buildBeamSqlTable(tableDefinition);
+  }
+
+  @Test
+  public void testThrowsIfAttributesFieldNotProvided() {
+    PubsubJsonTableProvider provider = new PubsubJsonTableProvider();
+    Schema messageSchema = RowSqlTypes
+        .builder()
+        .withTimestampField("event_timestamp")
+        .withRowField("payload", Schema.builder().build())
+        .build();
+
+    Table tableDefinition = tableDefinition().schema(messageSchema).build();
+
+    thrown.expectMessage("Unsupported");
+    thrown.expectMessage("'attributes'");
+    provider.buildBeamSqlTable(tableDefinition);
+  }
+
+  @Test
+  public void testThrowsIfPayloadFieldNotProvided() {
+    PubsubJsonTableProvider provider = new PubsubJsonTableProvider();
+    Schema messageSchema = RowSqlTypes
+        .builder()
+        .withTimestampField("event_timestamp")
+        .withMapField("attributes", VARCHAR, VARCHAR)
+        .build();
+
+    Table tableDefinition = tableDefinition().schema(messageSchema).build();
+
+    thrown.expectMessage("Unsupported");
+    thrown.expectMessage("'payload'");
+    provider.buildBeamSqlTable(tableDefinition);
+  }
+
+  @Test
+  public void testThrowsIfExtraFieldsExist() {
+    PubsubJsonTableProvider provider = new PubsubJsonTableProvider();
+    Schema messageSchema = RowSqlTypes
+        .builder()
+        .withTimestampField("event_timestamp")
+        .withMapField("attributes", VARCHAR, VARCHAR)
+        .withVarcharField("someField")
+        .withRowField("payload", Schema.builder().build())
+        .build();
+
+    Table tableDefinition = tableDefinition().schema(messageSchema).build();
+
+    thrown.expectMessage("Unsupported");
+    thrown.expectMessage("'event_timestamp'");
+    provider.buildBeamSqlTable(tableDefinition);
+  }
+
+  private static Table.Builder tableDefinition() {
+    return
+        Table
+            .builder()
+            .name("FakeTable")
+            .comment("fake table")
+            .location("projects/project/topics/topic")
+            .schema(Schema.builder().build())
+            .type("pubsub")
+            .properties(JSON.parseObject("{ \"timestampAttributeKey\" : 
\"ts_field\" }"));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java
new file mode 100644
index 00000000000..63932def64d
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link PubsubMessageToRow}.
+ */
+public class PubsubMessageToRowTest implements Serializable {
+
+  @Rule
+  public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void testConvertsMessages() {
+    Schema payloadSchema =
+        RowSqlTypes
+            .builder()
+            .withIntegerField("id")
+            .withVarcharField("name")
+            .build();
+
+    Schema messageSchema =
+        RowSqlTypes
+            .builder()
+            .withTimestampField("event_timestamp")
+            .withMapField("attributes", VARCHAR, VARCHAR)
+            .withRowField("payload", payloadSchema)
+            .build();
+
+    PCollection<Row> rows = pipeline
+        .apply("create",
+               Create.timestamped(
+                   message(1, map("attr", "val"), "{ \"id\" : 3, \"name\" : \"foo\" }"),
+                   message(2, map("bttr", "vbl"), "{ \"name\" : \"baz\", \"id\" : 5 }"),
+                   message(3, map("cttr", "vcl"), "{ \"id\" : 7, \"name\" : \"bar\" }"),
+                   message(4, map("dttr", "vdl"), "{ \"name\" : \"qaz\", \"id\" : 8 }")))
+        .apply("convert", PubsubMessageToRow.forSchema(messageSchema));
+
+    PAssert
+        .that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(messageSchema)
+               .addValues(ts(1), map("attr", "val"), row(payloadSchema, 3, "foo"))
+               .build(),
+            Row.withSchema(messageSchema)
+               .addValues(ts(2), map("bttr", "vbl"), row(payloadSchema, 5, "baz"))
+               .build(),
+            Row.withSchema(messageSchema)
+               .addValues(ts(3), map("cttr", "vcl"), row(payloadSchema, 7, "bar"))
+               .build(),
+            Row.withSchema(messageSchema)
+               .addValues(ts(4), map("dttr", "vdl"), row(payloadSchema, 8, "qaz"))
+               .build()
+        );
+
+    pipeline.run();
+  }
+
+  private Row row(Schema schema, int id, String name) {
+    return Row.withSchema(schema).addValues(id, name).build();
+  }
+
+  private Map<String, String> map(String attr, String val) {
+    return ImmutableMap.of(attr, val);
+  }
+
+  private TimestampedValue<PubsubMessage> message(
+      int timestamp,
+      Map<String, String> attributes,
+      String payload) {
+
+    return TimestampedValue.of(
+        new PubsubMessage(payload.getBytes(StandardCharsets.UTF_8), attributes),
+        ts(timestamp));
+  }
+
+  private Instant ts(long timestamp) {
+    return new DateTime(timestamp).toInstant();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 100972)
    Time Spent: 6h 10m  (was: 6h)

> Wire up PubsubIO+JSON to Beam SQL
> ---------------------------------
>
>                 Key: BEAM-4162
>                 URL: https://issues.apache.org/jira/browse/BEAM-4162
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>          Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> Read JSON messages from Pubsub, convert them to Rows (BEAM-4160), wire up to 
> Beam SQL.
>  
> Use publication time as event timestamp



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to