sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457684377



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -17,63 +17,32 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
 
-import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
-
-import com.alibaba.fastjson.JSONObject;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.auto.service.AutoService;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
+import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaCapableIOTableProviderWrapper;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaCapableIOProvider;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
-import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 
 /**
- * {@link TableProvider} for {@link PubsubIOJsonTable} which wraps {@link PubsubIO} for consumption
- * by Beam SQL.
+ * {@link TableProvider} for {@link PubsubIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link PubsubSchemaCapableIOProvider} to the generalized table provider wrapper,
+ * {@link SchemaCapableIOTableProviderWrapper}, for Pubsub specific behavior.
  */
 @Internal
 @Experimental
 @AutoService(TableProvider.class)
-public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
-
+public class PubsubJsonTableProvider extends SchemaCapableIOTableProviderWrapper {
   @Override
-  public String getTableType() {
-    return "pubsub";
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new PubsubSchemaCapableIOProvider();
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
-    JSONObject tableProperties = tableDefinition.getProperties();
-    PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
-
-    try {
-      RowJsonDeserializer deserializer =
-          RowJsonDeserializer.forSchema(ioProvider.configurationSchema())
-              .withNullBehavior(RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
-
-      Row configurationRow =
-          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
-
-      SchemaIO pubsubSchemaIO =
-          ioProvider.from(
-              tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
-
-      return PubsubIOJsonTable.fromSchemaIO(pubsubSchemaIO);
-    } catch (InvalidConfigurationException | InvalidSchemaException e) {
-      throw new InvalidTableException(e.getMessage());
-    } catch (JsonProcessingException e) {
-      throw new AssertionError(
-          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
-    }
+  public String getTableType() {

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to