TheNeuralBit commented on a change in pull request #12090: URL: https://github.com/apache/beam/pull/12090#discussion_r450535581
########## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaCapableIOProvider.java ########## @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD; +import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.TIMESTAMP_FIELD; +import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.schemas.io.InvalidConfigurationException; +import 
org.apache.beam.sdk.schemas.io.InvalidSchemaException; +import org.apache.beam.sdk.schemas.io.SchemaCapableIOProvider; +import org.apache.beam.sdk.schemas.io.SchemaIO; +import org.apache.beam.sdk.schemas.transforms.DropFields; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ToJson; +import org.apache.beam.sdk.transforms.WithTimestamps; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; + +/** + * {@link SchemaCapableIOProvider} to create {@link PubsubSchemaIO} that implements {@link + * SchemaIO}. + * + * <p>If useFlatSchema of {@link PubsubSchemaIO} is not set, schema must contain exactly fields + * 'event_timestamp', 'attributes', and 'payload'. Else, it must contain just 'event_timestamp'. See + * {@link PubsubMessageToRow} for details. + * + * <p>{@link #configurationSchema()} consists of two attributes, timestampAttributeKey and + * deadLetterQueue. + * + * <p>timestampAttributeKey is an optional attribute key of the Pubsub message from which to extract + * the event timestamp. + * + * <p>This attribute has to conform to the same requirements as in {@link + * PubsubIO.Read.Builder#withTimestampAttribute}. + * + * <p>Short version: it has to be either millis since epoch or string in RFC 3339 format. + * + * <p>If the attribute is specified then event timestamps will be extracted from the specified + * attribute. If it is not specified then message publish timestamp will be used. + * + * <p>deadLetterQueue is an optional topic path which will be used as a dead letter queue. + * + * <p>Messages that cannot be processed will be sent to this topic. 
If it is not specified then + * exception will be thrown for errors during processing causing the pipeline to crash. Review comment: Looks great, thank you! ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/SchemaIO.java ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.io; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; + +/** + * An abstraction to create schema capable and aware IOs. The interface is intended to be used in + * conjunction with the interface {@link SchemaCapableIOProvider}. + * + * <p>The interfaces can be implemented to enable IOs for SDKs in addition to Beam SQL. Review comment: Beam Java is itself an SDK, so we should be clear that this will enable IOs in _other_ SDKs. ```suggestion * <p>The interfaces can be implemented to make IOs available in other SDKs in addition to Beam SQL. 
``` ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java ########## @@ -122,14 +115,14 @@ @Experimental class PubsubIOJsonTable extends BaseBeamTable implements Serializable { Review comment: Got it, SGTM. It'll probably be easier to verify when there's more than one implementation anyway :) ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/SchemaCapableIOProvider.java ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.io; + +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; + +/** Provider to create {@link SchemaIO}. */ +public interface SchemaCapableIOProvider { Review comment: Sorry I should've brought this up before. We should add some disclaimers here and in SchemaIO (and the exceptions should at least get the annotations): ```suggestion /** * Provider to create {@link SchemaIO} instances for use in Beam SQL and other SDKs. * * <p><b>Internal only:</b> This interface is actively being worked on and it will likely change as we provide * implementations for more standard Beam IOs. 
We provide no backwards compatibility guarantees and * it should not be implemented outside of the Beam repository. */ @Internal @Experimental(Kind.SCHEMAS) public interface SchemaCapableIOProvider { ``` I think it's very likely we'll make some changes to these interfaces (e.g. @tysonjh and @kennknowles have a good [suggestion](https://docs.google.com/document/d/1ic3P8EVGHIydHQ-VMDKbN9kEdwm7sBXMo80VrhwksvI/edit?disco=AAAAGn4u7-4) in the doc), so we don't want anyone other than us trying to implement them yet. Let's do this to make that clear. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org