This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e83ef734219 Pub/Sub Schema Transform Read Provider (#22145)
e83ef734219 is described below
commit e83ef734219590ce8193e457b042e66f88c5f758
Author: Damon <[email protected]>
AuthorDate: Mon Aug 8 21:42:17 2022 +0000
Pub/Sub Schema Transform Read Provider (#22145)
* WIP Implement PubsubSchemaTransformReadProvider
* Complete PubsubSchemaTransformReadProvider test coverage
* Override validate
* Patch unused code
* Reorganize code for readability
* Add suppress nullness warning
---
.../pubsub/PubsubSchemaTransformReadProvider.java | 247 +++++++++++++
.../PubsubSchemaTransformReadProviderTest.java | 386 +++++++++++++++++++++
2 files changed, 633 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
new file mode 100644
index 00000000000..c4399f1c21d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG;
+
+import com.google.api.client.util.Clock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
+ * {@link PubsubSchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class PubsubSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<PubsubSchemaTransformReadConfiguration> {
+  private static final String API = "pubsub";
+  static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<PubsubSchemaTransformReadConfiguration> configurationClass() {
+    return PubsubSchemaTransformReadConfiguration.class;
+  }
+
+  /**
+   * Returns the expected {@link SchemaTransform} of the configuration.
+   *
+   * <p>The {@link PubsubMessageToRow} conversion is built eagerly here from {@code configuration};
+   * topic/subscription/format checks are performed by {@link PubsubReadSchemaTransform#validate}.
+   */
+  @Override
+  protected SchemaTransform from(PubsubSchemaTransformReadConfiguration configuration) {
+    PubsubMessageToRow toRowTransform =
+        PubsubSchemaTransformMessageToRowFactory.from(configuration).buildMessageToRow();
+    return new PubsubReadSchemaTransform(configuration, toRowTransform);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:read", API);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for Pub/Sub reads configured using {@link
+   * PubsubSchemaTransformReadConfiguration}.
+   */
+  static class PubsubReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+
+    private final PubsubSchemaTransformReadConfiguration configuration;
+    private final PubsubMessageToRow pubsubMessageToRow;
+
+    /** Optional override of the Pub/Sub client; only set by tests. */
+    private PubsubClient.@Nullable PubsubClientFactory clientFactory;
+
+    /** Optional override of the read clock; only set by tests. */
+    private @Nullable Clock clock;
+
+    private PubsubReadSchemaTransform(
+        PubsubSchemaTransformReadConfiguration configuration,
+        PubsubMessageToRow pubsubMessageToRow) {
+      this.configuration = configuration;
+      this.pubsubMessageToRow = pubsubMessageToRow;
+    }
+
+    /**
+     * Sets the {@link PubsubClient.PubsubClientFactory}.
+     *
+     * <p>Used for testing.
+     */
+    void setClientFactory(PubsubClient.PubsubClientFactory value) {
+      this.clientFactory = value;
+    }
+
+    /**
+     * Sets the {@link Clock}.
+     *
+     * <p>Used for testing.
+     */
+    void setClock(Clock clock) {
+      this.clock = clock;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return this;
+    }
+
+    /**
+     * Validates the {@link PubsubSchemaTransformReadConfiguration}.
+     *
+     * @throws IllegalArgumentException if neither or both of the topic and subscription are set,
+     *     or if no payload serializer exists for an explicitly configured format
+     */
+    @Override
+    public void validate(@Nullable PipelineOptions options) {
+      if (configuration.getSubscription() == null && configuration.getTopic() == null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s needs to set either the topic or the subscription",
+                PubsubSchemaTransformReadConfiguration.class));
+      }
+
+      if (configuration.getSubscription() != null && configuration.getTopic() != null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s should not set both the topic and the subscription",
+                PubsubSchemaTransformReadConfiguration.class));
+      }
+
+      // NOTE(review): a configuration without a format is accepted when the transform is built
+      // (the message-to-row conversion is created without error), so presumably a default format
+      // is applied there; only explicitly configured formats are checked here. TODO confirm.
+      if (configuration.getFormat() == null) {
+        return;
+      }
+
+      try {
+        PayloadSerializers.getSerializer(
+            configuration.getFormat(), configuration.getDataSchema(), new HashMap<>());
+      } catch (IllegalArgumentException e) {
+        // Keep the original exception as the cause so the underlying reason is not lost.
+        throw new IllegalArgumentException(
+            String.format(
+                "Invalid %s, no serializer provider exists for format `%s`",
+                PubsubSchemaTransformReadConfiguration.class, configuration.getFormat()),
+            e);
+      }
+    }
+
+    /**
+     * Reads from Pub/Sub according to {@link PubsubSchemaTransformReadConfiguration}.
+     *
+     * @throws IllegalArgumentException if {@code input} contains any collections
+     */
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      PCollectionTuple rowsWithDlq =
+          input
+              .getPipeline()
+              .apply("ReadFromPubsub", buildPubsubRead())
+              .apply("PubsubMessageToRow", pubsubMessageToRow);
+
+      writeToDeadLetterQueue(rowsWithDlq);
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, rowsWithDlq.get(MAIN_TAG));
+    }
+
+    /** Routes failed conversions to the dead letter queue, if one is configured. */
+    private void writeToDeadLetterQueue(PCollectionTuple rowsWithDlq) {
+      PubsubIO.@Nullable Write<PubsubMessage> deadLetterQueue = buildDeadLetterQueueWrite();
+      if (deadLetterQueue == null) {
+        return;
+      }
+      rowsWithDlq.get(DLQ_TAG).apply("WriteToDeadLetterQueue", deadLetterQueue);
+    }
+
+    /**
+     * Builds {@link PubsubIO.Write} dead letter queue from {@link
+     * PubsubSchemaTransformReadConfiguration}.
+     *
+     * @return the dead letter queue write, or {@code null} when no dead letter queue is configured
+     */
+    PubsubIO.@Nullable Write<PubsubMessage> buildDeadLetterQueueWrite() {
+      if (configuration.getDeadLetterQueue() == null) {
+        return null;
+      }
+
+      PubsubIO.Write<PubsubMessage> writeDlq =
+          PubsubIO.writeMessages().to(configuration.getDeadLetterQueue());
+
+      if (configuration.getTimestampAttribute() != null) {
+        writeDlq = writeDlq.withTimestampAttribute(configuration.getTimestampAttribute());
+      }
+
+      return writeDlq;
+    }
+
+    /** Builds {@link PubsubIO.Read} from a {@link PubsubSchemaTransformReadConfiguration}. */
+    PubsubIO.Read<PubsubMessage> buildPubsubRead() {
+      PubsubIO.Read<PubsubMessage> read = PubsubIO.readMessagesWithAttributes();
+
+      if (configuration.getSubscription() != null) {
+        read = read.fromSubscription(configuration.getSubscription());
+      }
+
+      if (configuration.getTopic() != null) {
+        read = read.fromTopic(configuration.getTopic());
+      }
+
+      if (configuration.getTimestampAttribute() != null) {
+        read = read.withTimestampAttribute(configuration.getTimestampAttribute());
+      }
+
+      if (configuration.getIdAttribute() != null) {
+        read = read.withIdAttribute(configuration.getIdAttribute());
+      }
+
+      if (clientFactory != null) {
+        read = read.withClientFactory(clientFactory);
+      }
+
+      if (clock != null) {
+        read = read.withClock(clock);
+      }
+
+      return read;
+    }
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProviderTest.java
new file mode 100644
index 00000000000..5a34b538b0a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProviderTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.client.util.Clock;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.AvroPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PubsubSchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class PubsubSchemaTransformReadProviderTest {
+
+ private static final Schema SCHEMA =
+ Schema.of(
+ Schema.Field.of("name", Schema.FieldType.STRING),
+ Schema.Field.of("number", Schema.FieldType.INT64));
+
+ private static final String SUBSCRIPTION =
"projects/project/subscriptions/subscription";
+ private static final String TOPIC = "projects/project/topics/topic";
+
+ private static final List<TestCase> cases =
+ Arrays.asList(
+ testCase(
+ "no configured topic or subscription",
+
PubsubSchemaTransformReadConfiguration.builder().setDataSchema(SCHEMA).build())
+ .expectInvalidConfiguration(),
+ testCase(
+ "both topic and subscription configured",
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setSubscription(SUBSCRIPTION)
+ .setSubscription(TOPIC)
+ .setDataSchema(SCHEMA)
+ .build())
+ .expectInvalidConfiguration(),
+ testCase(
+ "invalid format configured",
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setSubscription(SUBSCRIPTION)
+ .setDataSchema(SCHEMA)
+ .setFormat("invalidformat")
+ .build())
+ .expectInvalidConfiguration(),
+ testCase(
+ "configuration with subscription",
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setSubscription(SUBSCRIPTION)
+ .setDataSchema(SCHEMA)
+ .build())
+
.withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)),
+ testCase(
+ "configuration with topic",
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setTopic(TOPIC)
+ .setDataSchema(SCHEMA)
+ .build())
+
.withExpectedPubsubRead(PubsubIO.readMessages().fromTopic(TOPIC)),
+ testCase(
+ "configuration with subscription, timestamp and id
attributes",
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setSubscription(SUBSCRIPTION)
+ .setTimestampAttribute("timestampAttribute")
+ .setIdAttribute("idAttribute")
+ .setDataSchema(SCHEMA)
+ .build())
+ .withExpectedPubsubRead(
+ PubsubIO.readMessages()
+ .fromSubscription(SUBSCRIPTION)
+ .withTimestampAttribute("timestampAttribute")
+ .withIdAttribute("idAttribute")),
+ testCase(
+ "configuration with subscription and dead letter queue",
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setSubscription(SUBSCRIPTION)
+ .setDataSchema(SCHEMA)
+ .setDeadLetterQueue(TOPIC)
+ .build())
+
.withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION))
+ .withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC)),
+ testCase(
+ "configuration with subscription, timestamp attribute, and
dead letter queue",
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setSubscription(SUBSCRIPTION)
+ .setTimestampAttribute("timestampAttribute")
+ .setDataSchema(SCHEMA)
+ .setDeadLetterQueue(TOPIC)
+ .build())
+ .withExpectedPubsubRead(
+ PubsubIO.readMessages()
+ .fromSubscription(SUBSCRIPTION)
+ .withTimestampAttribute("timestampAttribute"))
+ .withExpectedDeadLetterQueue(
+
PubsubIO.writeMessages().to(TOPIC).withTimestampAttribute("timestampAttribute")));
+
+ private static final AutoValueSchema AUTO_VALUE_SCHEMA = new
AutoValueSchema();
+ private static final TypeDescriptor<PubsubSchemaTransformReadConfiguration>
TYPE_DESCRIPTOR =
+ TypeDescriptor.of(PubsubSchemaTransformReadConfiguration.class);
+ private static final
SerializableFunction<PubsubSchemaTransformReadConfiguration, Row>
+ ROW_SERIALIZABLE_FUNCTION =
AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+ private static final List<Row> ROWS =
+ Arrays.asList(
+ Row.withSchema(SCHEMA).withFieldValue("name",
"a").withFieldValue("number", 100L).build(),
+ Row.withSchema(SCHEMA).withFieldValue("name",
"b").withFieldValue("number", 200L).build(),
+ Row.withSchema(SCHEMA)
+ .withFieldValue("name", "c")
+ .withFieldValue("number", 300L)
+ .build());
+
+ private static final Clock CLOCK = (Clock & Serializable) () ->
1656788475425L;
+
+ private static final AvroPayloadSerializerProvider
AVRO_PAYLOAD_SERIALIZER_PROVIDER =
+ new AvroPayloadSerializerProvider();
+ private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER =
+ AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(SCHEMA, new HashMap<>());
+
+ @Rule public TestPipeline p =
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ @Test
+ public void testBuildDeadLetterQueueWrite() {
+ for (TestCase testCase : cases) {
+ PubsubIO.Write<PubsubMessage> dlq =
+ testCase.pubsubReadSchemaTransform().buildDeadLetterQueueWrite();
+
+ if (testCase.expectedDeadLetterQueue == null) {
+ assertNull(testCase.name, dlq);
+ return;
+ }
+
+ Map<DisplayData.Identifier, DisplayData.Item> actual =
DisplayData.from(dlq).asMap();
+ Map<DisplayData.Identifier, DisplayData.Item> expected =
testCase.expectedDeadLetterQueue;
+
+ assertEquals(testCase.name, expected, actual);
+ }
+ }
+
+ @Test
+ public void testReadAvro() throws IOException {
+ PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
+ PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform transform =
+ schemaTransformWithClock("avro");
+ PubsubTestClient.PubsubTestClientFactory clientFactory =
+ clientFactory(incomingAvroMessagesOf(CLOCK.currentTimeMillis()));
+ transform.setClientFactory(clientFactory);
+ PCollectionRowTuple reads = begin.apply(transform.buildTransform());
+
+
PAssert.that(reads.get(PubsubSchemaTransformReadProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
+
+ p.run().waitUntilFinish();
+ clientFactory.close();
+ }
+
+ @Test
+ public void testReadJson() throws IOException {
+ PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
+ PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform transform =
+ schemaTransformWithClock("json");
+ PubsubTestClient.PubsubTestClientFactory clientFactory =
+ clientFactory(incomingJsonMessagesOf(CLOCK.currentTimeMillis()));
+ transform.setClientFactory(clientFactory);
+ PCollectionRowTuple reads = begin.apply(transform.buildTransform());
+
+
PAssert.that(reads.get(PubsubSchemaTransformReadProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
+
+ p.run().waitUntilFinish();
+
+ clientFactory.close();
+ }
+
+ @Test
+ public void testBuildPubSubRead() {
+ for (TestCase testCase : cases) {
+ if (testCase.invalidConfigurationExpected) {
+ continue;
+ }
+ Map<DisplayData.Identifier, DisplayData.Item> actual =
+
DisplayData.from(testCase.pubsubReadSchemaTransform().buildPubsubRead()).asMap();
+
+ Map<DisplayData.Identifier, DisplayData.Item> expected =
testCase.expectedPubsubRead;
+
+ assertEquals(testCase.name, expected, actual);
+ }
+ }
+
+ @Test
+ public void testInvalidConfiguration() {
+ for (TestCase testCase : cases) {
+ PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
+ if (testCase.invalidConfigurationExpected) {
+ assertThrows(
+ testCase.name,
+ RuntimeException.class,
+ () ->
begin.apply(testCase.pubsubReadSchemaTransform().buildTransform()));
+ }
+ }
+ }
+
+ @Test
+ public void testInvalidInput() {
+ PCollectionRowTuple begin = PCollectionRowTuple.of("BadInput",
p.apply(Create.of(ROWS)));
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ begin.apply(
+ new PubsubSchemaTransformReadProvider()
+ .from(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(SCHEMA)
+ .build())
+ .buildTransform()));
+ }
+
+ private PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform
schemaTransformWithClock(
+ String format) {
+ PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform transform =
+ (PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform)
+ new PubsubSchemaTransformReadProvider()
+ .from(
+ PubsubSchemaTransformReadConfiguration.builder()
+ .setDataSchema(SCHEMA)
+ .setSubscription(SUBSCRIPTION)
+ .setFormat(format)
+ .build())
+ .buildTransform();
+
+ transform.setClock(CLOCK);
+
+ return transform;
+ }
+
+ private static PubsubTestClient.PubsubTestClientFactory clientFactory(
+ List<PubsubClient.IncomingMessage> messages) {
+ return PubsubTestClient.createFactoryForPull(
+ CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60,
messages);
+ }
+
+ private static List<PubsubClient.IncomingMessage>
incomingAvroMessagesOf(long millisSinceEpoch) {
+ return ROWS.stream()
+ .map(row -> incomingAvroMessageOf(row, millisSinceEpoch))
+ .collect(Collectors.toList());
+ }
+
+ private static PubsubClient.IncomingMessage incomingAvroMessageOf(
+ Row row, long millisSinceEpoch) {
+ byte[] bytes = AVRO_PAYLOAD_SERIALIZER.serialize(row);
+ return incomingMessageOf(bytes, millisSinceEpoch);
+ }
+
+ private static List<PubsubClient.IncomingMessage>
incomingJsonMessagesOf(long millisSinceEpoch) {
+ return PubsubSchemaTransformReadProviderTest.ROWS.stream()
+ .map(row -> incomingJsonMessageOf(row, millisSinceEpoch))
+ .collect(Collectors.toList());
+ }
+
+ private static PubsubClient.IncomingMessage incomingJsonMessageOf(
+ Row row, long millisSinceEpoch) {
+ String name = Objects.requireNonNull(row.getString("name"));
+ long number = Objects.requireNonNull(row.getInt64("number"));
+ return incomingJsonMessageOf(name, number, millisSinceEpoch);
+ }
+
+ private static PubsubClient.IncomingMessage incomingJsonMessageOf(
+ String name, long number, long millisSinceEpoch) {
+ Gson gson = new Gson();
+ JsonObject obj = new JsonObject();
+ obj.add("name", new JsonPrimitive(name));
+ obj.add("number", new JsonPrimitive(number));
+ byte[] bytes = gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
+ return incomingMessageOf(bytes, millisSinceEpoch);
+ }
+
+ private static PubsubClient.IncomingMessage incomingMessageOf(
+ byte[] bytes, long millisSinceEpoch) {
+ int nanos = Long.valueOf(millisSinceEpoch).intValue() * 1000;
+ Timestamp timestamp = Timestamp.newBuilder().setNanos(nanos).build();
+ return PubsubClient.IncomingMessage.of(
+ com.google.pubsub.v1.PubsubMessage.newBuilder()
+ .setData(ByteString.copyFrom(bytes))
+ .setPublishTime(timestamp)
+ .build(),
+ millisSinceEpoch,
+ 0,
+ UUID.randomUUID().toString(),
+ UUID.randomUUID().toString());
+ }
+
+ static TestCase testCase(String name, PubsubSchemaTransformReadConfiguration
configuration) {
+ return new TestCase(name, configuration);
+ }
+
+ private static class TestCase {
+
+ private final String name;
+ private final PubsubSchemaTransformReadConfiguration configuration;
+
+ private Map<DisplayData.Identifier, DisplayData.Item>
expectedDeadLetterQueue;
+
+ private Map<DisplayData.Identifier, DisplayData.Item> expectedPubsubRead =
+ DisplayData.from(PubsubIO.readMessages()).asMap();
+
+ private boolean invalidConfigurationExpected = false;
+
+ TestCase(String name, PubsubSchemaTransformReadConfiguration
configuration) {
+ this.name = name;
+ this.configuration = configuration;
+ }
+
+ SchemaTransform schemaTransform() {
+ PubsubSchemaTransformReadProvider provider = new
PubsubSchemaTransformReadProvider();
+ Row configurationRow = toBeamRow();
+ return provider.from(configurationRow);
+ }
+
+ PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform
pubsubReadSchemaTransform() {
+ return (PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform)
+ schemaTransform().buildTransform();
+ }
+
+ private Row toBeamRow() {
+ return ROW_SERIALIZABLE_FUNCTION.apply(configuration);
+ }
+
+ TestCase withExpectedDeadLetterQueue(PubsubIO.Write<PubsubMessage> value) {
+ this.expectedDeadLetterQueue = DisplayData.from(value).asMap();
+ return this;
+ }
+
+ TestCase withExpectedPubsubRead(PubsubIO.Read<PubsubMessage> value) {
+ this.expectedPubsubRead = DisplayData.from(value).asMap();
+ return this;
+ }
+
+ TestCase expectInvalidConfiguration() {
+ this.invalidConfigurationExpected = true;
+ return this;
+ }
+ }
+}