sclukas77 commented on a change in pull request #12202: URL: https://github.com/apache/beam/pull/12202#discussion_r457684377
##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########

@@ -17,63 +17,32 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
-import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
-
-import com.alibaba.fastjson.JSONObject;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.auto.service.AutoService;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
+import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaCapableIOTableProviderWrapper;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaCapableIOProvider;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
-import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 /**
- * {@link TableProvider} for {@link PubsubIOJsonTable} which wraps {@link PubsubIO} for consumption
- * by Beam SQL.
+ * {@link TableProvider} for {@link PubsubIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link PubsubSchemaCapableIOProvider} to the generalized table provider wrapper,
+ * {@link SchemaCapableIOTableProviderWrapper}, for Pubsub specific behavior.
  */
 @Internal
 @Experimental
 @AutoService(TableProvider.class)
-public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
-
+public class PubsubJsonTableProvider extends SchemaCapableIOTableProviderWrapper {
   @Override
-  public String getTableType() {
-    return "pubsub";
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new PubsubSchemaCapableIOProvider();
   }
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
-    JSONObject tableProperties = tableDefinition.getProperties();
-    PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
-
-    try {
-      RowJsonDeserializer deserializer =
-          RowJsonDeserializer.forSchema(ioProvider.configurationSchema())
-              .withNullBehavior(RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
-
-      Row configurationRow =
-          newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
-
-      SchemaIO pubsubSchemaIO =
-          ioProvider.from(
-              tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
-
-      return PubsubIOJsonTable.fromSchemaIO(pubsubSchemaIO);
-    } catch (InvalidConfigurationException | InvalidSchemaException e) {
-      throw new InvalidTableException(e.getMessage());
-    } catch (JsonProcessingException e) {
-      throw new AssertionError(
-          "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
-    }
+  public String getTableType() {

Review comment:
Done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org