TheNeuralBit commented on a change in pull request #12090:
URL: https://github.com/apache/beam/pull/12090#discussion_r446413871



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidConfigurationException.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+/** Exception thrown when the request for a table is invalid, such as invalid metadata. */

Review comment:
       ```suggestion
   /** Exception thrown when the configuration for a {@link SchemaIO} is invalid. */
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/SchemaCapableIOProvider.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+public interface SchemaCapableIOProvider {
+  /** Returns an id that uniquely represents this IO. */
+  String identifier();
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   */
+  Schema configurationSchema();
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */

Review comment:
       Can you add a note that this can throw an `InvalidConfigurationException`?
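   
   In case it helps, a sketch of how that note could read (the method and parameter names here are only inferred from the `ioProvider.from(location, configurationRow, dataSchema)` call in the SQL provider below, so adjust to the real signature):
   
   ```java
   /**
    * Produce a SchemaIO given a String representing the data's location, the schema of the data
    * that resides there, and some IO-specific configuration object.
    *
    * @throws InvalidConfigurationException if the configuration Row does not satisfy this IO's
    *     requirements, e.g. it does not match {@link #configurationSchema()}
    */
   SchemaIO from(String location, Row configuration, Schema dataSchema);
   ```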

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -54,125 +47,29 @@ public String getTableType() {
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
-    JSONObject tableProperties = tableDefintion.getProperties();
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
     String timestampAttributeKey = tableProperties.getString("timestampAttributeKey");
     String deadLetterQueue = tableProperties.getString("deadLetterQueue");
-    validateDlq(deadLetterQueue);
-
-    Schema schema = tableDefintion.getSchema();
-    validateEventTimestamp(schema);
-
-    PubsubIOTableConfiguration config =
-        PubsubIOTableConfiguration.builder()
-            .setSchema(schema)
-            .setTimestampAttribute(timestampAttributeKey)
-            .setDeadLetterQueue(deadLetterQueue)
-            .setTopic(tableDefintion.getLocation())
-            .setUseFlatSchema(!definesAttributeAndPayload(schema))
-            .build();
-
-    return PubsubIOJsonTable.withConfiguration(config);
-  }
-
-  private void validateEventTimestamp(Schema schema) {
-    if (!fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)) {
-      throw new InvalidTableException(
-          "Unsupported schema specified for Pubsub source in CREATE TABLE."
-              + "CREATE TABLE for Pubsub topic must include at least 'event_timestamp' field of "
-              + "type 'TIMESTAMP'");
-    }
-  }
-
-  private boolean definesAttributeAndPayload(Schema schema) {
-    return fieldPresent(
-            schema, ATTRIBUTES_FIELD, Schema.FieldType.map(VARCHAR.withNullable(false), VARCHAR))
-        && (schema.hasField(PAYLOAD_FIELD)
-            && ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()));
-  }
-
-  private boolean fieldPresent(Schema schema, String field, Schema.FieldType expectedType) {
-    return schema.hasField(field)
-        && expectedType.equivalent(
-            schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
-  }
-
-  private void validateDlq(String deadLetterQueue) {
-    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
-      throw new InvalidTableException("Dead letter queue topic name is not specified");
-    }
-  }
-
-  @AutoValue
-  public abstract static class PubsubIOTableConfiguration implements Serializable {
-    public boolean useDlq() {
-      return getDeadLetterQueue() != null;
-    }
-
-    public boolean useTimestampAttribute() {
-      return getTimestampAttribute() != null;
-    }
-
-    /** Determines whether or not the messages should be represented with a flattened schema. */
-    abstract boolean getUseFlatSchema();
-
-    /**
-     * Optional attribute key of the Pubsub message from which to extract the event timestamp.
-     *
-     * <p>This attribute has to conform to the same requirements as in {@link
-     * PubsubIO.Read.Builder#withTimestampAttribute}.
-     *
-     * <p>Short version: it has to be either millis since epoch or string in RFC 3339 format.
-     *
-     * <p>If the attribute is specified then event timestamps will be extracted from the specified
-     * attribute. If it is not specified then message publish timestamp will be used.
-     */
-    @Nullable
-    abstract String getTimestampAttribute();
-
-    /**
-     * Optional topic path which will be used as a dead letter queue.
-     *
-     * <p>Messages that cannot be processed will be sent to this topic. If it is not specified then
-     * exception will be thrown for errors during processing causing the pipeline to crash.
-     */
-    @Nullable
-    abstract String getDeadLetterQueue();
 
-    /**
-     * Pubsub topic name.
-     *
-     * <p>Topic is the only way to specify the Pubsub source. Explicitly specifying the subscription
-     * is not supported at the moment. Subscriptions are automatically created (but not deleted).
-     */
-    abstract String getTopic();
+    Schema schema = tableDefinition.getSchema();
+    String location = tableDefinition.getLocation();
+    Schema dataSchema = tableDefinition.getSchema();
 
-    /**
-     * Table schema, describes Pubsub message schema.
-     *
-     * <p>If {@link #getUseFlatSchema()} is not set, schema must contain exactly fields
-     * 'event_timestamp', 'attributes, and 'payload'. Else, it must contain just 'event_timestamp'.
-     * See {@linkA PubsubMessageToRow} for details.
-     */
-    public abstract Schema getSchema();
+    PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
+    Schema configurationSchema = ioProvider.configurationSchema();
 
-    static Builder builder() {
-      return new AutoValue_PubsubJsonTableProvider_PubsubIOTableConfiguration.Builder();
-    }
-
-    @AutoValue.Builder
-    abstract static class Builder {
-      abstract Builder setUseFlatSchema(boolean useFlatSchema);
-
-      abstract Builder setSchema(Schema schema);
-
-      abstract Builder setTimestampAttribute(String timestampAttribute);
-
-      abstract Builder setDeadLetterQueue(String deadLetterQueue);
-
-      abstract Builder setTopic(String topic);
+    Row configurationRow =
+        Row.withSchema(configurationSchema)
+            .withFieldValue("timestampAttributeKey", timestampAttributeKey)
+            .withFieldValue("deadLetterQueue", deadLetterQueue)
+            .build();
 
-      abstract PubsubIOTableConfiguration build();
+    try {
+      SchemaIO pubsubSchemaIO = ioProvider.from(location, configurationRow, dataSchema);
+      return PubsubIOJsonTable.withConfiguration(pubsubSchemaIO, schema);
+    } catch (Exception InvalidConfigurationException) {
+      throw new InvalidTableException("Invalid configuration of table");

Review comment:
       This should just catch `InvalidConfigurationException e`, not any `Exception`.
   
   Also you should include the information from the original exception when re-throwing. Either use `e.getMessage()` to insert the original message into the new one, or add the exception as a cause (`new InvalidTableException(msg, e)`), or both.
   
   I think that should fix the unit test that's failing.
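   
   Roughly something like this (just a sketch; use whatever message wording you prefer):
   
   ```java
   try {
     SchemaIO pubsubSchemaIO = ioProvider.from(location, configurationRow, dataSchema);
     return PubsubIOJsonTable.withConfiguration(pubsubSchemaIO, schema);
   } catch (InvalidConfigurationException e) {
     // Keep the original message and attach the cause so the details aren't lost.
     throw new InvalidTableException(e.getMessage(), e);
   }
   ```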

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -54,125 +47,29 @@ public String getTableType() {
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefintion) {
-    JSONObject tableProperties = tableDefintion.getProperties();
+  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
+    JSONObject tableProperties = tableDefinition.getProperties();
     String timestampAttributeKey = tableProperties.getString("timestampAttributeKey");
     String deadLetterQueue = tableProperties.getString("deadLetterQueue");
-    validateDlq(deadLetterQueue);
-
-    Schema schema = tableDefintion.getSchema();
-    validateEventTimestamp(schema);
-
-    PubsubIOTableConfiguration config =
-        PubsubIOTableConfiguration.builder()
-            .setSchema(schema)
-            .setTimestampAttribute(timestampAttributeKey)
-            .setDeadLetterQueue(deadLetterQueue)
-            .setTopic(tableDefintion.getLocation())
-            .setUseFlatSchema(!definesAttributeAndPayload(schema))
-            .build();
-
-    return PubsubIOJsonTable.withConfiguration(config);
-  }
-
-  private void validateEventTimestamp(Schema schema) {
-    if (!fieldPresent(schema, TIMESTAMP_FIELD, TIMESTAMP)) {
-      throw new InvalidTableException(
-          "Unsupported schema specified for Pubsub source in CREATE TABLE."
-              + "CREATE TABLE for Pubsub topic must include at least 'event_timestamp' field of "
-              + "type 'TIMESTAMP'");
-    }
-  }
-
-  private boolean definesAttributeAndPayload(Schema schema) {
-    return fieldPresent(
-            schema, ATTRIBUTES_FIELD, Schema.FieldType.map(VARCHAR.withNullable(false), VARCHAR))
-        && (schema.hasField(PAYLOAD_FIELD)
-            && ROW.equals(schema.getField(PAYLOAD_FIELD).getType().getTypeName()));
-  }
-
-  private boolean fieldPresent(Schema schema, String field, Schema.FieldType expectedType) {
-    return schema.hasField(field)
-        && expectedType.equivalent(
-            schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
-  }
-
-  private void validateDlq(String deadLetterQueue) {
-    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
-      throw new InvalidTableException("Dead letter queue topic name is not specified");
-    }
-  }
-
-  @AutoValue
-  public abstract static class PubsubIOTableConfiguration implements Serializable {
-    public boolean useDlq() {
-      return getDeadLetterQueue() != null;
-    }
-
-    public boolean useTimestampAttribute() {
-      return getTimestampAttribute() != null;
-    }
-
-    /** Determines whether or not the messages should be represented with a flattened schema. */
-    abstract boolean getUseFlatSchema();
-
-    /**
-     * Optional attribute key of the Pubsub message from which to extract the event timestamp.
-     *
-     * <p>This attribute has to conform to the same requirements as in {@link
-     * PubsubIO.Read.Builder#withTimestampAttribute}.
-     *
-     * <p>Short version: it has to be either millis since epoch or string in RFC 3339 format.
-     *
-     * <p>If the attribute is specified then event timestamps will be extracted from the specified
-     * attribute. If it is not specified then message publish timestamp will be used.
-     */
-    @Nullable
-    abstract String getTimestampAttribute();
-
-    /**
-     * Optional topic path which will be used as a dead letter queue.
-     *
-     * <p>Messages that cannot be processed will be sent to this topic. If it is not specified then
-     * exception will be thrown for errors during processing causing the pipeline to crash.
-     */
-    @Nullable
-    abstract String getDeadLetterQueue();
 
-    /**
-     * Pubsub topic name.
-     *
-     * <p>Topic is the only way to specify the Pubsub source. Explicitly specifying the subscription
-     * is not supported at the moment. Subscriptions are automatically created (but not deleted).
-     */
-    abstract String getTopic();
+    Schema schema = tableDefinition.getSchema();
+    String location = tableDefinition.getLocation();
+    Schema dataSchema = tableDefinition.getSchema();
 
-    /**
-     * Table schema, describes Pubsub message schema.
-     *
-     * <p>If {@link #getUseFlatSchema()} is not set, schema must contain exactly fields
-     * 'event_timestamp', 'attributes, and 'payload'. Else, it must contain just 'event_timestamp'.
-     * See {@linkA PubsubMessageToRow} for details.
-     */
-    public abstract Schema getSchema();

Review comment:
       I don't think we want to lose the information in these doc comments. Could you move it into the javadoc for `PubsubSchemaCapableIOProvider`?
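   
   For example, the timestamp-attribute, dead-letter-queue, and topic notes could live in the provider's class-level javadoc, next to the configuration fields it defines. A rough sketch (field names taken from the configuration Row built above; wording carried over from the old comments):
   
   ```java
   /**
    * Pubsub {@link SchemaIO} provider.
    *
    * <p>The configuration row accepts:
    *
    * <ul>
    *   <li>{@code timestampAttributeKey}: optional attribute key of the Pubsub message from which
    *       to extract the event timestamp, either millis since epoch or a string in RFC 3339
    *       format; if unset, the message publish timestamp is used.
    *   <li>{@code deadLetterQueue}: optional topic used as a dead letter queue; if unset, errors
    *       during processing are thrown and crash the pipeline.
    * </ul>
    *
    * <p>The location is the Pubsub topic; explicitly specifying a subscription is not supported,
    * and subscriptions are created automatically (but not deleted).
    */
   ```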
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

