[
https://issues.apache.org/jira/browse/BEAM-10336?focusedWorklogId=455158&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-455158
]
ASF GitHub Bot logged work on BEAM-10336:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Jul/20 00:13
Start Date: 07/Jul/20 00:13
Worklog Time Spent: 10m
Work Description: TheNeuralBit commented on a change in pull request #12090:
URL: https://github.com/apache/beam/pull/12090#discussion_r450535581
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.TIMESTAMP_FIELD;
+import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidSchemaException;
+import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.transforms.DropFields;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ToJson;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * {@link SchemaCapableIOProvider} to create {@link PubsubSchemaIO} that implements {@link
+ * SchemaIO}.
+ *
+ * <p>If useFlatSchema of {@link PubsubSchemaIO} is not set, schema must contain exactly fields
+ * 'event_timestamp', 'attributes', and 'payload'. Else, it must contain just 'event_timestamp'.
+ * See {@link PubsubMessageToRow} for details.
+ *
+ * <p>{@link #configurationSchema()} consists of two attributes, timestampAttributeKey and
+ * deadLetterQueue.
+ *
+ * <p>timestampAttributeKey is an optional attribute key of the Pubsub message from which to extract
+ * the event timestamp.
+ *
+ * <p>This attribute has to conform to the same requirements as in {@link
+ * PubsubIO.Read.Builder#withTimestampAttribute}.
+ *
+ * <p>Short version: it has to be either millis since epoch or a string in RFC 3339 format.
+ *
+ * <p>If the attribute is specified then event timestamps will be extracted from the specified
+ * attribute. If it is not specified then the message publish timestamp will be used.
+ *
+ * <p>deadLetterQueue is an optional topic path which will be used as a dead letter queue.
+ *
+ * <p>Messages that cannot be processed will be sent to this topic. If it is not specified then an
+ * exception will be thrown for errors during processing, causing the pipeline to crash.
Review comment:
Looks great, thank you!
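The "short version" rule in the Javadoc above (the timestamp attribute is either millis since epoch or an RFC 3339 string) can be illustrated with a minimal sketch. This is not the code under review and not how PubsubIO implements it; the class `TimestampAttributeSketch` and the method `parseTimestampAttribute` are hypothetical names, and only `java.time` from the JDK is used.

```java
import java.time.Instant;
import java.time.OffsetDateTime;

// Hypothetical illustration only; not part of Beam or of this pull request.
final class TimestampAttributeSketch {

  /**
   * Interprets an attribute value as millis since epoch when it is a plain
   * integer, and otherwise as an RFC 3339 date-time such as
   * "2020-07-07T00:13:00Z" or "2020-07-07T02:13:00+02:00".
   */
  static Instant parseTimestampAttribute(String value) {
    try {
      return Instant.ofEpochMilli(Long.parseLong(value));
    } catch (NumberFormatException notMillis) {
      // Assumption: ISO-8601 offset parsing is close enough to RFC 3339
      // for a sketch. Throws DateTimeParseException on anything else.
      return OffsetDateTime.parse(value).toInstant();
    }
  }

  public static void main(String[] args) {
    // Both calls describe the same instant in the two accepted encodings.
    System.out.println(parseTimestampAttribute("1594080780000"));
    System.out.println(parseTimestampAttribute("2020-07-07T00:13:00Z"));
  }
}
```

In this sketch a value that matches neither form fails with an exception, which mirrors the documented behaviour of crashing when no dead letter queue is configured.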
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/SchemaIO.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An abstraction to create schema capable and aware IOs. The interface is intended to be used in
+ * conjunction with the interface {@link SchemaCapableIOProvider}.
+ *
+ * <p>The interfaces can be implemented to enable IOs for SDKs in addition to Beam SQL.
Review comment:
Beam Java is itself an SDK, so we should be clear that this will enable IOs
in _other_ SDKs:
```suggestion
 * <p>The interfaces can be implemented to make IOs available in other SDKs in addition to Beam SQL.
```
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
##########
@@ -122,14 +115,14 @@
@Experimental
class PubsubIOJsonTable extends BaseBeamTable implements Serializable {
Review comment:
Got it, SGTM. It'll probably be easier to verify when there's more than
one implementation anyway :)
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/SchemaCapableIOProvider.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+
+/** Provider to create {@link SchemaIO}. */
+public interface SchemaCapableIOProvider {
Review comment:
Sorry, I should've brought this up before. We should add some disclaimers
here and in SchemaIO (and the exceptions should at least get the annotations):
```suggestion
/**
 * Provider to create {@link SchemaIO} instances for use in Beam SQL and other SDKs.
 *
 * <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as
 * we provide implementations for more standard Beam IOs. We provide no backwards compatibility
 * guarantees and it should not be implemented outside of the Beam repository.
 */
@Internal
@Experimental(Kind.SCHEMAS)
public interface SchemaCapableIOProvider {
```
I think it's very likely we'll make some changes to these interfaces (e.g.
@tysonjh and @kennknowles have a good
[suggestion](https://docs.google.com/document/d/1ic3P8EVGHIydHQ-VMDKbN9kEdwm7sBXMo80VrhwksvI/edit?disco=AAAAGn4u7-4)
in the doc), so we don't want anyone other than us trying to implement them
yet. Let's do this to make that clear.
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 455158)
Time Spent: 2h 40m (was: 2.5h)
> Move PubsubJsonTableProvider logic to core Beam
> -----------------------------------------------
>
> Key: BEAM-10336
> URL: https://issues.apache.org/jira/browse/BEAM-10336
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Scott Lukas
> Assignee: Scott Lukas
> Priority: P2
> Labels: schema-io
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> Create abstraction interfaces for Schema Aware IOs within the java core sdk.
> Implement the interface for pubsub, moving the PubsubJsonTableProvider logic.
> Additional details:
> [https://docs.google.com/document/d/1ic3P8EVGHIydHQ-VMDKbN9kEdwm7sBXMo80VrhwksvI/edit?ts=5ef631ef#]
> [~bhulette]