[
https://issues.apache.org/jira/browse/BEAM-10407?focusedWorklogId=461269&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-461269
]
ASF GitHub Bot logged work on BEAM-10407:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jul/20 20:52
Start Date: 20/Jul/20 20:52
Worklog Time Spent: 10m
Work Description: sclukas77 commented on a change in pull request #12202:
URL: https://github.com/apache/beam/pull/12202#discussion_r457684377
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
##########
@@ -17,63 +17,32 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
-import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
-
-import com.alibaba.fastjson.JSONObject;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.auto.service.AutoService;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
+import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaCapableIOTableProviderWrapper;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubSchemaCapableIOProvider;
-import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
-import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
-import org.apache.beam.sdk.schemas.io.SchemaIO;
-import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
-import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
/**
- * {@link TableProvider} for {@link PubsubIOJsonTable} which wraps {@link PubsubIO} for consumption
- * by Beam SQL.
+ * {@link TableProvider} for {@link PubsubIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link PubsubSchemaCapableIOProvider} to the generalized table provider wrapper,
+ * {@link SchemaCapableIOTableProviderWrapper}, for Pubsub specific behavior.
*/
@Internal
@Experimental
@AutoService(TableProvider.class)
-public class PubsubJsonTableProvider extends InMemoryMetaTableProvider {
-
+public class PubsubJsonTableProvider extends SchemaCapableIOTableProviderWrapper {
@Override
- public String getTableType() {
- return "pubsub";
+ public SchemaIOProvider getSchemaIOProvider() {
+ return new PubsubSchemaCapableIOProvider();
}
@Override
- public BeamSqlTable buildBeamSqlTable(Table tableDefinition) {
- JSONObject tableProperties = tableDefinition.getProperties();
- PubsubSchemaCapableIOProvider ioProvider = new PubsubSchemaCapableIOProvider();
-
- try {
- RowJsonDeserializer deserializer =
- RowJsonDeserializer.forSchema(ioProvider.configurationSchema())
- .withNullBehavior(RowJsonDeserializer.NullBehavior.ACCEPT_MISSING_OR_NULL);
-
- Row configurationRow =
- newObjectMapperWith(deserializer).readValue(tableProperties.toString(), Row.class);
-
- SchemaIO pubsubSchemaIO =
- ioProvider.from(
- tableDefinition.getLocation(), configurationRow, tableDefinition.getSchema());
-
- return PubsubIOJsonTable.fromSchemaIO(pubsubSchemaIO);
- } catch (InvalidConfigurationException | InvalidSchemaException e) {
- throw new InvalidTableException(e.getMessage());
- } catch (JsonProcessingException e) {
- throw new AssertionError(
- "Failed to re-parse TBLPROPERTIES JSON " + tableProperties.toString());
- }
+ public String getTableType() {
Review comment:
Done
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 461269)
Time Spent: 5.5h (was: 5h 20m)
> Move Avro and Parquet provider logic to core beam
> -------------------------------------------------
>
> Key: BEAM-10407
> URL: https://issues.apache.org/jira/browse/BEAM-10407
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Scott Lukas
> Assignee: Scott Lukas
> Priority: P2
> Labels: schema-io
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Implement SchemaIO and SchemaCapableIOProvider for Avro and Parquet.
> Additional details:
> [https://docs.google.com/document/d/1ic3P8EVGHIydHQ-VMDKbN9kEdwm7sBXMo80VrhwksvI/edit#heading=h.x9snb54sjlu9]
> [~bhulette]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)