This is an automated email from the ASF dual-hosted git repository.
johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c490304040a Moving Pubsub Transforms to beam (#27366)
c490304040a is described below
commit c490304040a3cf5782bda412a8395dda0fb63554
Author: Dip Patel <[email protected]>
AuthorDate: Tue Jul 18 08:59:41 2023 -0500
Moving Pubsub Transforms to beam (#27366)
---
.../PubsubReadSchemaTransformConfiguration.java | 160 ++---
.../pubsub/PubsubReadSchemaTransformProvider.java | 329 ++++-----
.../PubsubSchemaTransformMessageToRowFactory.java | 179 -----
.../PubsubWriteSchemaTransformConfiguration.java | 174 +----
.../pubsub/PubsubWriteSchemaTransformProvider.java | 447 +++---------
.../PubsubReadSchemaTransformProviderTest.java | 419 ++++-------
...bsubSchemaTransformMessageToRowFactoryTest.java | 337 ---------
.../PubsubWriteSchemaTransformProviderIT.java | 180 -----
.../PubsubWriteSchemaTransformProviderTest.java | 786 ---------------------
9 files changed, 459 insertions(+), 2552 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
index f663f60f09b..befb22ca6dc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformConfiguration.java
@@ -17,11 +17,13 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;
+import com.google.api.client.util.Clock;
import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
/**
* Configuration for reading from Pub/Sub.
@@ -33,137 +35,57 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class PubsubReadSchemaTransformConfiguration {
+ @SchemaFieldDescription(
+ "The name of the topic to consume data from. If a topic is specified, "
+          + " will create a new subscription for that topic and start consuming from that point. "
+ + "Either a topic or a subscription must be provided. "
+ + "Format: projects/${PROJECT}/topics/${TOPIC}")
+ public abstract @Nullable String getTopic();
+
+ @SchemaFieldDescription(
+ "The name of the subscription to consume data. "
+ + "Either a topic or subscription must be provided. "
+ + "Format: projects/${PROJECT}/subscriptions/${SUBSCRIPTION}")
+ public abstract @Nullable String getSubscription();
+
+ @SchemaFieldDescription(
+ "The encoding format for the data stored in Pubsub. Valid options are: "
+ + PubsubReadSchemaTransformProvider.VALID_FORMATS_STR)
+ public abstract String getFormat(); // AVRO, JSON
+
+ @SchemaFieldDescription(
+ "The schema in which the data is encoded in the Pubsub topic. "
+ + "For AVRO data, this is a schema defined with AVRO schema syntax "
+ + "(https://avro.apache.org/docs/1.10.2/spec.html#schemas). "
+          + "For JSON data, this is a schema defined with JSON-schema syntax (https://json-schema.org/).")
+ public abstract String getSchema();
+
+ // Used for testing only.
+ public abstract @Nullable PubsubTestClientFactory getClientFactory();
+
+ // Used for testing only.
+ public abstract @Nullable Clock getClock();
- /** Instantiates a {@link PubsubReadSchemaTransformConfiguration.Builder}. */
public static Builder builder() {
return new AutoValue_PubsubReadSchemaTransformConfiguration.Builder();
}
- /** The expected schema of the Pub/Sub message. */
- public abstract Schema getDataSchema();
-
- /**
- * The Pub/Sub topic path to write failures.
- *
-   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
- * letter queue topic string.
- */
- @Nullable
- public abstract String getDeadLetterQueue();
-
- /**
- * The expected format of the Pub/Sub message.
- *
-   * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
- * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
- */
- @Nullable
- public abstract String getFormat();
-
-  /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */
- @Nullable
- public abstract String getProtoClass();
-
- /**
- * The subscription from which to read Pub/Sub messages.
- *
-   * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of
- * the subscription string.
- */
- @Nullable
- public abstract String getSubscription();
-
-  /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
- @Nullable
- public abstract String getThriftClass();
-
-  /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
- @Nullable
- public abstract String getThriftProtocolFactoryClass();
-
- /**
-   * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
-   * attributes, specifies the name of the attribute that contains the timestamp.
- */
- @Nullable
- public abstract String getTimestampAttribute();
-
- /**
-   * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message
-   * attributes, specifies the name of the attribute containing the unique identifier.
- */
- @Nullable
- public abstract String getIdAttribute();
-
- /**
- * The topic from which to read Pub/Sub messages.
- *
-   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
- * topic string.
- */
- @Nullable
- public abstract String getTopic();
-
@AutoValue.Builder
public abstract static class Builder {
+ public abstract Builder setTopic(@Nullable String topic);
- /** The expected schema of the Pub/Sub message. */
- public abstract Builder setDataSchema(Schema value);
-
- /**
- * The Pub/Sub topic path to write failures.
- *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
- * dead letter queue topic string.
- */
- public abstract Builder setDeadLetterQueue(String value);
-
- /**
- * The expected format of the Pub/Sub message.
- *
-     * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer}
- * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
- */
- public abstract Builder setFormat(String value);
-
-    /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */
- public abstract Builder setProtoClass(String value);
-
- /**
- * The subscription from which to read Pub/Sub messages.
- *
-     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of
- * the subscription string.
- */
- public abstract Builder setSubscription(String value);
-
-    /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
- public abstract Builder setThriftClass(String value);
+ public abstract Builder setSubscription(@Nullable String subscription);
-    /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
- public abstract Builder setThriftProtocolFactoryClass(String value);
+ public abstract Builder setFormat(String format);
- /**
-     * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
-     * attributes, specifies the name of the attribute that contains the timestamp.
- */
- public abstract Builder setTimestampAttribute(String value);
+ public abstract Builder setSchema(String schema);
- /**
-     * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
-     * message attributes, specifies the name of the attribute containing the unique identifier.
- */
- public abstract Builder setIdAttribute(String value);
+ // Used for testing only.
+    public abstract Builder setClientFactory(@Nullable PubsubTestClientFactory clientFactory);
- /**
- * The topic from which to read Pub/Sub messages.
- *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
- * topic string.
- */
- public abstract Builder setTopic(String value);
+ // Used for testing only.
+ public abstract Builder setClock(@Nullable Clock clock);
- /** Builds a {@link PubsubReadSchemaTransformConfiguration} instance. */
public abstract PubsubReadSchemaTransformConfiguration build();
}
}
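
(For orientation: the rewritten configuration above exposes only topic/subscription, format, and schema, plus two test-only hooks. A minimal usage sketch, not part of this commit; the project, subscription name, and JSON schema below are hypothetical:)

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformConfiguration;

    public class ReadConfigSketch {
      public static PubsubReadSchemaTransformConfiguration jsonConfig() {
        // Either a topic or a subscription must be set, but not both
        // (validated by the provider in the next file of this diff).
        return PubsubReadSchemaTransformConfiguration.builder()
            .setSubscription("projects/my-project/subscriptions/my-subscription") // hypothetical
            .setFormat("JSON")
            .setSchema("{\"type\": \"object\", \"properties\": {\"name\": {\"type\": \"string\"}}}")
            .build();
      }
    }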
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
index cec07dafef4..c0e8880d028 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProvider.java
@@ -17,23 +17,39 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG;
-
import com.google.api.client.util.Clock;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
/**
* An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
@@ -43,196 +59,191 @@ import org.checkerframework.checker.nullness.qual.Nullable;
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
@AutoService(SchemaTransformProvider.class)
public class PubsubReadSchemaTransformProvider
    extends TypedSchemaTransformProvider<PubsubReadSchemaTransformConfiguration> {
- static final String OUTPUT_TAG = "OUTPUT";
- /** Returns the expected class of the configuration. */
- @Override
-  protected Class<PubsubReadSchemaTransformConfiguration> configurationClass() {
- return PubsubReadSchemaTransformConfiguration.class;
- }
+ public static final String VALID_FORMATS_STR = "AVRO,JSON";
+ public static final Set<String> VALID_DATA_FORMATS =
+ Sets.newHashSet(VALID_FORMATS_STR.split(","));
- /** Returns the expected {@link SchemaTransform} of the configuration. */
- @Override
-  protected SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration) {
-    PubsubMessageToRow toRowTransform =
-        PubsubSchemaTransformMessageToRowFactory.from(configuration).buildMessageToRow();
- return new PubsubReadSchemaTransform(configuration, toRowTransform);
- }
+ public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+ public static final Schema ERROR_SCHEMA =
+      Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
-  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
@Override
- public String identifier() {
- return "beam:schematransform:org.apache.beam:pubsub_read:v1";
- }
-
- /**
-   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
- * no input is expected, this returns an empty list.
- */
- @Override
- public List<String> inputCollectionNames() {
- return Collections.emptyList();
+ public Class<PubsubReadSchemaTransformConfiguration> configurationClass() {
+ return PubsubReadSchemaTransformConfiguration.class;
}
- /**
-   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
- * a single output is expected, this returns a list with a single name.
- */
@Override
- public List<String> outputCollectionNames() {
- return Collections.singletonList(OUTPUT_TAG);
- }
-
- /**
-   * An implementation of {@link SchemaTransform} for Pub/Sub reads configured using {@link
- * PubsubReadSchemaTransformConfiguration}.
- */
- static class PubsubReadSchemaTransform extends SchemaTransform {
+  public SchemaTransform from(PubsubReadSchemaTransformConfiguration configuration) {
+    if (configuration.getSubscription() == null && configuration.getTopic() == null) {
+      throw new IllegalArgumentException(
+          "To read from Pubsub, a subscription name or a topic name must be provided");
+ }
- private final PubsubReadSchemaTransformConfiguration configuration;
- private final PubsubMessageToRow pubsubMessageToRow;
+    if (configuration.getSubscription() != null && configuration.getTopic() != null) {
+      throw new IllegalArgumentException(
+          "To read from Pubsub, a subscription name or a topic name must be provided. Not both.");
+ }
- private PubsubClient.PubsubClientFactory clientFactory;
+ if ((Strings.isNullOrEmpty(configuration.getSchema())
+ && !Strings.isNullOrEmpty(configuration.getFormat()))
+ || (!Strings.isNullOrEmpty(configuration.getSchema())
+ && Strings.isNullOrEmpty(configuration.getFormat()))) {
+ throw new IllegalArgumentException(
+          "A schema was provided without a data format (or vice versa). Please provide "
+              + "both of these parameters to read from Pubsub, or if you would like to use the Pubsub schema service,"
+ + " please leave both of these blank.");
+ }
- private Clock clock;
+ Schema beamSchema;
+ SerializableFunction<byte[], Row> valueMapper;
- private PubsubReadSchemaTransform(
- PubsubReadSchemaTransformConfiguration configuration,
- PubsubMessageToRow pubsubMessageToRow) {
- this.configuration = configuration;
- this.pubsubMessageToRow = pubsubMessageToRow;
+ if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Format %s not supported. Only supported formats are %s",
+ configuration.getFormat(), VALID_FORMATS_STR));
}
-
- /**
- * Sets the {@link PubsubClient.PubsubClientFactory}.
- *
- * <p>Used for testing.
- */
- void setClientFactory(PubsubClient.PubsubClientFactory value) {
- this.clientFactory = value;
+ beamSchema =
+ Objects.equals(configuration.getFormat(), "JSON")
+ ? JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema())
+ : AvroUtils.toBeamSchema(
+              new org.apache.avro.Schema.Parser().parse(configuration.getSchema()));
+ valueMapper =
+ Objects.equals(configuration.getFormat(), "JSON")
+ ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+ : AvroUtils.getAvroBytesToRowFunction(beamSchema);
+
+ PubsubReadSchemaTransform transform =
+ new PubsubReadSchemaTransform(
+            configuration.getTopic(), configuration.getSubscription(), beamSchema, valueMapper);
+
+ if (configuration.getClientFactory() != null) {
+ transform.setClientFactory(configuration.getClientFactory());
+ }
+ if (configuration.getClock() != null) {
+ transform.setClock(configuration.getClock());
}
- /**
- * Sets the {@link Clock}.
- *
- * <p>Used for testing.
- */
- void setClock(Clock clock) {
- this.clock = clock;
+ return transform;
+ }
+
+  private static class PubsubReadSchemaTransform extends SchemaTransform implements Serializable {
+ final Schema beamSchema;
+ final SerializableFunction<byte[], Row> valueMapper;
+ final @Nullable String topic;
+ final @Nullable String subscription;
+ @Nullable PubsubTestClientFactory clientFactory;
+ @Nullable Clock clock;
+
+ PubsubReadSchemaTransform(
+ @Nullable String topic,
+ @Nullable String subscription,
+ Schema beamSchema,
+ SerializableFunction<byte[], Row> valueMapper) {
+ this.topic = topic;
+ this.subscription = subscription;
+ this.beamSchema = beamSchema;
+ this.valueMapper = valueMapper;
}
- /** Validates the {@link PubsubReadSchemaTransformConfiguration}. */
- @Override
- public void validate(@Nullable PipelineOptions options) {
-      if (configuration.getSubscription() == null && configuration.getTopic() == null) {
- throw new IllegalArgumentException(
- String.format(
- "%s needs to set either the topic or the subscription",
- PubsubReadSchemaTransformConfiguration.class));
- }
+ private static class ErrorCounterFn extends DoFn<PubsubMessage, Row> {
+ private Counter pubsubErrorCounter;
+ private Long errorsInBundle = 0L;
+ private SerializableFunction<byte[], Row> valueMapper;
-      if (configuration.getSubscription() != null && configuration.getTopic() != null) {
- throw new IllegalArgumentException(
- String.format(
- "%s should not set both the topic or the subscription",
- PubsubReadSchemaTransformConfiguration.class));
+      ErrorCounterFn(String name, SerializableFunction<byte[], Row> valueMapper) {
+        this.pubsubErrorCounter = Metrics.counter(PubsubReadSchemaTransformProvider.class, name);
+ this.valueMapper = valueMapper;
}
- try {
- PayloadSerializers.getSerializer(
-            configuration.getFormat(), configuration.getDataSchema(), new HashMap<>());
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException(
- String.format(
- "Invalid %s, no serializer provider exists for format `%s`",
-                PubsubReadSchemaTransformConfiguration.class, configuration.getFormat()));
+ @ProcessElement
+      public void process(@DoFn.Element PubsubMessage message, MultiOutputReceiver receiver) {
+
+        try {
+          receiver.get(OUTPUT_TAG).output(valueMapper.apply(message.getPayload()));
+ } catch (Exception e) {
+ errorsInBundle += 1;
+ receiver
+ .get(ERROR_TAG)
+ .output(
+ Row.withSchema(ERROR_SCHEMA)
+ .addValues(e.toString(), message.getPayload())
+ .build());
+ }
}
- }
-    /** Reads from Pub/Sub according to {@link PubsubReadSchemaTransformConfiguration}. */
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- if (!input.getAll().isEmpty()) {
- throw new IllegalArgumentException(
- String.format(
- "%s %s input is expected to be empty",
- input.getClass().getSimpleName(), getClass().getSimpleName()));
+ @FinishBundle
+ public void finish(FinishBundleContext c) {
+ pubsubErrorCounter.inc(errorsInBundle);
+ errorsInBundle = 0L;
}
-
- PCollectionTuple rowsWithDlq =
- input
- .getPipeline()
- .apply("ReadFromPubsub", buildPubsubRead())
- .apply("PubsubMessageToRow", pubsubMessageToRow);
-
- writeToDeadLetterQueue(rowsWithDlq);
-
- return PCollectionRowTuple.of(OUTPUT_TAG, rowsWithDlq.get(MAIN_TAG));
}
- private void writeToDeadLetterQueue(PCollectionTuple rowsWithDlq) {
-      PubsubIO.Write<PubsubMessage> deadLetterQueue = buildDeadLetterQueueWrite();
- if (deadLetterQueue == null) {
- return;
- }
-      rowsWithDlq.get(DLQ_TAG).apply("WriteToDeadLetterQueue", deadLetterQueue);
+ void setClientFactory(@Nullable PubsubTestClientFactory factory) {
+ this.clientFactory = factory;
}
- /**
- * Builds {@link PubsubIO.Write} dead letter queue from {@link
- * PubsubReadSchemaTransformConfiguration}.
- */
- PubsubIO.Write<PubsubMessage> buildDeadLetterQueueWrite() {
- if (configuration.getDeadLetterQueue() == null) {
- return null;
- }
-
- PubsubIO.Write<PubsubMessage> writeDlq =
- PubsubIO.writeMessages().to(configuration.getDeadLetterQueue());
-
- if (configuration.getTimestampAttribute() != null) {
-        writeDlq = writeDlq.withTimestampAttribute(configuration.getTimestampAttribute());
- }
-
- return writeDlq;
+ void setClock(@Nullable Clock clock) {
+ this.clock = clock;
}
-    /** Builds {@link PubsubIO.Read} from a {@link PubsubReadSchemaTransformConfiguration}. */
+ @SuppressWarnings("nullness")
PubsubIO.Read<PubsubMessage> buildPubsubRead() {
-      PubsubIO.Read<PubsubMessage> read = PubsubIO.readMessagesWithAttributes();
-
- if (configuration.getSubscription() != null) {
- read = read.fromSubscription(configuration.getSubscription());
+ PubsubIO.Read<PubsubMessage> pubsubRead = PubsubIO.readMessages();
+ if (!Strings.isNullOrEmpty(topic)) {
+ pubsubRead = pubsubRead.fromTopic(topic);
+ } else {
+ pubsubRead = pubsubRead.fromSubscription(subscription);
}
-
- if (configuration.getTopic() != null) {
- read = read.fromTopic(configuration.getTopic());
+ if (clientFactory != null && clock != null) {
+ pubsubRead = pubsubRead.withClientFactory(clientFactory);
+ pubsubRead = clientFactory.setClock(pubsubRead, clock);
+ } else if (clientFactory != null || clock != null) {
+ throw new IllegalArgumentException(
+            "Both PubsubTestClientFactory and Clock need to be specified for testing, but only one is provided");
}
+ return pubsubRead;
+ }
- if (configuration.getTimestampAttribute() != null) {
-        read = read.withTimestampAttribute(configuration.getTimestampAttribute());
- }
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ PubsubIO.Read<PubsubMessage> pubsubRead = buildPubsubRead();
- if (configuration.getIdAttribute() != null) {
- read = read.withIdAttribute(configuration.getIdAttribute());
- }
+ PCollectionTuple outputTuple =
+ input
+ .getPipeline()
+ .apply(pubsubRead)
+ .apply(
+                ParDo.of(new ErrorCounterFn("PubSub-read-error-counter", valueMapper))
+ .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+ return PCollectionRowTuple.of(
+ "output",
+ outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema),
+ "errors",
+ outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+ }
+ }
- if (clientFactory != null) {
- read = read.withClientFactory(clientFactory);
- }
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:pubsub_read:v1";
+ }
- if (clock != null) {
- read = read.withClock(clock);
- }
+ @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+ inputCollectionNames() {
+ return Collections.emptyList();
+ }
- return read;
- }
+ @Override
+  public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+ outputCollectionNames() {
+ return Arrays.asList("output", "errors");
}
}
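
(The provider now registers under the URN beam:schematransform:org.apache.beam:pubsub_read:v1 and emits two row collections, tagged "output" and "errors", per outputCollectionNames() above. A sketch of applying it directly, assuming the configuration sketch earlier in this mail and that SchemaTransform is applied to a PCollectionRowTuple:)

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformProvider;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionRowTuple;
    import org.apache.beam.sdk.values.Row;

    public class ReadTransformSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();
        // The read transform takes no input, so it is applied to an empty row tuple.
        PCollectionRowTuple result =
            PCollectionRowTuple.empty(pipeline)
                .apply(new PubsubReadSchemaTransformProvider().from(ReadConfigSketch.jsonConfig()));
        PCollection<Row> rows = result.get("output");     // successfully decoded messages
        PCollection<Row> failures = result.get("errors"); // rows matching ERROR_SCHEMA
        pipeline.run().waitUntilFinish();
      }
    }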
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
deleted file mode 100644
index 988c593e32f..00000000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactory.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
-import static org.apache.beam.sdk.schemas.Schema.TypeName.ROW;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
-
-/**
- * Builds a {@link PubsubMessageToRow} from a {@link PubsubReadSchemaTransformConfiguration}.
- *
- * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
- * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
- * repository.
- */
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
-class PubsubSchemaTransformMessageToRowFactory {
- private static final String DEFAULT_FORMAT = "json";
-
- private static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
-      Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING);
- private static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
- Schema.builder().addStringField("key").addStringField("value").build();
- private static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
-      Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));
-
- private static final String THRIFT_CLASS_KEY = "thriftClass";
-  private static final String THRIFT_PROTOCOL_FACTORY_CLASS_KEY = "thriftProtocolFactoryClass";
- private static final String PROTO_CLASS_KEY = "protoClass";
-
- /**
-   * Instantiate a {@link PubsubSchemaTransformMessageToRowFactory} from a {@link
- * PubsubReadSchemaTransformConfiguration}.
- */
- static PubsubSchemaTransformMessageToRowFactory from(
- PubsubReadSchemaTransformConfiguration configuration) {
- return new PubsubSchemaTransformMessageToRowFactory(configuration);
- }
-
- /** Build the {@link PubsubMessageToRow}. */
- PubsubMessageToRow buildMessageToRow() {
- PubsubMessageToRow.Builder builder =
- PubsubMessageToRow.builder()
- .messageSchema(configuration.getDataSchema())
- .useDlq(
- configuration.getDeadLetterQueue() != null
- && !configuration.getDeadLetterQueue().isEmpty())
- .useFlatSchema(!shouldUseNestedSchema());
-
- if (needsSerializer()) {
- builder = builder.serializerProvider(serializer());
- }
-
- return builder.build();
- }
-
- private final PubsubReadSchemaTransformConfiguration configuration;
-
- private PubsubSchemaTransformMessageToRowFactory(
- PubsubReadSchemaTransformConfiguration configuration) {
- this.configuration = configuration;
- }
-
- private PayloadSerializer payloadSerializer() {
- Schema schema = configuration.getDataSchema();
- String format = DEFAULT_FORMAT;
-
-    if (configuration.getFormat() != null && !configuration.getFormat().isEmpty()) {
- format = configuration.getFormat();
- }
-
- Map<String, Object> params = new HashMap<>();
-
-    if (configuration.getThriftClass() != null && !configuration.getThriftClass().isEmpty()) {
- params.put(THRIFT_CLASS_KEY, configuration.getThriftClass());
- }
-
- if (configuration.getThriftProtocolFactoryClass() != null
- && !configuration.getThriftProtocolFactoryClass().isEmpty()) {
-      params.put(THRIFT_PROTOCOL_FACTORY_CLASS_KEY, configuration.getThriftProtocolFactoryClass());
- }
-
-    if (configuration.getProtoClass() != null && !configuration.getProtoClass().isEmpty()) {
- params.put(PROTO_CLASS_KEY, configuration.getProtoClass());
- }
-
- return PayloadSerializers.getSerializer(format, schema, params);
- }
-
- PubsubMessageToRow.SerializerProvider serializer() {
- return input -> payloadSerializer();
- }
-
- /**
- * Determines whether the {@link PubsubMessageToRow} needs a {@link
- * PubsubMessageToRow.SerializerProvider}.
- *
-   * <p>The determination is based on {@link #shouldUseNestedSchema()} is false or if the {@link
- * PubsubMessageToRow#PAYLOAD_FIELD} is not present.
- */
- boolean needsSerializer() {
-    return !shouldUseNestedSchema() || !fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES);
- }
-
- /**
- * Determines whether a nested schema should be used for {@link
- * PubsubReadSchemaTransformConfiguration#getDataSchema()}.
- *
-   * <p>The determination is based on {@link #schemaHasValidPayloadField()} and {@link
- * #schemaHasValidAttributesField()}}
- */
- boolean shouldUseNestedSchema() {
- return schemaHasValidPayloadField() && schemaHasValidAttributesField();
- }
-
- /**
-   * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} has a valid
- * {@link PubsubMessageToRow#PAYLOAD_FIELD}.
- */
- boolean schemaHasValidPayloadField() {
- Schema schema = configuration.getDataSchema();
- if (!schema.hasField(PAYLOAD_FIELD)) {
- return false;
- }
- if (fieldPresent(PAYLOAD_FIELD, Schema.FieldType.BYTES)) {
- return true;
- }
- return schema.getField(PAYLOAD_FIELD).getType().getTypeName().equals(ROW);
- }
-
- /**
-   * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} has a valid
-   * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} field.
-   *
-   * <p>The determination is based on whether {@link #fieldPresent(String, Schema.FieldType)} for
-   * {@link PubsubMessageToRow#ATTRIBUTES_FIELD} is true for either {@link
-   * #ATTRIBUTE_MAP_FIELD_TYPE} or {@link #ATTRIBUTE_ARRAY_FIELD_TYPE} {@link Schema.FieldType}s.
- */
- boolean schemaHasValidAttributesField() {
- return fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE)
- || fieldPresent(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE);
- }
-
- /**
-   * Determines whether {@link PubsubReadSchemaTransformConfiguration#getDataSchema()} contains the
- * field and whether that field is an expectedType {@link Schema.FieldType}.
- */
- boolean fieldPresent(String field, Schema.FieldType expectedType) {
- Schema schema = configuration.getDataSchema();
- return schema.hasField(field)
- && expectedType.equivalent(
-            schema.getField(field).getType(), Schema.EquivalenceNullablePolicy.IGNORE);
- }
-}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
index acaf04cdfc6..57620c968c5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformConfiguration.java
@@ -18,9 +18,9 @@
package org.apache.beam.sdk.io.gcp.pubsub;
import com.google.auto.value.AutoValue;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
/**
* Configuration for writing to Pub/Sub.
@@ -32,179 +32,25 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class PubsubWriteSchemaTransformConfiguration {
+ @SchemaFieldDescription(
+ "The encoding format for the data stored in Pubsub. Valid options are: "
+ + PubsubWriteSchemaTransformProvider.VALID_FORMATS_STR)
+ public abstract String getFormat();
- public static final String DEFAULT_TIMESTAMP_ATTRIBUTE = "event_timestamp";
+ @SchemaFieldDescription(
+      "The name of the topic to write data to. " + "Format: projects/${PROJECT}/topics/${TOPIC}")
+ public abstract String getTopic();
public static Builder builder() {
return new AutoValue_PubsubWriteSchemaTransformConfiguration.Builder();
}
- public static TargetConfiguration.Builder targetConfigurationBuilder() {
-    return new AutoValue_PubsubWriteSchemaTransformConfiguration_TargetConfiguration.Builder()
- .setTimestampAttributeKey(DEFAULT_TIMESTAMP_ATTRIBUTE);
- }
-
- public static SourceConfiguration.Builder sourceConfigurationBuilder() {
-    return new AutoValue_PubsubWriteSchemaTransformConfiguration_SourceConfiguration.Builder();
- }
-
- /**
-   * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link
- * org.apache.beam.sdk.schemas.Schema}.
- */
- @Nullable
- public abstract SourceConfiguration getSource();
-
- /** Configuration details of the target {@link PubsubMessage}. */
- public abstract TargetConfiguration getTarget();
-
- /**
- * The topic to which to write Pub/Sub messages.
- *
-   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
- * topic string.
- */
- public abstract String getTopic();
-
- /**
- * The expected format of the Pub/Sub message.
- *
-   * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
-   * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See list of supported
-   * values by invoking {@link org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}.
-   *
-   * <pre>{@code Providers.loadProviders(PayloadSerializer.class).keySet()}</pre>
- */
- @Nullable
- public abstract String getFormat();
-
- /**
-   * When writing to Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message
-   * attributes, specifies the name of the attribute containing the unique identifier.
- */
- @Nullable
- public abstract String getIdAttribute();
-
- /** Builder for {@link PubsubWriteSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
+ public abstract Builder setFormat(String format);
- /**
-     * Configuration details of the source {@link org.apache.beam.sdk.values.Row} {@link
- * org.apache.beam.sdk.schemas.Schema}.
- */
- public abstract Builder setSource(SourceConfiguration value);
-
- /** Configuration details of the target {@link PubsubMessage}. */
- public abstract Builder setTarget(TargetConfiguration value);
-
- /**
- * The topic to which to write Pub/Sub messages.
- *
-     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
- * topic string.
- */
- public abstract Builder setTopic(String value);
-
- /**
- * The expected format of the Pub/Sub message.
- *
-     * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer}
-     * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}. See list of
-     * supported values by invoking {@link
-     * org.apache.beam.sdk.schemas.io.Providers#loadProviders(Class)}.
-     *
-     * <pre>{@code Providers.loadProviders(PayloadSerializer.class).keySet()}</pre>
- */
- public abstract Builder setFormat(String value);
-
- /**
-     * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
-     * message attributes, specifies the name of the attribute containing the unique identifier.
- */
- public abstract Builder setIdAttribute(String value);
+ public abstract Builder setTopic(String topic);
public abstract PubsubWriteSchemaTransformConfiguration build();
}
-
- @DefaultSchema(AutoValueSchema.class)
- @AutoValue
- public abstract static class SourceConfiguration {
- /**
-     * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType} must be a <code>Map<String, String>
- * </code>
- */
- @Nullable
- public abstract String getAttributesFieldName();
-
- /**
-     * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
- * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link
- * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}.
- */
- @Nullable
- public abstract String getTimestampFieldName();
-
- /**
-     * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link
-     * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link
-     * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than
- * attributes. Not compatible with other payload intended fields.
- */
- @Nullable
- public abstract String getPayloadFieldName();
-
- @AutoValue.Builder
- public abstract static class Builder {
- /**
-       * The attributes field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType} must be a <code>Map<String, String>
- * </code>
- */
- public abstract Builder setAttributesFieldName(String value);
-
- /**
-       * The timestamp field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
- * org.apache.beam.sdk.schemas.Schema.FieldType} must be a {@link
- * org.apache.beam.sdk.schemas.Schema.FieldType#DATETIME}.
- */
- public abstract Builder setTimestampFieldName(String value);
-
- /**
-       * The payload field name of the source {@link org.apache.beam.sdk.values.Row}. {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType} must be either {@link
-       * org.apache.beam.sdk.schemas.Schema.FieldType#BYTES} or a {@link
-       * org.apache.beam.sdk.values.Row}. If null, payload serialized from user fields other than
- * attributes. Not compatible with other payload intended fields.
- */
- public abstract Builder setPayloadFieldName(String value);
-
- public abstract SourceConfiguration build();
- }
- }
-
- @DefaultSchema(AutoValueSchema.class)
- @AutoValue
- public abstract static class TargetConfiguration {
-
- /**
-     * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. {@link
-     * #builder()} method defaults value to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}.
- */
- public abstract String getTimestampAttributeKey();
-
- @AutoValue.Builder
- public abstract static class Builder {
-
- /**
-       * The attribute key to assign the {@link PubsubMessage} stringified timestamp value. Defaults
- * to {@link #DEFAULT_TIMESTAMP_ATTRIBUTE}.
- */
- public abstract Builder setTimestampAttributeKey(String value);
-
- public abstract TargetConfiguration build();
- }
- }
}
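
(The write configuration is reduced to two required fields. A minimal sketch, not part of this commit; the topic below is hypothetical:)

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration;

    public class WriteConfigSketch {
      public static PubsubWriteSchemaTransformConfiguration avroConfig() {
        // Both fields are required by the trimmed-down configuration above.
        return PubsubWriteSchemaTransformConfiguration.builder()
            .setTopic("projects/my-project/topics/my-topic") // hypothetical
            .setFormat("AVRO") // or "JSON", per VALID_FORMATS_STR
            .build();
      }
    }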
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
index 7f3f6f2c702..8e8b804801b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProvider.java
@@ -17,56 +17,29 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ERROR;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.OUTPUT;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_BYTES_TYPE_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.PAYLOAD_ROW_TYPE_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.removeFields;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-
-import com.google.api.client.util.Clock;
import com.google.auto.service.AutoService;
-import java.io.IOException;
-import java.util.ArrayList;
+import java.io.Serializable;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.Set;
-import java.util.stream.Stream;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.FieldMatcher;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.SchemaReflection;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration.SourceConfiguration;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
-import org.apache.beam.sdk.schemas.io.Providers;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.joda.time.Instant;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
/**
* An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
@@ -76,360 +49,102 @@ import org.joda.time.Instant;
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Internal
@AutoService(SchemaTransformProvider.class)
public class PubsubWriteSchemaTransformProvider
    extends TypedSchemaTransformProvider<PubsubWriteSchemaTransformConfiguration> {
-  private static final String IDENTIFIER = "beam:schematransform:org.apache.beam:pubsub_write:v1";
- static final String INPUT_TAG = "input";
- static final String ERROR_TAG = "error";
- /** Returns the expected class of the configuration. */
- @Override
-  protected Class<PubsubWriteSchemaTransformConfiguration> configurationClass() {
- return PubsubWriteSchemaTransformConfiguration.class;
- }
+  public static final TupleTag<PubsubMessage> OUTPUT_TAG = new TupleTag<PubsubMessage>() {};
+ public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
- /** Returns the expected {@link SchemaTransform} of the configuration. */
- @Override
-  public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) {
- return new PubsubWriteSchemaTransform(configuration);
- }
-
-  /** Implementation of the {@link SchemaTransformProvider} identifier method. */
- @Override
- public String identifier() {
- return IDENTIFIER;
- }
-
- /**
-   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since a
- * single input is expected, this returns a list with a single name.
- */
- @Override
- public List<String> inputCollectionNames() {
- return Collections.singletonList(INPUT_TAG);
- }
+ public static final String VALID_FORMATS_STR = "AVRO,JSON";
+ public static final Set<String> VALID_DATA_FORMATS =
+ Sets.newHashSet(VALID_FORMATS_STR.split(","));
- /**
-   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. The
- * only expected output is the {@link #ERROR_TAG}.
- */
@Override
- public List<String> outputCollectionNames() {
- return Collections.singletonList(ERROR_TAG);
+ public Class<PubsubWriteSchemaTransformConfiguration> configurationClass() {
+ return PubsubWriteSchemaTransformConfiguration.class;
}
- /**
-   * An implementation of {@link SchemaTransform} for Pub/Sub writes configured using {@link
- * PubsubWriteSchemaTransformConfiguration}.
- */
- static class PubsubWriteSchemaTransform extends SchemaTransform {
-
- private final PubsubWriteSchemaTransformConfiguration configuration;
+ public static class ErrorFn extends DoFn<Row, PubsubMessage> {
+ private SerializableFunction<Row, byte[]> valueMapper;
+ private Schema errorSchema;
- private PubsubClient.PubsubClientFactory pubsubClientFactory;
-
-    PubsubWriteSchemaTransform(PubsubWriteSchemaTransformConfiguration configuration) {
-      this.configuration = configuration;
+    ErrorFn(SerializableFunction<Row, byte[]> valueMapper, Schema errorSchema) {
+ this.valueMapper = valueMapper;
+ this.errorSchema = errorSchema;
}
-    PubsubWriteSchemaTransform withPubsubClientFactory(PubsubClient.PubsubClientFactory factory) {
- this.pubsubClientFactory = factory;
- return this;
- }
-
- @Override
- public PCollectionRowTuple expand(PCollectionRowTuple input) {
- if (input.getAll().size() != 1 || !input.has(INPUT_TAG)) {
- throw new IllegalArgumentException(
- String.format(
-                "%s %s input is expected to contain a single %s tagged PCollection<Row>",
-                input.getClass().getSimpleName(), getClass().getSimpleName(), INPUT_TAG));
- }
-
- PCollection<Row> rows = input.get(INPUT_TAG);
- if (rows.getSchema().getFieldCount() == 0) {
-        throw new IllegalArgumentException(String.format("empty Schema for %s", INPUT_TAG));
- }
-
- Schema targetSchema = buildTargetSchema(rows.getSchema());
-
- rows =
- rows.apply(
- ConvertForRowToMessage.class.getSimpleName(),
- convertForRowToMessage(targetSchema))
- .setRowSchema(targetSchema);
-
- Schema schema = rows.getSchema();
-
- Schema serializableSchema =
-          removeFields(schema, DEFAULT_ATTRIBUTES_KEY_NAME, DEFAULT_EVENT_TIMESTAMP_KEY_NAME);
-      FieldMatcher payloadRowMatcher = FieldMatcher.of(DEFAULT_PAYLOAD_KEY_NAME, TypeName.ROW);
- if (payloadRowMatcher.match(serializableSchema)) {
- serializableSchema =
-            serializableSchema.getField(DEFAULT_PAYLOAD_KEY_NAME).getType().getRowSchema();
- }
-
-      validateTargetSchemaAgainstPubsubSchema(serializableSchema, input.getPipeline().getOptions());
-
- PCollectionTuple pct =
- rows.apply(
- PubsubRowToMessage.class.getSimpleName(),
- buildPubsubRowToMessage(serializableSchema));
-
- PCollection<PubsubMessage> messages = pct.get(OUTPUT);
- messages.apply(PubsubIO.Write.class.getSimpleName(), buildPubsubWrite());
- return PCollectionRowTuple.of(ERROR_TAG, pct.get(ERROR));
- }
-
- PayloadSerializer getPayloadSerializer(Schema schema) {
- if (configuration.getFormat() == null) {
- return null;
- }
- String format = configuration.getFormat();
- Set<String> availableFormats =
- Providers.loadProviders(PayloadSerializerProvider.class).keySet();
- if (!availableFormats.contains(format)) {
- String availableFormatsString = String.join(",", availableFormats);
- throw new IllegalArgumentException(
- String.format(
-                "%s is not among the valid formats: [%s]", format, availableFormatsString));
- }
-      return PayloadSerializers.getSerializer(configuration.getFormat(), schema, ImmutableMap.of());
- }
-
- PubsubRowToMessage buildPubsubRowToMessage(Schema schema) {
- PubsubRowToMessage.Builder builder =
-          PubsubRowToMessage.builder().setPayloadSerializer(getPayloadSerializer(schema));
-
- if (configuration.getTarget() != null) {
- builder =
- builder.setTargetTimestampAttributeName(
- configuration.getTarget().getTimestampAttributeKey());
- }
-
- return builder.build();
- }
-
- PubsubIO.Write<PubsubMessage> buildPubsubWrite() {
-      PubsubIO.Write<PubsubMessage> write = PubsubIO.writeMessages().to(configuration.getTopic());
-
- if (configuration.getIdAttribute() != null) {
- write = write.withIdAttribute(configuration.getIdAttribute());
- }
-
- if (pubsubClientFactory != null) {
- write = write.withClientFactory(pubsubClientFactory);
+ @ProcessElement
+    public void processElement(@Element Row row, MultiOutputReceiver receiver) {
+      try {
+        receiver.get(OUTPUT_TAG).output(new PubsubMessage(valueMapper.apply(row), null));
+ } catch (Exception e) {
+ receiver
+ .get(ERROR_TAG)
+            .output(Row.withSchema(errorSchema).addValues(e.toString(), row).build());
}
-
- return write;
}
+ }
- void validateSourceSchemaAgainstConfiguration(Schema sourceSchema) {
- if (sourceSchema.getFieldCount() == 0) {
-        throw new IllegalArgumentException(String.format("empty Schema for %s", INPUT_TAG));
- }
-
- if (configuration.getSource() == null) {
- return;
- }
-
- SourceConfiguration source = configuration.getSource();
-
- if (source.getAttributesFieldName() != null) {
- String fieldName = source.getAttributesFieldName();
- FieldType fieldType = ATTRIBUTES_FIELD_TYPE;
- FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType);
- checkArgument(
- fieldMatcher.match(sourceSchema),
-            String.format("schema missing field: %s for type %s: ", fieldName, fieldType));
- }
-
- if (source.getTimestampFieldName() != null) {
- String fieldName = source.getTimestampFieldName();
- FieldType fieldType = EVENT_TIMESTAMP_FIELD_TYPE;
- FieldMatcher fieldMatcher = FieldMatcher.of(fieldName, fieldType);
- checkArgument(
- fieldMatcher.match(sourceSchema),
-            String.format("schema missing field: %s for type: %s", fieldName, fieldType));
- }
-
- if (source.getPayloadFieldName() == null) {
- return;
- }
-
- String fieldName = source.getPayloadFieldName();
-      FieldMatcher bytesFieldMatcher = FieldMatcher.of(fieldName, PAYLOAD_BYTES_TYPE_NAME);
-      FieldMatcher rowFieldMatcher = FieldMatcher.of(fieldName, PAYLOAD_ROW_TYPE_NAME);
- SchemaReflection schemaReflection = SchemaReflection.of(sourceSchema);
- checkArgument(
- schemaReflection.matchesAny(bytesFieldMatcher, rowFieldMatcher),
+ @Override
+  public SchemaTransform from(PubsubWriteSchemaTransformConfiguration configuration) {
+ if (!VALID_DATA_FORMATS.contains(configuration.getFormat())) {
+ throw new IllegalArgumentException(
String.format(
- "schema missing field: %s for types %s or %s",
- fieldName, PAYLOAD_BYTES_TYPE_NAME, PAYLOAD_ROW_TYPE_NAME));
-
- String[] fieldsToExclude =
- Stream.of(
- source.getAttributesFieldName(),
- source.getTimestampFieldName(),
- source.getPayloadFieldName())
- .filter(Objects::nonNull)
- .toArray(String[]::new);
-
- Schema userFieldsSchema = removeFields(sourceSchema, fieldsToExclude);
-
- if (userFieldsSchema.getFieldCount() > 0) {
- throw new IllegalArgumentException(
-            String.format("user fields incompatible with %s field", source.getPayloadFieldName()));
- }
- }
-
-    void validateTargetSchemaAgainstPubsubSchema(Schema targetSchema, PipelineOptions options) {
- checkArgument(options != null);
-
-      try (PubsubClient pubsubClient = getPubsubClient(options.as(PubsubOptions.class))) {
-        PubsubClient.TopicPath topicPath = PubsubClient.topicPathFromPath(configuration.getTopic());
-        PubsubClient.SchemaPath schemaPath = pubsubClient.getSchemaPath(topicPath);
-        if (schemaPath == null || schemaPath.equals(SchemaPath.DELETED_SCHEMA)) {
- return;
- }
- Schema expectedSchema = pubsubClient.getSchema(schemaPath);
- checkState(
- targetSchema.equals(expectedSchema),
- String.format(
-                "input schema mismatch with expected schema at path: %s\ninput schema: %s\nPub/Sub schema: %s",
- schemaPath, targetSchema, expectedSchema));
- } catch (IOException e) {
- throw new IllegalStateException(e.getMessage());
- }
- }
-
- Schema buildTargetSchema(Schema sourceSchema) {
- validateSourceSchemaAgainstConfiguration(sourceSchema);
- FieldType payloadFieldType = null;
-
- List<String> fieldsToRemove = new ArrayList<>();
-
- if (configuration.getSource() != null) {
- SourceConfiguration source = configuration.getSource();
-
- if (source.getAttributesFieldName() != null) {
- fieldsToRemove.add(source.getAttributesFieldName());
- }
-
- if (source.getTimestampFieldName() != null) {
- fieldsToRemove.add(source.getTimestampFieldName());
- }
-
- if (source.getPayloadFieldName() != null) {
- String fieldName = source.getPayloadFieldName();
- Field field = sourceSchema.getField(fieldName);
- payloadFieldType = field.getType();
- fieldsToRemove.add(fieldName);
- }
- }
-
- Schema targetSchema =
- PubsubRowToMessage.builder()
- .build()
- .inputSchemaFactory(payloadFieldType)
- .buildSchema(sourceSchema.getFields().toArray(new Field[0]));
-
- return removeFields(targetSchema, fieldsToRemove.toArray(new String[0]));
+ "Format %s not supported. Only supported formats are %s",
+ configuration.getFormat(), VALID_FORMATS_STR));
}
+    return new PubsubWriteSchemaTransform(configuration.getTopic(), configuration.getFormat());
+ }
- private PubsubClient.PubsubClientFactory getPubsubClientFactory() {
- if (pubsubClientFactory != null) {
- return pubsubClientFactory;
- }
- return PubsubGrpcClient.FACTORY;
- }
+  private static class PubsubWriteSchemaTransform extends SchemaTransform implements Serializable {
+    final String topic;
+    final String format;
-    private PubsubClient getPubsubClient(PubsubOptions options) throws IOException {
- return getPubsubClientFactory()
- .newClient(
- configuration.getTarget().getTimestampAttributeKey(),
- configuration.getIdAttribute(),
- options);
+ PubsubWriteSchemaTransform(String topic, String format) {
+ this.topic = topic;
+ this.format = format;
}
- ParDo.SingleOutput<Row, Row> convertForRowToMessage(Schema targetSchema) {
- return convertForRowToMessage(targetSchema, null);
- }
-
- ParDo.SingleOutput<Row, Row> convertForRowToMessage(
- Schema targetSchema, @Nullable Clock clock) {
- String attributesName = null;
- String timestampName = null;
- String payloadName = null;
- SourceConfiguration source = configuration.getSource();
- if (source != null) {
- attributesName = source.getAttributesFieldName();
- timestampName = source.getTimestampFieldName();
- payloadName = source.getPayloadFieldName();
- }
- return ParDo.of(
- new ConvertForRowToMessage(
-              targetSchema, clock, attributesName, timestampName, payloadName));
+ @Override
+ public PCollectionRowTuple expand(PCollectionRowTuple input) {
+ final Schema errorSchema =
+ Schema.builder()
+ .addStringField("error")
+ .addNullableRowField("row", input.get("input").getSchema())
+ .build();
+ SerializableFunction<Row, byte[]> fn =
+ format.equals("AVRO")
+              ? AvroUtils.getRowToAvroBytesFunction(input.get("input").getSchema())
+              : JsonUtils.getRowToJsonBytesFunction(input.get("input").getSchema());
+
+ PCollectionTuple outputTuple =
+ input
+ .get("input")
+ .apply(
+ ParDo.of(new ErrorFn(fn, errorSchema))
+ .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
+
+ outputTuple.get(OUTPUT_TAG).apply(PubsubIO.writeMessages().to(topic));
+
+      return PCollectionRowTuple.of("errors", outputTuple.get(ERROR_TAG).setRowSchema(errorSchema));
}
}
- private static class ConvertForRowToMessage extends DoFn<Row, Row> {
- private final Schema targetSchema;
- @Nullable private final Clock clock;
- @Nullable private final String attributesFieldName;
- @Nullable private final String timestampFieldName;
- @Nullable private final String payloadFieldName;
-
- ConvertForRowToMessage(
- Schema targetSchema,
- @Nullable Clock clock,
- @Nullable String attributesFieldName,
- @Nullable String timestampFieldName,
- @Nullable String payloadFieldName) {
- this.targetSchema = targetSchema;
- this.clock = clock;
- this.attributesFieldName = attributesFieldName;
- this.timestampFieldName = timestampFieldName;
- this.payloadFieldName = payloadFieldName;
- }
-
- @ProcessElement
- public void process(@Element Row row, OutputReceiver<Row> receiver) {
- Instant now = Instant.now();
- if (clock != null) {
- now = Instant.ofEpochMilli(clock.currentTimeMillis());
- }
- Map<String, Object> values = new HashMap<>();
-
- // Default attributes value
- checkState(targetSchema.hasField(DEFAULT_ATTRIBUTES_KEY_NAME));
- values.put(DEFAULT_ATTRIBUTES_KEY_NAME, ImmutableMap.of());
-
- // Default timestamp value
- checkState(targetSchema.hasField(DEFAULT_EVENT_TIMESTAMP_KEY_NAME));
- values.put(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, now);
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized String identifier() {
+ return "beam:schematransform:org.apache.beam:pubsub_write:v1";
+ }
- for (String fieldName : row.getSchema().getFieldNames()) {
- if (targetSchema.hasField(fieldName)) {
- values.put(fieldName, row.getValue(fieldName));
- }
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+ inputCollectionNames() {
+ return Collections.singletonList("input");
+ }
- if (attributesFieldName != null) {
- values.put(DEFAULT_ATTRIBUTES_KEY_NAME, row.getValue(attributesFieldName));
- }
- if (timestampFieldName != null) {
- values.put(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, row.getValue(timestampFieldName));
- }
- if (payloadFieldName != null) {
- values.put(DEFAULT_PAYLOAD_KEY_NAME, row.getValue(payloadFieldName));
- }
- }
- receiver.output(Row.withSchema(targetSchema).withFieldValues(values).build());
- }
+ @Override
+ public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
+ outputCollectionNames() {
+ return Collections.singletonList("errors");
}
}
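
Note on the rewritten write path: rows that fail serialization are tagged to an "errors" output instead of failing the bundle, and only the main output is written to Pub/Sub. The ErrorFn referenced in expand() is defined elsewhere in this file and is not part of this hunk; a minimal sketch of the multi-output pattern it follows (the class name, tag types, and error-row shape below are assumptions for illustration, not the committed code):

    // Sketch only: route rows that fail serialization to an error tag.
    private static class SketchErrorFn extends DoFn<Row, PubsubMessage> {
      private final SerializableFunction<Row, byte[]> toBytes;
      private final Schema errorSchema;

      SketchErrorFn(SerializableFunction<Row, byte[]> toBytes, Schema errorSchema) {
        this.toBytes = toBytes;
        this.errorSchema = errorSchema;
      }

      @ProcessElement
      public void processElement(@Element Row row, MultiOutputReceiver out) {
        try {
          // Happy path: serialize the row and emit a Pub/Sub message on the main tag.
          out.get(OUTPUT_TAG).output(new PubsubMessage(toBytes.apply(row), ImmutableMap.of()));
        } catch (Exception e) {
          // Failure path: emit (error message, original row) on the error tag.
          out.get(ERROR_TAG)
              .output(
                  Row.withSchema(errorSchema)
                      .withFieldValue("error", e.toString())
                      .withFieldValue("row", row)
                      .build());
        }
      }
    }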
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java
index 848549f1929..0de998f1112 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadSchemaTransformProviderTest.java
@@ -18,300 +18,237 @@
package org.apache.beam.sdk.io.gcp.pubsub;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import com.google.api.client.util.Clock;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/** Tests for {@link PubsubReadSchemaTransformProvider}. */
+/** Tests for {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubReadSchemaTransformProvider}. */
@RunWith(JUnit4.class)
public class PubsubReadSchemaTransformProviderTest {
- private static final Schema SCHEMA =
+ private static final Schema BEAM_SCHEMA =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING),
Schema.Field.of("number", Schema.FieldType.INT64));
-
+ private static final Schema BEAM_SCHEMA_WITH_ERROR =
+ Schema.of(Schema.Field.of("error", Schema.FieldType.STRING));
+ private static final String SCHEMA = AvroUtils.toAvroSchema(BEAM_SCHEMA).toString();
private static final String SUBSCRIPTION = "projects/project/subscriptions/subscription";
private static final String TOPIC = "projects/project/topics/topic";
- private static final List<TestCase> cases =
- Arrays.asList(
- testCase(
- "no configured topic or subscription",
- PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA).build())
- .expectInvalidConfiguration(),
- testCase(
- "both topic and subscription configured",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setSubscription(TOPIC)
- .setDataSchema(SCHEMA)
- .build())
- .expectInvalidConfiguration(),
- testCase(
- "invalid format configured",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .setFormat("invalidformat")
- .build())
- .expectInvalidConfiguration(),
- testCase(
- "configuration with subscription",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .build())
- .withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)),
- testCase(
- "configuration with topic",
- PubsubReadSchemaTransformConfiguration.builder()
- .setTopic(TOPIC)
- .setDataSchema(SCHEMA)
- .build())
- .withExpectedPubsubRead(PubsubIO.readMessages().fromTopic(TOPIC)),
- testCase(
- "configuration with subscription, timestamp and id attributes",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setTimestampAttribute("timestampAttribute")
- .setIdAttribute("idAttribute")
- .setDataSchema(SCHEMA)
- .build())
- .withExpectedPubsubRead(
- PubsubIO.readMessages()
- .fromSubscription(SUBSCRIPTION)
- .withTimestampAttribute("timestampAttribute")
- .withIdAttribute("idAttribute")),
- testCase(
- "configuration with subscription and dead letter queue",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setDataSchema(SCHEMA)
- .setDeadLetterQueue(TOPIC)
- .build())
- .withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION))
- .withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC)),
- testCase(
- "configuration with subscription, timestamp attribute, and dead letter queue",
- PubsubReadSchemaTransformConfiguration.builder()
- .setSubscription(SUBSCRIPTION)
- .setTimestampAttribute("timestampAttribute")
- .setDataSchema(SCHEMA)
- .setDeadLetterQueue(TOPIC)
- .build())
- .withExpectedPubsubRead(
- PubsubIO.readMessages()
- .fromSubscription(SUBSCRIPTION)
- .withTimestampAttribute("timestampAttribute"))
- .withExpectedDeadLetterQueue(
- PubsubIO.writeMessages().to(TOPIC).withTimestampAttribute("timestampAttribute")));
-
- private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
- private static final TypeDescriptor<PubsubReadSchemaTransformConfiguration> TYPE_DESCRIPTOR =
- TypeDescriptor.of(PubsubReadSchemaTransformConfiguration.class);
- private static final SerializableFunction<PubsubReadSchemaTransformConfiguration, Row>
- ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
-
private static final List<Row> ROWS =
Arrays.asList(
- Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 100L).build(),
- Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 200L).build(),
- Row.withSchema(SCHEMA)
+ Row.withSchema(BEAM_SCHEMA)
+ .withFieldValue("name", "a")
+ .withFieldValue("number", 100L)
+ .build(),
+ Row.withSchema(BEAM_SCHEMA)
+ .withFieldValue("name", "b")
+ .withFieldValue("number", 200L)
+ .build(),
+ Row.withSchema(BEAM_SCHEMA)
.withFieldValue("name", "c")
.withFieldValue("number", 300L)
.build());
- private static final Clock CLOCK = (Clock & Serializable) () -> 1656788475425L;
+ private static final List<Row> ROWSWITHERROR =
+ Arrays.asList(
+ Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "a").build(),
+ Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "b").build(),
+ Row.withSchema(BEAM_SCHEMA_WITH_ERROR).withFieldValue("error", "c").build());
+
+ private static final Clock CLOCK = (Clock & Serializable) () -> 1678988970000L;
private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER =
new AvroPayloadSerializerProvider();
private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER =
- AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(SCHEMA, new HashMap<>());
+ AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(BEAM_SCHEMA, new HashMap<>());
+ private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER_WITH_ERROR =
+ AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(BEAM_SCHEMA_WITH_ERROR, new HashMap<>());
- @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+ @Rule public transient TestPipeline p = TestPipeline.create();
@Test
- public void testBuildDeadLetterQueueWrite() {
- for (TestCase testCase : cases) {
- PubsubIO.Write<PubsubMessage> dlq =
- testCase.pubsubReadSchemaTransform().buildDeadLetterQueueWrite();
-
- if (testCase.expectedDeadLetterQueue == null) {
- assertNull(testCase.name, dlq);
- return;
- }
-
- Map<DisplayData.Identifier, DisplayData.Item> actual = DisplayData.from(dlq).asMap();
- Map<DisplayData.Identifier, DisplayData.Item> expected = testCase.expectedDeadLetterQueue;
-
- assertEquals(testCase.name, expected, actual);
- }
+ public void testInvalidConfigNoTopicOrSubscription() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ new PubsubReadSchemaTransformProvider()
+ .from(
+ PubsubReadSchemaTransformConfiguration.builder()
+ .setSchema(SCHEMA)
+ .setFormat("AVRO")
+ .build()));
}
@Test
- public void testReadAvro() throws IOException {
+ public void testInvalidConfigBothTopicAndSubscription() {
PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
- PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform =
- schemaTransformWithClock("avro");
- PubsubTestClient.PubsubTestClientFactory clientFactory =
- clientFactory(incomingAvroMessagesOf(CLOCK.currentTimeMillis()));
- transform.setClientFactory(clientFactory);
- PCollectionRowTuple reads = begin.apply(transform);
-
- PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
-
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ begin.apply(
+ new PubsubReadSchemaTransformProvider()
+ .from(
+ PubsubReadSchemaTransformConfiguration.builder()
+ .setSchema(SCHEMA)
+ .setFormat("AVRO")
+ .setTopic(TOPIC)
+ .setSubscription(SUBSCRIPTION)
+ .build())));
p.run().waitUntilFinish();
- clientFactory.close();
}
@Test
- public void testReadJson() throws IOException {
+ public void testInvalidConfigInvalidFormat() {
PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
- PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform =
- schemaTransformWithClock("json");
- PubsubTestClient.PubsubTestClientFactory clientFactory =
- clientFactory(incomingJsonMessagesOf(CLOCK.currentTimeMillis()));
- transform.setClientFactory(clientFactory);
- PCollectionRowTuple reads = begin.apply(transform);
-
- PAssert.that(reads.get(PubsubReadSchemaTransformProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
-
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ begin.apply(
+ new PubsubReadSchemaTransformProvider()
+ .from(
+ PubsubReadSchemaTransformConfiguration.builder()
+ .setSchema(SCHEMA)
+ .setFormat("BadFormat")
+ .setSubscription(SUBSCRIPTION)
+ .build())));
p.run().waitUntilFinish();
-
- clientFactory.close();
- }
-
- @Test
- public void testBuildPubSubRead() {
- for (TestCase testCase : cases) {
- if (testCase.invalidConfigurationExpected) {
- continue;
- }
- Map<DisplayData.Identifier, DisplayData.Item> actual =
- DisplayData.from(testCase.pubsubReadSchemaTransform().buildPubsubRead()).asMap();
-
- Map<DisplayData.Identifier, DisplayData.Item> expected = testCase.expectedPubsubRead;
-
- assertEquals(testCase.name, expected, actual);
- }
- }
-
- @Test
- public void testInvalidConfiguration() {
- for (TestCase testCase : cases) {
- PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
- if (testCase.invalidConfigurationExpected) {
- assertThrows(
- testCase.name,
- RuntimeException.class,
- () -> begin.apply(testCase.pubsubReadSchemaTransform()));
- }
- }
}
@Test
- public void testInvalidInput() {
- PCollectionRowTuple begin = PCollectionRowTuple.of("BadInput", p.apply(Create.of(ROWS)));
+ public void testNoSchema() {
+ PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
assertThrows(
- IllegalArgumentException.class,
+ IllegalStateException.class,
() ->
begin.apply(
new PubsubReadSchemaTransformProvider()
.from(
PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(SCHEMA)
+ .setSubscription(SUBSCRIPTION)
+ .setFormat("AVRO")
.build())));
+ p.run().waitUntilFinish();
}
- private PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform schemaTransformWithClock(
- String format) {
- PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform transform =
- (PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform)
- new PubsubReadSchemaTransformProvider()
- .from(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(SCHEMA)
- .setSubscription(SUBSCRIPTION)
- .setFormat(format)
- .build());
-
- transform.setClock(CLOCK);
+ @Test
+ public void testReadAvro() throws IOException {
+ PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
- return transform;
+ try (PubsubTestClientFactory clientFactory = clientFactory(beamRowToMessage())) {
+ PubsubReadSchemaTransformConfiguration config =
+ PubsubReadSchemaTransformConfiguration.builder()
+ .setFormat("AVRO")
+ .setSchema(SCHEMA)
+ .setSubscription(SUBSCRIPTION)
+ .setClientFactory(clientFactory)
+ .setClock(CLOCK)
+ .build();
+ SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config);
+ PCollectionRowTuple reads = begin.apply(transform);
+
+ PAssert.that(reads.get("output")).containsInAnyOrder(ROWS);
+
+ p.run().waitUntilFinish();
+ } catch (Exception e) {
+ throw e;
+ }
}
- private static PubsubTestClient.PubsubTestClientFactory clientFactory(
- List<PubsubClient.IncomingMessage> messages) {
- return PubsubTestClient.createFactoryForPull(
- CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60, messages);
- }
+ @Test
+ public void testReadAvroWithError() throws IOException {
+ PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
- private static List<PubsubClient.IncomingMessage> incomingAvroMessagesOf(long millisSinceEpoch) {
- return ROWS.stream()
- .map(row -> incomingAvroMessageOf(row, millisSinceEpoch))
- .collect(Collectors.toList());
- }
+ try (PubsubTestClientFactory clientFactory = clientFactory(beamRowToMessageWithError())) {
+ PubsubReadSchemaTransformConfiguration config =
+ PubsubReadSchemaTransformConfiguration.builder()
+ .setFormat("AVRO")
+ .setSchema(SCHEMA)
+ .setSubscription(SUBSCRIPTION)
+ .setClientFactory(clientFactory)
+ .setClock(CLOCK)
+ .build();
+ SchemaTransform transform = new PubsubReadSchemaTransformProvider().from(config);
+ PCollectionRowTuple reads = begin.apply(transform);
+
+ PAssert.that(reads.get("output")).empty();
+
+ PipelineResult result = p.run();
+ result.waitUntilFinish();
+
+ MetricResults metrics = result.metrics();
+ MetricQueryResults metricResults =
+ metrics.queryMetrics(
+ MetricsFilter.builder()
+ .addNameFilter(
+ MetricNameFilter.named(
+ PubsubReadSchemaTransformProvider.class, "PubSub-read-error-counter"))
+ .build());
+
+ Iterable<MetricResult<Long>> counters = metricResults.getCounters();
+ if (!counters.iterator().hasNext()) {
+ throw new RuntimeException("no counters available ");
+ }
- private static PubsubClient.IncomingMessage incomingAvroMessageOf(
- Row row, long millisSinceEpoch) {
- byte[] bytes = AVRO_PAYLOAD_SERIALIZER.serialize(row);
- return incomingMessageOf(bytes, millisSinceEpoch);
+ Long expectedCount = 3L;
+ for (MetricResult<Long> count : counters) {
+ assertEquals(expectedCount, count.getAttempted());
+ }
+ } catch (Exception e) {
+ throw e;
+ }
}
- private static List<PubsubClient.IncomingMessage> incomingJsonMessagesOf(long millisSinceEpoch) {
- return PubsubReadSchemaTransformProviderTest.ROWS.stream()
- .map(row -> incomingJsonMessageOf(row, millisSinceEpoch))
+ private static List<PubsubClient.IncomingMessage> beamRowToMessage() {
+ long timestamp = CLOCK.currentTimeMillis();
+ return ROWS.stream()
+ .map(
+ row -> {
+ byte[] bytes = AVRO_PAYLOAD_SERIALIZER.serialize(row);
+ return incomingMessageOf(bytes, timestamp);
+ })
.collect(Collectors.toList());
}
- private static PubsubClient.IncomingMessage incomingJsonMessageOf(
- Row row, long millisSinceEpoch) {
- String name = Objects.requireNonNull(row.getString("name"));
- long number = Objects.requireNonNull(row.getInt64("number"));
- return incomingJsonMessageOf(name, number, millisSinceEpoch);
- }
-
- private static PubsubClient.IncomingMessage incomingJsonMessageOf(
- String name, long number, long millisSinceEpoch) {
- Gson gson = new Gson();
- JsonObject obj = new JsonObject();
- obj.add("name", new JsonPrimitive(name));
- obj.add("number", new JsonPrimitive(number));
- byte[] bytes = gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
- return incomingMessageOf(bytes, millisSinceEpoch);
+ private static List<PubsubClient.IncomingMessage> beamRowToMessageWithError() {
+ long timestamp = CLOCK.currentTimeMillis();
+ return ROWSWITHERROR.stream()
+ .map(
+ row -> {
+ byte[] bytes = AVRO_PAYLOAD_SERIALIZER_WITH_ERROR.serialize(row);
+ return incomingMessageOf(bytes, timestamp);
+ })
+ .collect(Collectors.toList());
}
private static PubsubClient.IncomingMessage incomingMessageOf(
@@ -329,51 +266,9 @@ public class PubsubReadSchemaTransformProviderTest {
UUID.randomUUID().toString());
}
- static TestCase testCase(String name, PubsubReadSchemaTransformConfiguration configuration) {
- return new TestCase(name, configuration);
- }
-
- private static class TestCase {
-
- private final String name;
- private final PubsubReadSchemaTransformConfiguration configuration;
-
- private Map<DisplayData.Identifier, DisplayData.Item> expectedDeadLetterQueue;
-
- private Map<DisplayData.Identifier, DisplayData.Item> expectedPubsubRead =
- DisplayData.from(PubsubIO.readMessages()).asMap();
-
- private boolean invalidConfigurationExpected = false;
-
- TestCase(String name, PubsubReadSchemaTransformConfiguration configuration) {
- this.name = name;
- this.configuration = configuration;
- }
-
- PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform pubsubReadSchemaTransform() {
- PubsubReadSchemaTransformProvider provider = new PubsubReadSchemaTransformProvider();
- Row configurationRow = toBeamRow();
- return (PubsubReadSchemaTransformProvider.PubsubReadSchemaTransform)
- provider.from(configurationRow);
- }
-
- private Row toBeamRow() {
- return ROW_SERIALIZABLE_FUNCTION.apply(configuration);
- }
-
- TestCase withExpectedDeadLetterQueue(PubsubIO.Write<PubsubMessage> value) {
- this.expectedDeadLetterQueue = DisplayData.from(value).asMap();
- return this;
- }
-
- TestCase withExpectedPubsubRead(PubsubIO.Read<PubsubMessage> value) {
- this.expectedPubsubRead = DisplayData.from(value).asMap();
- return this;
- }
-
- TestCase expectInvalidConfiguration() {
- this.invalidConfigurationExpected = true;
- return this;
- }
+ private static PubsubTestClient.PubsubTestClientFactory clientFactory(
+ List<PubsubClient.IncomingMessage> messages) {
+ return PubsubTestClient.createFactoryForPull(
+ CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60, messages);
}
}
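
The testReadAvroWithError case above asserts on the dead-letter path through the metrics API rather than PAssert. The counter it queries is incremented inside the read transform's error handling; a sketch of the producing side (only the counter name and namespace are confirmed by the test, the DoFn itself is assumed):

    // Sketch: the producing side of the "PubSub-read-error-counter" metric.
    private static class SketchBytesToRowFn extends DoFn<PubsubMessage, Row> {
      private final Counter errorCounter =
          Metrics.counter(PubsubReadSchemaTransformProvider.class, "PubSub-read-error-counter");
      private final SerializableFunction<byte[], Row> fromBytes;

      SketchBytesToRowFn(SerializableFunction<byte[], Row> fromBytes) {
        this.fromBytes = fromBytes;
      }

      @ProcessElement
      public void processElement(@Element PubsubMessage message, OutputReceiver<Row> out) {
        try {
          out.output(fromBytes.apply(message.getPayload()));
        } catch (Exception e) {
          errorCounter.inc(); // surfaces via result.metrics(), as asserted in the test above
        }
      }
    }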
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
deleted file mode 100644
index 709fc35e02a..00000000000
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformMessageToRowFactoryTest.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.ATTRIBUTES_FIELD;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.PAYLOAD_FIELD;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializerProvider;
-import org.apache.beam.sdk.values.Row;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Test for {@link PubsubSchemaTransformMessageToRowFactory}. */
-@RunWith(JUnit4.class)
-public class PubsubSchemaTransformMessageToRowFactoryTest {
-
- List<TestCase> cases =
- Arrays.asList(
- testCase(PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA))
- .expectPayloadSerializerProvider(JSON_PAYLOAD_SERIALIZER_PROVIDER)
- .withSerializerInput(),
- testCase(PubsubReadSchemaTransformConfiguration.builder().setDataSchema(SCHEMA))
- .expectPubsubToRow(
- PubsubMessageToRow.builder()
- .messageSchema(SCHEMA)
- .useFlatSchema(true)
- .useDlq(false)),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(SCHEMA)
- .setDeadLetterQueue("projects/project/topics/topic"))
- .expectPubsubToRow(
- PubsubMessageToRow.builder()
- .messageSchema(SCHEMA)
- .useFlatSchema(true)
- .useDlq(true)),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(SCHEMA)
- .setFormat("avro"))
- .expectPayloadSerializerProvider(AVRO_PAYLOAD_SERIALIZER_PROVIDER)
- .withSerializerInput(),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY)))
- .schemaShouldHaveValidAttributesField()
- .fieldShouldBePresent(
- ATTRIBUTES_FIELD_ARRAY.getName(), ATTRIBUTES_FIELD_ARRAY.getType()),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(Schema.of(ATTRIBUTES_FIELD_MAP)))
- .schemaShouldHaveValidAttributesField()
- .fieldShouldBePresent(ATTRIBUTES_FIELD_MAP.getName(), ATTRIBUTES_FIELD_MAP.getType()),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(Schema.of(ATTRIBUTES_FIELD_SHOULD_NOT_MATCH))),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(Schema.of(PAYLOAD_FIELD_SHOULD_NOT_MATCH))),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(Schema.of(PAYLOAD_FIELD_BYTES)))
- .schemaShouldHaveValidPayloadField()
- .fieldShouldBePresent(PAYLOAD_FIELD_BYTES.getName(), PAYLOAD_FIELD_BYTES.getType()),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(Schema.of(PAYLOAD_FIELD_ROW)))
- .schemaShouldHaveValidPayloadField()
- .fieldShouldBePresent(PAYLOAD_FIELD_ROW.getName(), PAYLOAD_FIELD_ROW.getType()),
- testCase(
- PubsubReadSchemaTransformConfiguration.builder()
- .setDataSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES)))
- .schemaShouldHaveValidAttributesField()
- .schemaShouldHaveValidPayloadField()
- .shouldUseNestedSchema()
- .shouldNotNeedSerializer()
- .expectPubsubToRow(
- PubsubMessageToRow.builder()
- .messageSchema(Schema.of(ATTRIBUTES_FIELD_ARRAY, PAYLOAD_FIELD_BYTES))
- .useFlatSchema(false)
- .useDlq(false)));
-
- static final Schema.FieldType ATTRIBUTE_MAP_FIELD_TYPE =
- Schema.FieldType.map(Schema.FieldType.STRING.withNullable(false), Schema.FieldType.STRING);
- static final Schema ATTRIBUTE_ARRAY_ENTRY_SCHEMA =
- Schema.builder().addStringField("key").addStringField("value").build();
-
- static final Schema.FieldType ATTRIBUTE_ARRAY_FIELD_TYPE =
- Schema.FieldType.array(Schema.FieldType.row(ATTRIBUTE_ARRAY_ENTRY_SCHEMA));
-
- private static final Schema.Field ATTRIBUTES_FIELD_SHOULD_NOT_MATCH =
- Schema.Field.of(ATTRIBUTES_FIELD, Schema.FieldType.STRING);
-
- private static final Schema.Field ATTRIBUTES_FIELD_MAP =
- Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_MAP_FIELD_TYPE);
-
- private static final Schema.Field ATTRIBUTES_FIELD_ARRAY =
- Schema.Field.of(ATTRIBUTES_FIELD, ATTRIBUTE_ARRAY_FIELD_TYPE);
-
- private static final Schema.Field PAYLOAD_FIELD_SHOULD_NOT_MATCH =
- Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.STRING);
-
- private static final Schema.Field PAYLOAD_FIELD_BYTES =
- Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.BYTES);
-
- private static final Schema.Field PAYLOAD_FIELD_ROW =
- Schema.Field.of(PAYLOAD_FIELD, Schema.FieldType.row(Schema.of()));
-
- private static final PayloadSerializerProvider JSON_PAYLOAD_SERIALIZER_PROVIDER =
- new JsonPayloadSerializerProvider();
-
- private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER =
- new AvroPayloadSerializerProvider();
-
- private static final Schema SCHEMA =
- Schema.of(
- Schema.Field.of("name", Schema.FieldType.STRING),
- Schema.Field.of("number", Schema.FieldType.INT64));
-
- private static final Row ROW =
- Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 1L).build();
-
- @Test
- public void testBuildMessageToRow() {
- for (TestCase testCase : cases) {
- if (testCase.expectPubsubToRow == null) {
- continue;
- }
-
- PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
- PubsubMessageToRow expected = testCase.expectPubsubToRow;
- PubsubMessageToRow actual = factory.buildMessageToRow();
-
- assertEquals("messageSchema", expected.messageSchema(),
actual.messageSchema());
- assertEquals("useFlatSchema", expected.useFlatSchema(),
actual.useFlatSchema());
- assertEquals("useDlq", expected.useDlq(), actual.useDlq());
- }
- }
-
- @Test
- public void serializer() {
- for (TestCase testCase : cases) {
- PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
- if (testCase.expectPayloadSerializerProvider == null) {
- continue;
- }
-
- Row serializerInput = testCase.serializerInput;
-
- byte[] expectedBytes =
- testCase
- .expectSerializerProvider()
- .apply(testCase.dataSchema())
- .serialize(serializerInput);
-
- byte[] actualBytes =
- factory.serializer().apply(testCase.dataSchema()).serialize(serializerInput);
-
- String expected = new String(expectedBytes, StandardCharsets.UTF_8);
- String actual = new String(actualBytes, StandardCharsets.UTF_8);
-
- assertEquals(expected, actual);
- }
- }
-
- @Test
- public void needsSerializer() {
- for (TestCase testCase : cases) {
- PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
- boolean expected = testCase.shouldNeedSerializer;
- boolean actual = factory.needsSerializer();
-
- assertEquals(expected, actual);
- }
- }
-
- @Test
- public void shouldUseNestedSchema() {
- for (TestCase testCase : cases) {
- PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
- boolean expected = testCase.shouldUseNestedSchema;
- boolean actual = factory.shouldUseNestedSchema();
-
- assertEquals(expected, actual);
- }
- }
-
- @Test
- public void schemaHasValidPayloadField() {
- for (TestCase testCase : cases) {
- PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
- boolean expected = testCase.shouldSchemaHaveValidPayloadField;
- boolean actual = factory.schemaHasValidPayloadField();
-
- assertEquals(expected, actual);
- }
- }
-
- @Test
- public void schemaHasValidAttributesField() {
- for (TestCase testCase : cases) {
- PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
-
- boolean expected = testCase.shouldSchemaHaveValidAttributesField;
- boolean actual = factory.schemaHasValidAttributesField();
-
- assertEquals(expected, actual);
- }
- }
-
- @Test
- public void fieldPresent() {
- for (TestCase testCase : cases) {
- PubsubSchemaTransformMessageToRowFactory factory = testCase.factory();
- for (Entry<String, FieldType> entry : testCase.shouldFieldPresent.entrySet()) {
-
- boolean actual = factory.fieldPresent(entry.getKey(), entry.getValue());
-
- assertTrue(actual);
- }
- }
- }
-
- static TestCase testCase(PubsubReadSchemaTransformConfiguration.Builder configurationBuilder) {
- return new TestCase(configurationBuilder);
- }
-
- private static class TestCase {
- private final PubsubReadSchemaTransformConfiguration configuration;
-
- private PubsubMessageToRow expectPubsubToRow;
-
- private PayloadSerializerProvider expectPayloadSerializerProvider;
-
- private boolean shouldUseNestedSchema = false;
- private boolean shouldNeedSerializer = true;
- private boolean shouldSchemaHaveValidPayloadField = false;
- private boolean shouldSchemaHaveValidAttributesField = false;
- private final Map<String, FieldType> shouldFieldPresent = new HashMap<>();
-
- private Row serializerInput;
-
- TestCase(PubsubReadSchemaTransformConfiguration.Builder configurationBuilder) {
- this.configuration = configurationBuilder.build();
- }
-
- PubsubSchemaTransformMessageToRowFactory factory() {
- return PubsubSchemaTransformMessageToRowFactory.from(configuration);
- }
-
- Schema dataSchema() {
- return configuration.getDataSchema();
- }
-
- TestCase expectPubsubToRow(PubsubMessageToRow.Builder pubsubMessageToRowBuilder) {
- this.expectPubsubToRow = pubsubMessageToRowBuilder.build();
- return this;
- }
-
- TestCase withSerializerInput() {
- this.serializerInput = PubsubSchemaTransformMessageToRowFactoryTest.ROW;
- return this;
- }
-
- TestCase expectPayloadSerializerProvider(PayloadSerializerProvider value) {
- this.expectPayloadSerializerProvider = value;
- return this;
- }
-
- PubsubMessageToRow.SerializerProvider expectSerializerProvider() {
- Map<String, Object> params = new HashMap<>();
- PayloadSerializer payloadSerializer =
- expectPayloadSerializerProvider.getSerializer(configuration.getDataSchema(), params);
-
- return (input -> payloadSerializer);
- }
-
- TestCase shouldUseNestedSchema() {
- this.shouldUseNestedSchema = true;
- return this;
- }
-
- TestCase shouldNotNeedSerializer() {
- this.shouldNeedSerializer = false;
- return this;
- }
-
- TestCase schemaShouldHaveValidPayloadField() {
- this.shouldSchemaHaveValidPayloadField = true;
- return this;
- }
-
- TestCase schemaShouldHaveValidAttributesField() {
- this.shouldSchemaHaveValidAttributesField = true;
- return this;
- }
-
- TestCase fieldShouldBePresent(String name, Schema.FieldType expectedType) {
- this.shouldFieldPresent.put(name, expectedType);
- return this;
- }
- }
-}
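
With PubsubSchemaTransformMessageToRowFactory and its tests removed, payload conversion in the replacement providers goes through the AvroUtils/JsonUtils row-bytes functions directly, as the new write transform above shows. A standalone round-trip sketch (schema and values are invented for illustration; getAvroBytesToRowFunction is assumed to be the reading counterpart):

    // Sketch: the Row <-> bytes helpers the new transforms build on.
    Schema schema = Schema.builder().addStringField("name").addInt64Field("number").build();
    Row row =
        Row.withSchema(schema).withFieldValue("name", "a").withFieldValue("number", 1L).build();

    SerializableFunction<Row, byte[]> toBytes = AvroUtils.getRowToAvroBytesFunction(schema);
    SerializableFunction<byte[], Row> fromBytes = AvroUtils.getAvroBytesToRowFunction(schema);
    Row roundTripped = fromBytes.apply(toBytes.apply(row)); // equals(row)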
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java
deleted file mode 100644
index cb0e6ec03cc..00000000000
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformConfiguration.DEFAULT_TIMESTAMP_ATTRIBUTE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.INPUT_TAG;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.commons.lang3.tuple.Pair;
-import org.joda.time.Instant;
-import org.joda.time.format.ISODateTimeFormat;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-
-/** Integration tests for {@link PubsubWriteSchemaTransformProvider}. */
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-public class PubsubWriteSchemaTransformProviderIT {
-
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
- private static final TestPubsubOptions TEST_PUBSUB_OPTIONS =
- TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
-
- static {
- TEST_PUBSUB_OPTIONS.setBlockOnRun(false);
- }
-
- private static final String HAS_NO_SCHEMA = "has-no-schema";
-
- private static PubsubClient pubsubClient;
-
- private static PubsubClient.TopicPath hasNoSchemaTopic;
-
- private static PubsubClient.SubscriptionPath hasNoSchemaSubscription;
-
- private static final Instant TIMESTAMP = Instant.now();
-
- private static final String RESOURCE_NAME_POSTFIX = "-" + TIMESTAMP.getMillis();
-
- private static final int ACK_DEADLINE_SECONDS = 60;
-
- private static final int AWAIT_TERMINATED_SECONDS = 30;
-
- private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
-
- private static final TypeDescriptor<PubsubWriteSchemaTransformConfiguration>
- CONFIGURATION_TYPE_DESCRIPTOR =
- TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class);
-
- private static final SerializableFunction<PubsubWriteSchemaTransformConfiguration, Row>
- TO_ROW_FN = AUTO_VALUE_SCHEMA.toRowFunction(CONFIGURATION_TYPE_DESCRIPTOR);
-
- private final Field timestampField = Field.of("timestamp", FieldType.DATETIME);
-
- private final Field payloadBytesField = Field.of("payload", FieldType.BYTES);
-
- @BeforeClass
- public static void setUp() throws IOException {
- String project = TEST_PUBSUB_OPTIONS.as(PubsubOptions.class).getProject();
- pubsubClient = PubsubGrpcClient.FACTORY.newClient(null, null, TEST_PUBSUB_OPTIONS);
- hasNoSchemaTopic =
- PubsubClient.topicPathFromName(project, HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX);
- hasNoSchemaSubscription =
- PubsubClient.subscriptionPathFromName(project, HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX);
-
- pubsubClient.createTopic(hasNoSchemaTopic);
- pubsubClient.createSubscription(
- hasNoSchemaTopic, hasNoSchemaSubscription, ACK_DEADLINE_SECONDS);
- }
-
- @AfterClass
- public static void tearDown() throws IOException {
- pubsubClient.deleteSubscription(hasNoSchemaSubscription);
- pubsubClient.deleteTopic(hasNoSchemaTopic);
-
- pubsubClient.close();
- }
-
- @Test
- public void testWritePayloadBytes() throws IOException {
- Instant timestamp = Instant.ofEpochMilli(100000L);
- Schema schema = Schema.of(payloadBytesField, timestampField);
- List<Row> input =
- Collections.singletonList(
- Row.withSchema(schema).attachValues("aaa".getBytes(StandardCharsets.UTF_8), timestamp));
- Row configuration =
- TO_ROW_FN.apply(
- PubsubWriteSchemaTransformConfiguration.builder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setPayloadFieldName(payloadBytesField.getName())
- .setTimestampFieldName(timestampField.getName())
- .build())
- .setTopic(hasNoSchemaTopic.getPath())
- .setTarget(
- PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build())
- .build());
-
- PCollectionRowTuple.of(INPUT_TAG, pipeline.apply(Create.of(input).withRowSchema(schema)))
- .apply(new PubsubWriteSchemaTransformProvider().from(configuration));
-
- PipelineResult job = pipeline.run(TEST_PUBSUB_OPTIONS);
- Instant now = Instant.now();
- Instant stop = Instant.ofEpochMilli(now.getMillis() + AWAIT_TERMINATED_SECONDS * 1000);
- List<Pair<String, Map<String, String>>> actualList = new ArrayList<>();
- while (now.isBefore(stop)) {
- List<IncomingMessage> received = pubsubClient.pull(0, hasNoSchemaSubscription, 1, true);
- for (IncomingMessage incoming : received) {
- actualList.add(
- Pair.of(
- incoming.message().getData().toStringUtf8(),
- ImmutableMap.of(
- DEFAULT_TIMESTAMP_ATTRIBUTE,
- incoming
- .message()
- .getAttributesMap()
- .getOrDefault(DEFAULT_TIMESTAMP_ATTRIBUTE, ""))));
- }
- if (actualList.size() == input.size()) {
- break;
- }
- now = Instant.now();
- }
- job.cancel();
- assertFalse(
- String.format(
- "messages pulled from %s should not be empty",
hasNoSchemaSubscription.getPath()),
- actualList.isEmpty());
- Pair<String, Map<String, String>> actual = actualList.get(0);
- Row expected = input.get(0);
- String payload =
- new String(
- Objects.requireNonNull(expected.getBytes(payloadBytesField.getName())),
- StandardCharsets.UTF_8);
- assertEquals(payload, actual.getLeft());
- assertEquals(
- ISODateTimeFormat.dateTime().print(timestamp),
- actual.getRight().get(DEFAULT_TIMESTAMP_ATTRIBUTE));
- }
-}
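
The deleted integration test above verified delivery by polling the subscription until the expected message arrived or a deadline passed, then cancelling the streaming job. Distilled, the bounded-poll pattern it used looks like this (field names reuse the deleted test's; treat it as a sketch, not the removed code verbatim):

    // Sketch: poll Pub/Sub for up to AWAIT_TERMINATED_SECONDS before giving up.
    Instant deadline = Instant.now().plus(Duration.standardSeconds(AWAIT_TERMINATED_SECONDS));
    List<IncomingMessage> received = new ArrayList<>();
    while (Instant.now().isBefore(deadline) && received.isEmpty()) {
      received.addAll(pubsubClient.pull(0, hasNoSchemaSubscription, 1, true));
    }
    job.cancel();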
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java
deleted file mode 100644
index 98939f7ddc6..00000000000
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderTest.java
+++ /dev/null
@@ -1,786 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.pubsub;
-
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.ATTRIBUTES_FIELD_TYPE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_ATTRIBUTES_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_EVENT_TIMESTAMP_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.DEFAULT_PAYLOAD_KEY_NAME;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessage.EVENT_TIMESTAMP_FIELD_TYPE;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.ALL_DATA_TYPES_SCHEMA;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.NON_USER_WITH_BYTES_PAYLOAD;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubRowToMessageTest.rowWithAllDataTypes;
-import static org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.INPUT_TAG;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
-
-import com.google.api.client.util.Clock;
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import org.apache.avro.SchemaParseException;
-import org.apache.beam.sdk.extensions.avro.schemas.io.payloads.AvroPayloadSerializerProvider;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SchemaPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient.PubsubTestClientFactory;
-import org.apache.beam.sdk.io.gcp.pubsub.PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.schemas.AutoValueSchema;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.Field;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.io.payloads.JsonPayloadSerializerProvider;
-import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.util.RowJson.UnsupportedRowJsonException;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionRowTuple;
-import org.apache.beam.sdk.values.Row;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link PubsubWriteSchemaTransformProvider}. */
-@RunWith(JUnit4.class)
-public class PubsubWriteSchemaTransformProviderTest {
-
- private static final String ID_ATTRIBUTE = "id_attribute";
- private static final String TOPIC = "projects/project/topics/topic";
- private static final MockClock CLOCK = new MockClock(Instant.now());
- private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
- private static final TypeDescriptor<PubsubWriteSchemaTransformConfiguration> TYPE_DESCRIPTOR =
- TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class);
- private static final SerializableFunction<PubsubWriteSchemaTransformConfiguration, Row> TO_ROW =
- AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
-
- private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create();
-
- static {
- OPTIONS.setStableUniqueNames(PipelineOptions.CheckEnabled.OFF);
- }
-
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
-
- @Test
- public void testBuildPubsubWrite() {
- assertEquals(
- "default configuration should yield a topic Pub/Sub write",
- pubsubWrite(),
- transform(configurationBuilder()).buildPubsubWrite());
-
- assertEquals(
- "idAttribute in configuration should yield a idAttribute set Pub/Sub
write",
- pubsubWrite().withIdAttribute(ID_ATTRIBUTE),
-
transform(configurationBuilder().setIdAttribute(ID_ATTRIBUTE)).buildPubsubWrite());
- }
-
- @Test
- public void testBuildPubsubRowToMessage() {
- assertEquals(
- "override timestamp attribute on configuration should yield a
PubsubRowToMessage with target timestamp",
-
rowToMessageBuilder().setTargetTimestampAttributeName("custom_timestamp_attribute").build(),
- transform(
- configurationBuilder()
- .setTarget(
- PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder()
- .setTimestampAttributeKey("custom_timestamp_attribute")
- .build()))
- .buildPubsubRowToMessage(NON_USER_WITH_BYTES_PAYLOAD));
-
- assertNull(
- "failing to set format should yield a null payload serializer",
- transform(configurationBuilder())
- .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA)
- .getPayloadSerializer());
-
- assertThrows(
- "setting 'json' format for a unsupported field containing Schema
should throw an Exception",
- UnsupportedRowJsonException.class,
- () ->
- transform(configurationBuilder().setFormat("json"))
- .buildPubsubRowToMessage(
- Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE))));
-
- assertThrows(
- "setting 'avro' format for a unsupported field containing Schema
should throw an Exception",
- SchemaParseException.class,
- () ->
- transform(configurationBuilder().setFormat("avro"))
- .buildPubsubRowToMessage(
- Schema.of(Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE))));
-
- assertNotNull(
- "setting 'json' format for valid schema should yield
PayloadSerializer",
- transform(configurationBuilder().setFormat("json"))
- .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA)
- .getPayloadSerializer());
-
- assertNotNull(
- "setting 'avro' format for valid schema should yield
PayloadSerializer",
- transform(configurationBuilder().setFormat("avro"))
- .buildPubsubRowToMessage(ALL_DATA_TYPES_SCHEMA)
- .getPayloadSerializer());
- }
-
- @Test
- public void testInvalidTaggedInput() {
- Row withAllDataTypes =
- rowWithAllDataTypes(
- true,
- (byte) 0,
- Instant.now().toDateTime(),
- BigDecimal.valueOf(1L),
- 3.12345,
- 4.1f,
- (short) 5,
- 2,
- 7L,
- "asdfjkl;");
-
- PCollection<Row> rows =
- pipeline.apply(Create.of(withAllDataTypes)).setRowSchema(ALL_DATA_TYPES_SCHEMA);
-
- assertThrows(
- "empty input should not be allowed",
- IllegalArgumentException.class,
- () -> transform(configurationBuilder()).expand(PCollectionRowTuple.empty(pipeline)));
-
- assertThrows(
- "input with >1 tagged rows should not be allowed",
- IllegalArgumentException.class,
- () ->
- transform(configurationBuilder())
- .expand(PCollectionRowTuple.of(INPUT_TAG, rows).and("somethingelse", rows)));
-
- assertThrows(
- "input missing INPUT tag should not be allowed",
- IllegalArgumentException.class,
- () ->
- transform(configurationBuilder())
- .expand(PCollectionRowTuple.of("somethingelse", rows)));
-
- pipeline.run(OPTIONS);
- }
-
- @Test
- public void testValidateSourceSchemaAgainstConfiguration() {
- // Only containing user fields and no configuration details should be valid
- transform(configurationBuilder())
- .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA);
-
- // Matching attributes, timestamp, and payload (bytes) fields configured with expected types
- // should be valid
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("attributes")
- .setTimestampFieldName("timestamp")
- .setPayloadFieldName("payload")
- .build()))
- .validateSourceSchemaAgainstConfiguration(
- Schema.of(
- Field.of("attributes", ATTRIBUTES_FIELD_TYPE),
- Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE),
- Field.of("payload", Schema.FieldType.BYTES)));
-
- // Matching attributes, timestamp, and payload (ROW) fields configured with expected types
- // should be valid
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("attributes")
- .setTimestampFieldName("timestamp")
- .setPayloadFieldName("payload")
- .build()))
- .validateSourceSchemaAgainstConfiguration(
- Schema.of(
- Field.of("attributes", ATTRIBUTES_FIELD_TYPE),
- Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE),
- Field.of("payload",
Schema.FieldType.row(ALL_DATA_TYPES_SCHEMA))));
-
- assertThrows(
- "empty Schema should be invalid",
- IllegalArgumentException.class,
- () ->
- transform(configurationBuilder())
- .validateSourceSchemaAgainstConfiguration(Schema.of()));
-
- assertThrows(
- "attributes field in configuration but not in schema should be
invalid",
- IllegalArgumentException.class,
- () ->
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("attributes")
- .build()))
- .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA));
-
- assertThrows(
- "timestamp field in configuration but not in schema should be invalid",
- IllegalArgumentException.class,
- () ->
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setTimestampFieldName("timestamp")
- .build()))
- .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA));
-
- assertThrows(
- "payload field in configuration but not in schema should be invalid",
- IllegalArgumentException.class,
- () ->
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setPayloadFieldName("payload")
- .build()))
- .validateSourceSchemaAgainstConfiguration(ALL_DATA_TYPES_SCHEMA));
-
- assertThrows(
- "attributes field in configuration but mismatching attributes type
should be invalid",
- IllegalArgumentException.class,
- () ->
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("attributes")
- .build()))
- .validateSourceSchemaAgainstConfiguration(
- // should be FieldType.map(FieldType.STRING, FieldType.STRING)
- Schema.of(
- Field.of("attributes", FieldType.map(FieldType.BYTES, FieldType.STRING)))));
-
- assertThrows(
- "timestamp field in configuration but mismatching timestamp type
should be invalid",
- IllegalArgumentException.class,
- () ->
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("timestamp")
- .build()))
- .validateSourceSchemaAgainstConfiguration(
- // should be FieldType.DATETIME
- Schema.of(Field.of("timestamp", FieldType.STRING))));
-
- assertThrows(
- "payload field in configuration but mismatching payload type should be
invalid",
- IllegalArgumentException.class,
- () ->
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("payload")
- .build()))
- .validateSourceSchemaAgainstConfiguration(
- // should be FieldType.BYTES or FieldType.row(...)
- Schema.of(Field.of("payload", FieldType.STRING))));
- }
-
- @Test
- public void testValidateTargetSchemaAgainstPubsubSchema() throws IOException {
- TopicPath topicPath = PubsubClient.topicPathFromPath(TOPIC);
- PubsubTestClientFactory noSchemaFactory =
- PubsubTestClient.createFactoryForGetSchema(topicPath, null, null);
-
- PubsubTestClientFactory schemaDeletedFactory =
- PubsubTestClient.createFactoryForGetSchema(topicPath, SchemaPath.DELETED_SCHEMA, null);
-
- PubsubTestClientFactory mismatchingSchemaFactory =
- PubsubTestClient.createFactoryForGetSchema(
- topicPath,
- PubsubClient.schemaPathFromId("testProject", "misMatch"),
- Schema.of(Field.of("StringField", FieldType.STRING)));
-
- PubsubTestClientFactory matchingSchemaFactory =
- PubsubTestClient.createFactoryForGetSchema(
- topicPath,
- PubsubClient.schemaPathFromId("testProject", "match"),
- ALL_DATA_TYPES_SCHEMA);
-
- // Should pass validation exceptions if Pub/Sub topic lacks schema
- transform(configurationBuilder())
- .withPubsubClientFactory(noSchemaFactory)
- .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS);
- noSchemaFactory.close();
-
- // Should pass validation if Pub/Sub topic schema deleted
- transform(configurationBuilder())
- .withPubsubClientFactory(schemaDeletedFactory)
- .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS);
- schemaDeletedFactory.close();
-
- assertThrows(
- "mismatched schema should be detected from Pub/Sub topic",
- IllegalStateException.class,
- () ->
- transform(configurationBuilder())
- .withPubsubClientFactory(mismatchingSchemaFactory)
- .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS));
- mismatchingSchemaFactory.close();
-
- // Should pass validation if Pub/Sub topic schema matches
- transform(configurationBuilder())
- .withPubsubClientFactory(matchingSchemaFactory)
- .validateTargetSchemaAgainstPubsubSchema(ALL_DATA_TYPES_SCHEMA, OPTIONS);
- matchingSchemaFactory.close();
- }
-
- @Test
- public void testBuildTargetSchema() {
-
- Field sourceAttributesField = Field.of("attributes",
ATTRIBUTES_FIELD_TYPE);
- Field sourceTimestampField = Field.of("timestamp",
EVENT_TIMESTAMP_FIELD_TYPE);
- Field sourcePayloadBytesField = Field.of("payload", FieldType.BYTES);
- Field sourcePayloadRowField = Field.of("payload",
FieldType.row(ALL_DATA_TYPES_SCHEMA));
-
- Field targetAttributesField = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME,
ATTRIBUTES_FIELD_TYPE);
- Field targetTimestampField =
- Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
- Field targetPayloadBytesField = Field.of(DEFAULT_PAYLOAD_KEY_NAME,
FieldType.BYTES);
- Field targetPayloadRowField =
- Field.of(DEFAULT_PAYLOAD_KEY_NAME,
FieldType.row(ALL_DATA_TYPES_SCHEMA));
-
- assertEquals(
- "attributes and timestamp field should append to user fields",
- Schema.builder()
- .addField(targetAttributesField)
- .addField(targetTimestampField)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build(),
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .build()))
- .buildTargetSchema(ALL_DATA_TYPES_SCHEMA));
-
- assertEquals(
- "timestamp field should append to user fields; attributes field name
changed",
- Schema.builder()
- .addField(targetAttributesField)
- .addField(targetTimestampField)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build(),
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("attributes")
- .build()))
- .buildTargetSchema(
- Schema.builder()
- .addField(sourceAttributesField)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build()));
-
- assertEquals(
- "attributes field should append to user fields; timestamp field name
changed",
- Schema.builder()
- .addField(targetAttributesField)
- .addField(targetTimestampField)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build(),
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setTimestampFieldName("timestamp")
- .build()))
- .buildTargetSchema(
- Schema.builder()
- .addField(sourceTimestampField)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build()));
-
- assertEquals(
- "attributes and timestamp field appended to user payload bytes field;
payload field name changed",
- Schema.builder()
- .addField(targetAttributesField)
- .addField(targetTimestampField)
- .addField(targetPayloadBytesField)
- .build(),
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setPayloadFieldName("payload")
- .build()))
- .buildTargetSchema(Schema.builder().addField(sourcePayloadBytesField).build()));
-
- assertEquals(
- "attributes and timestamp field appended to user payload row field;
payload field name changed",
- Schema.builder()
- .addField(targetAttributesField)
- .addField(targetTimestampField)
- .addField(targetPayloadRowField)
- .build(),
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setPayloadFieldName("payload")
- .build()))
- .buildTargetSchema(Schema.builder().addField(sourcePayloadRowField).build()));
-
- assertEquals(
- "attributes and timestamp fields name changed",
- Schema.builder()
- .addField(targetAttributesField)
- .addField(targetTimestampField)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build(),
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("attributes")
- .setTimestampFieldName("timestamp")
- .build()))
- .buildTargetSchema(
- Schema.builder()
- .addField(sourceAttributesField)
- .addField(sourceTimestampField)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build()));
-
- assertEquals(
- "attributes, timestamp, payload bytes fields name changed",
- Schema.builder()
- .addField(targetAttributesField)
- .addField(targetTimestampField)
- .addFields(targetPayloadBytesField)
- .build(),
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("attributes")
- .setTimestampFieldName("timestamp")
- .setPayloadFieldName("payload")
- .build()))
- .buildTargetSchema(
- Schema.builder()
- .addField(sourceAttributesField)
- .addField(sourceTimestampField)
- .addField(sourcePayloadBytesField)
- .build()));
-
- assertEquals(
- "attributes, timestamp, payload row fields name changed",
- Schema.builder()
- .addField(targetAttributesField)
- .addField(targetTimestampField)
- .addFields(targetPayloadRowField)
- .build(),
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
- .setAttributesFieldName("attributes")
- .setTimestampFieldName("timestamp")
- .setPayloadFieldName("payload")
- .build()))
- .buildTargetSchema(
- Schema.builder()
- .addField(sourceAttributesField)
- .addField(sourceTimestampField)
- .addField(sourcePayloadRowField)
- .build()));
- }
-
- @Test
- public void testConvertForRowToMessageTransform() {
- Row userRow =
- rowWithAllDataTypes(
- false,
- (byte) 0,
- Instant.ofEpochMilli(CLOCK.currentTimeMillis()).toDateTime(),
- BigDecimal.valueOf(1L),
- 1.12345,
- 1.1f,
- (short) 1,
- 1,
- 1L,
- "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮");
-
- Field sourceAttributes = Field.of("attributes", ATTRIBUTES_FIELD_TYPE);
- Field targetAttributes = Field.of(DEFAULT_ATTRIBUTES_KEY_NAME, ATTRIBUTES_FIELD_TYPE);
-
- Field sourceTimestamp = Field.of("timestamp", EVENT_TIMESTAMP_FIELD_TYPE);
- Field targetTimestamp = Field.of(DEFAULT_EVENT_TIMESTAMP_KEY_NAME, EVENT_TIMESTAMP_FIELD_TYPE);
-
- Field sourcePayloadBytes = Field.of("payload", FieldType.BYTES);
- Field targetPayloadBytes = Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.BYTES);
-
- Field sourcePayloadRow = Field.of("payload", FieldType.row(ALL_DATA_TYPES_SCHEMA));
- Field targetPayloadRow =
- Field.of(DEFAULT_PAYLOAD_KEY_NAME, FieldType.row(ALL_DATA_TYPES_SCHEMA));
-
- Map<String, String> attributes = ImmutableMap.of("a", "1");
- Instant generatedTimestamp = Instant.ofEpochMilli(CLOCK.currentTimeMillis());
- Instant timestampFromSource = Instant.ofEpochMilli(CLOCK.currentTimeMillis() + 10000L);
- byte[] payloadBytes = "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮".getBytes(StandardCharsets.UTF_8);
-
- PAssert.that(
- "attributes only source yields attributes + timestamp target",
- pipeline
- .apply(
- Create.of(Row.withSchema(Schema.of(sourceAttributes)).attachValues(attributes)))
- .setRowSchema(Schema.of(sourceAttributes))
- .apply(
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration
- .sourceConfigurationBuilder()
- .setAttributesFieldName(sourceAttributes.getName())
- .build()))
- .convertForRowToMessage(
- Schema.of(targetAttributes, targetTimestamp), CLOCK))
- .setRowSchema(Schema.of(targetAttributes, targetTimestamp)))
- .containsInAnyOrder(
- Row.withSchema(Schema.of(targetAttributes, targetTimestamp))
- .attachValues(attributes, generatedTimestamp));
-
- PAssert.that(
- "timestamp only source yields attributes + timestamp target",
- pipeline
- .apply(
- Create.of(
- Row.withSchema(Schema.of(sourceTimestamp))
- .attachValues(timestampFromSource)))
- .setRowSchema(Schema.of(sourceTimestamp))
- .apply(
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration
- .sourceConfigurationBuilder()
- .setTimestampFieldName(sourceTimestamp.getName())
- .build()))
- .convertForRowToMessage(
- Schema.of(targetAttributes, targetTimestamp), CLOCK))
- .setRowSchema(Schema.of(targetAttributes, targetTimestamp)))
- .containsInAnyOrder(
- Row.withSchema(Schema.of(targetAttributes, targetTimestamp))
- .attachValues(ImmutableMap.of(), timestampFromSource));
-
- PAssert.that(
- "timestamp and attributes source yields renamed fields in target",
- pipeline
- .apply(
- Create.of(
- Row.withSchema(Schema.of(sourceAttributes, sourceTimestamp))
- .attachValues(attributes, timestampFromSource)))
- .setRowSchema(Schema.of(sourceAttributes, sourceTimestamp))
- .apply(
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration
- .sourceConfigurationBuilder()
- .setAttributesFieldName(sourceAttributes.getName())
- .setTimestampFieldName(sourceTimestamp.getName())
- .build()))
- .convertForRowToMessage(
- Schema.of(targetAttributes, targetTimestamp), CLOCK))
- .setRowSchema(Schema.of(targetAttributes, targetTimestamp)))
- .containsInAnyOrder(
- Row.withSchema(Schema.of(targetAttributes, targetTimestamp))
- .attachValues(attributes, timestampFromSource));
-
- PAssert.that(
- "bytes payload only source yields attributes + timestamp + renamed
bytes payload target",
- pipeline
- .apply(
- Create.of(
- Row.withSchema(Schema.of(sourcePayloadBytes))
- .withFieldValue(sourcePayloadBytes.getName(), payloadBytes)
- .build()))
- .setRowSchema(Schema.of(sourcePayloadBytes))
- .apply(
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration
- .sourceConfigurationBuilder()
- .setPayloadFieldName(sourcePayloadBytes.getName())
- .build()))
- .convertForRowToMessage(
- Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes), CLOCK))
- .setRowSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes)))
- .containsInAnyOrder(
- Row.withSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadBytes))
- .attachValues(ImmutableMap.of(), generatedTimestamp, payloadBytes));
-
- PAssert.that(
- "row payload only source yields attributes + timestamp + renamed
row payload target",
- pipeline
- .apply(Create.of(Row.withSchema(Schema.of(sourcePayloadRow)).attachValues(userRow)))
- .setRowSchema(Schema.of(sourcePayloadRow))
- .apply(
- transform(
- configurationBuilder()
- .setSource(
- PubsubWriteSchemaTransformConfiguration
- .sourceConfigurationBuilder()
- .setPayloadFieldName(sourcePayloadRow.getName())
- .build()))
- .convertForRowToMessage(
- Schema.of(targetAttributes, targetTimestamp, targetPayloadRow), CLOCK))
- .setRowSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadRow)))
- .containsInAnyOrder(
- Row.withSchema(Schema.of(targetAttributes, targetTimestamp, targetPayloadRow))
- .attachValues(ImmutableMap.of(), generatedTimestamp, userRow));
-
- PAssert.that(
- "user only fields source yields attributes + timestamp + user
fields target",
- pipeline
- .apply(Create.of(userRow))
- .setRowSchema(ALL_DATA_TYPES_SCHEMA)
- .apply(
- transform(configurationBuilder())
- .convertForRowToMessage(
- Schema.builder()
- .addField(targetAttributes)
- .addField(targetTimestamp)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build(),
- CLOCK))
- .setRowSchema(
- Schema.builder()
- .addField(targetAttributes)
- .addField(targetTimestamp)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build()))
- .containsInAnyOrder(
- Row.withSchema(
- Schema.builder()
- .addField(targetAttributes)
- .addField(targetTimestamp)
- .addFields(ALL_DATA_TYPES_SCHEMA.getFields())
- .build())
- .addValue(ImmutableMap.of())
- .addValue(generatedTimestamp)
- .addValues(userRow.getValues())
- .build());
-
- pipeline.run(OPTIONS);
- }
-
- @Test
- public void testGetPayloadSerializer() {
- Row withAllDataTypes =
- rowWithAllDataTypes(
- false,
- (byte) 0,
- Instant.now().toDateTime(),
- BigDecimal.valueOf(-1L),
- -3.12345,
- -4.1f,
- (short) -5,
- -2,
- -7L,
- "吃葡萄不吐葡萄皮,不吃葡萄倒吐葡萄皮");
-
- PayloadSerializer jsonPayloadSerializer =
- new JsonPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
- byte[] expectedJson = jsonPayloadSerializer.serialize(withAllDataTypes);
- byte[] actualJson =
- transform(configurationBuilder().setFormat("json"))
- .getPayloadSerializer(ALL_DATA_TYPES_SCHEMA)
- .serialize(withAllDataTypes);
-
- PayloadSerializer avroPayloadSerializer =
- new AvroPayloadSerializerProvider().getSerializer(ALL_DATA_TYPES_SCHEMA, ImmutableMap.of());
- byte[] expectedAvro = avroPayloadSerializer.serialize(withAllDataTypes);
- byte[] actualAvro =
- transform(configurationBuilder().setFormat("avro"))
- .getPayloadSerializer(ALL_DATA_TYPES_SCHEMA)
- .serialize(withAllDataTypes);
-
- assertArrayEquals(
- "configuration with json format should yield JSON PayloadSerializer",
- expectedJson,
- actualJson);
-
- assertArrayEquals(
- "configuration with avro format should yield Avro PayloadSerializer",
- expectedAvro,
- actualAvro);
- }
-
- private static PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder() {
- return PubsubWriteSchemaTransformConfiguration.builder()
- .setTopic(TOPIC)
- .setTarget(PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build());
- }
-
- private static PubsubRowToMessage.Builder rowToMessageBuilder() {
- return PubsubRowToMessage.builder();
- }
-
- private static PubsubIO.Write<PubsubMessage> pubsubWrite() {
- return PubsubIO.writeMessages().to(TOPIC);
- }
-
- private static PubsubWriteSchemaTransformProvider.PubsubWriteSchemaTransform transform(
- PubsubWriteSchemaTransformConfiguration.Builder configurationBuilder) {
- Row configurationRow = TO_ROW.apply(configurationBuilder.build());
- PubsubWriteSchemaTransformProvider provider = new PubsubWriteSchemaTransformProvider();
- return (PubsubWriteSchemaTransform) provider.from(configurationRow);
- }
-
- private static class MockClock implements Clock, Serializable {
- private final Long millis;
-
- private MockClock(Instant timestamp) {
- this.millis = timestamp.getMillis();
- }
-
- @Override
- public long currentTimeMillis() {
- return millis;
- }
- }
-}