This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e83ef734219 Pub/Sub Schema Transform Read Provider (#22145)
e83ef734219 is described below

commit e83ef734219590ce8193e457b042e66f88c5f758
Author: Damon <[email protected]>
AuthorDate: Mon Aug 8 21:42:17 2022 +0000

    Pub/Sub Schema Transform Read Provider (#22145)
    
    * WIP Implement PubsubSchemaTransformReadProvider
    
    * Complete PubsubSchemaTransformReadProvider test coverage
    
    * Override validate
    
    * Patch unused code
    
    * Reorganize code for readability
    
    * Add suppress nullness warning
---
 .../pubsub/PubsubSchemaTransformReadProvider.java  | 247 +++++++++++++
 .../PubsubSchemaTransformReadProviderTest.java     | 386 +++++++++++++++++++++
 2 files changed, 633 insertions(+)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
new file mode 100644
index 00000000000..c4399f1c21d
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProvider.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.DLQ_TAG;
+import static org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageToRow.MAIN_TAG;
+
+import com.google.api.client.util.Clock;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * An implementation of {@link TypedSchemaTransformProvider} for Pub/Sub reads configured using
+ * {@link PubsubSchemaTransformReadConfiguration}.
+ *
+ * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
+ * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
+ * repository.
+ */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class PubsubSchemaTransformReadProvider
+    extends TypedSchemaTransformProvider<PubsubSchemaTransformReadConfiguration> {
+  private static final String API = "pubsub";
+  static final String OUTPUT_TAG = "OUTPUT";
+
+  /** Returns the expected class of the configuration. */
+  @Override
+  protected Class<PubsubSchemaTransformReadConfiguration> configurationClass() {
+    return PubsubSchemaTransformReadConfiguration.class;
+  }
+
+  /**
+   * Returns the {@link SchemaTransform} for the given configuration. The {@link
+   * PubsubMessageToRow} conversion is built immediately from the configuration, before the
+   * transform is applied to a pipeline.
+   */
+  @Override
+  protected SchemaTransform from(PubsubSchemaTransformReadConfiguration configuration) {
+    PubsubMessageToRow toRowTransform =
+        PubsubSchemaTransformMessageToRowFactory.from(configuration).buildMessageToRow();
+    return new PubsubReadSchemaTransform(configuration, toRowTransform);
+  }
+
+  /** Implementation of the {@link TypedSchemaTransformProvider} identifier method. */
+  @Override
+  public String identifier() {
+    return String.format("%s:read", API);
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} inputCollectionNames method. Since
+   * no input is expected, this returns an empty list.
+   */
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Implementation of the {@link TypedSchemaTransformProvider} outputCollectionNames method. Since
+   * a single output is expected, this returns a list with a single name.
+   */
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG);
+  }
+
+  /**
+   * An implementation of {@link SchemaTransform} for Pub/Sub reads configured using {@link
+   * PubsubSchemaTransformReadConfiguration}.
+   */
+  static class PubsubReadSchemaTransform
+      extends PTransform<PCollectionRowTuple, PCollectionRowTuple> implements SchemaTransform {
+
+    private final PubsubSchemaTransformReadConfiguration configuration;
+    private final PubsubMessageToRow pubsubMessageToRow;
+
+    /** Optional override of the Pub/Sub client factory; only set by tests. */
+    private PubsubClient.@Nullable PubsubClientFactory clientFactory;
+
+    /** Optional override of the read clock; only set by tests. */
+    private @Nullable Clock clock;
+
+    private PubsubReadSchemaTransform(
+        PubsubSchemaTransformReadConfiguration configuration,
+        PubsubMessageToRow pubsubMessageToRow) {
+      this.configuration = configuration;
+      this.pubsubMessageToRow = pubsubMessageToRow;
+    }
+
+    /**
+     * Sets the {@link PubsubClient.PubsubClientFactory}.
+     *
+     * <p>Used for testing.
+     */
+    void setClientFactory(PubsubClient.PubsubClientFactory value) {
+      this.clientFactory = value;
+    }
+
+    /**
+     * Sets the {@link Clock}.
+     *
+     * <p>Used for testing.
+     */
+    void setClock(Clock clock) {
+      this.clock = clock;
+    }
+
+    /** Implements {@link SchemaTransform} buildTransform method. */
+    @Override
+    public PTransform<PCollectionRowTuple, PCollectionRowTuple> buildTransform() {
+      return this;
+    }
+
+    /**
+     * Validates the {@link PubsubSchemaTransformReadConfiguration}.
+     *
+     * @throws IllegalArgumentException if neither or both of topic and subscription are set, or if
+     *     no payload serializer exists for the configured format; in the latter case the original
+     *     lookup failure is attached as the cause.
+     */
+    @Override
+    public void validate(@Nullable PipelineOptions options) {
+      if (configuration.getSubscription() == null && configuration.getTopic() == null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s needs to set either the topic or the subscription",
+                PubsubSchemaTransformReadConfiguration.class));
+      }
+
+      if (configuration.getSubscription() != null && configuration.getTopic() != null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s should not set both the topic and the subscription",
+                PubsubSchemaTransformReadConfiguration.class));
+      }
+
+      try {
+        // Probe the serializer registry so an unsupported format fails at construction time
+        // rather than when the first message is decoded.
+        PayloadSerializers.getSerializer(
+            configuration.getFormat(), configuration.getDataSchema(), new HashMap<>());
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Invalid %s, no serializer provider exists for format `%s`",
+                PubsubSchemaTransformReadConfiguration.class, configuration.getFormat()),
+            e);
+      }
+    }
+
+    /**
+     * Reads from Pub/Sub according to {@link PubsubSchemaTransformReadConfiguration}.
+     *
+     * <p>Successfully converted rows are emitted under {@link #OUTPUT_TAG}; messages that fail
+     * conversion are written to the dead letter queue when one is configured.
+     *
+     * @throws IllegalArgumentException if {@code input} is not empty.
+     */
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      if (!input.getAll().isEmpty()) {
+        throw new IllegalArgumentException(
+            String.format(
+                "%s %s input is expected to be empty",
+                input.getClass().getSimpleName(), getClass().getSimpleName()));
+      }
+
+      PCollectionTuple rowsWithDlq =
+          input
+              .getPipeline()
+              .apply("ReadFromPubsub", buildPubsubRead())
+              .apply("PubsubMessageToRow", pubsubMessageToRow);
+
+      writeToDeadLetterQueue(rowsWithDlq);
+
+      return PCollectionRowTuple.of(OUTPUT_TAG, rowsWithDlq.get(MAIN_TAG));
+    }
+
+    /** Writes the {@code DLQ_TAG} output to the dead letter queue, if one is configured. */
+    private void writeToDeadLetterQueue(PCollectionTuple rowsWithDlq) {
+      PubsubIO.Write<PubsubMessage> deadLetterQueue = buildDeadLetterQueueWrite();
+      if (deadLetterQueue == null) {
+        return;
+      }
+      rowsWithDlq.get(DLQ_TAG).apply("WriteToDeadLetterQueue", deadLetterQueue);
+    }
+
+    /**
+     * Builds {@link PubsubIO.Write} dead letter queue from {@link
+     * PubsubSchemaTransformReadConfiguration}.
+     *
+     * @return the write transform, or {@code null} when no dead letter queue is configured.
+     */
+    PubsubIO.@Nullable Write<PubsubMessage> buildDeadLetterQueueWrite() {
+      if (configuration.getDeadLetterQueue() == null) {
+        return null;
+      }
+
+      PubsubIO.Write<PubsubMessage> writeDlq =
+          PubsubIO.writeMessages().to(configuration.getDeadLetterQueue());
+
+      if (configuration.getTimestampAttribute() != null) {
+        writeDlq = writeDlq.withTimestampAttribute(configuration.getTimestampAttribute());
+      }
+
+      return writeDlq;
+    }
+
+    /** Builds {@link PubsubIO.Read} from a {@link PubsubSchemaTransformReadConfiguration}. */
+    PubsubIO.Read<PubsubMessage> buildPubsubRead() {
+      PubsubIO.Read<PubsubMessage> read = PubsubIO.readMessagesWithAttributes();
+
+      if (configuration.getSubscription() != null) {
+        read = read.fromSubscription(configuration.getSubscription());
+      }
+
+      if (configuration.getTopic() != null) {
+        read = read.fromTopic(configuration.getTopic());
+      }
+
+      if (configuration.getTimestampAttribute() != null) {
+        read = read.withTimestampAttribute(configuration.getTimestampAttribute());
+      }
+
+      if (configuration.getIdAttribute() != null) {
+        read = read.withIdAttribute(configuration.getIdAttribute());
+      }
+
+      if (clientFactory != null) {
+        read = read.withClientFactory(clientFactory);
+      }
+
+      if (clock != null) {
+        read = read.withClock(clock);
+      }
+
+      return read;
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProviderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProviderTest.java
new file mode 100644
index 00000000000..5a34b538b0a
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubSchemaTransformReadProviderTest.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.client.util.Clock;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.payloads.AvroPayloadSerializerProvider;
+import org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PubsubSchemaTransformReadProvider}. */
+@RunWith(JUnit4.class)
+public class PubsubSchemaTransformReadProviderTest {
+
+  private static final Schema SCHEMA =
+      Schema.of(
+          Schema.Field.of("name", Schema.FieldType.STRING),
+          Schema.Field.of("number", Schema.FieldType.INT64));
+
+  private static final String SUBSCRIPTION = "projects/project/subscriptions/subscription";
+  private static final String TOPIC = "projects/project/topics/topic";
+
+  /**
+   * Configuration scenarios shared by the table-driven tests. Cases marked with {@link
+   * TestCase#expectInvalidConfiguration()} are expected to throw when the transform is built or
+   * applied; the other cases carry the expected read and (optional) dead letter queue display data.
+   */
+  private static final List<TestCase> cases =
+      Arrays.asList(
+          testCase(
+                  "no configured topic or subscription",
+                  PubsubSchemaTransformReadConfiguration.builder().setDataSchema(SCHEMA).build())
+              .expectInvalidConfiguration(),
+          testCase(
+                  "both topic and subscription configured",
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setSubscription(SUBSCRIPTION)
+                      .setTopic(TOPIC)
+                      .setDataSchema(SCHEMA)
+                      .build())
+              .expectInvalidConfiguration(),
+          testCase(
+                  "invalid format configured",
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setSubscription(SUBSCRIPTION)
+                      .setDataSchema(SCHEMA)
+                      .setFormat("invalidformat")
+                      .build())
+              .expectInvalidConfiguration(),
+          testCase(
+                  "configuration with subscription",
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setSubscription(SUBSCRIPTION)
+                      .setDataSchema(SCHEMA)
+                      .build())
+              .withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION)),
+          testCase(
+                  "configuration with topic",
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setTopic(TOPIC)
+                      .setDataSchema(SCHEMA)
+                      .build())
+              .withExpectedPubsubRead(PubsubIO.readMessages().fromTopic(TOPIC)),
+          testCase(
+                  "configuration with subscription, timestamp and id attributes",
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setSubscription(SUBSCRIPTION)
+                      .setTimestampAttribute("timestampAttribute")
+                      .setIdAttribute("idAttribute")
+                      .setDataSchema(SCHEMA)
+                      .build())
+              .withExpectedPubsubRead(
+                  PubsubIO.readMessages()
+                      .fromSubscription(SUBSCRIPTION)
+                      .withTimestampAttribute("timestampAttribute")
+                      .withIdAttribute("idAttribute")),
+          testCase(
+                  "configuration with subscription and dead letter queue",
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setSubscription(SUBSCRIPTION)
+                      .setDataSchema(SCHEMA)
+                      .setDeadLetterQueue(TOPIC)
+                      .build())
+              .withExpectedPubsubRead(PubsubIO.readMessages().fromSubscription(SUBSCRIPTION))
+              .withExpectedDeadLetterQueue(PubsubIO.writeMessages().to(TOPIC)),
+          testCase(
+                  "configuration with subscription, timestamp attribute, and dead letter queue",
+                  PubsubSchemaTransformReadConfiguration.builder()
+                      .setSubscription(SUBSCRIPTION)
+                      .setTimestampAttribute("timestampAttribute")
+                      .setDataSchema(SCHEMA)
+                      .setDeadLetterQueue(TOPIC)
+                      .build())
+              .withExpectedPubsubRead(
+                  PubsubIO.readMessages()
+                      .fromSubscription(SUBSCRIPTION)
+                      .withTimestampAttribute("timestampAttribute"))
+              .withExpectedDeadLetterQueue(
+                  PubsubIO.writeMessages().to(TOPIC).withTimestampAttribute("timestampAttribute")));
+
+  private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
+  private static final TypeDescriptor<PubsubSchemaTransformReadConfiguration> TYPE_DESCRIPTOR =
+      TypeDescriptor.of(PubsubSchemaTransformReadConfiguration.class);
+  private static final SerializableFunction<PubsubSchemaTransformReadConfiguration, Row>
+      ROW_SERIALIZABLE_FUNCTION = AUTO_VALUE_SCHEMA.toRowFunction(TYPE_DESCRIPTOR);
+
+  private static final List<Row> ROWS =
+      Arrays.asList(
+          Row.withSchema(SCHEMA).withFieldValue("name", "a").withFieldValue("number", 100L).build(),
+          Row.withSchema(SCHEMA).withFieldValue("name", "b").withFieldValue("number", 200L).build(),
+          Row.withSchema(SCHEMA)
+              .withFieldValue("name", "c")
+              .withFieldValue("number", 300L)
+              .build());
+
+  /** Fixed, serializable clock so the test client and the read agree on "now". */
+  private static final Clock CLOCK = (Clock & Serializable) () -> 1656788475425L;
+
+  private static final AvroPayloadSerializerProvider AVRO_PAYLOAD_SERIALIZER_PROVIDER =
+      new AvroPayloadSerializerProvider();
+  private static final PayloadSerializer AVRO_PAYLOAD_SERIALIZER =
+      AVRO_PAYLOAD_SERIALIZER_PROVIDER.getSerializer(SCHEMA, new HashMap<>());
+
+  @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Test
+  public void testBuildDeadLetterQueueWrite() {
+    for (TestCase testCase : cases) {
+      // Invalid configurations are covered by testInvalidConfiguration.
+      if (testCase.invalidConfigurationExpected) {
+        continue;
+      }
+      PubsubIO.Write<PubsubMessage> dlq =
+          testCase.pubsubReadSchemaTransform().buildDeadLetterQueueWrite();
+
+      if (testCase.expectedDeadLetterQueue == null) {
+        assertNull(testCase.name, dlq);
+        // Move on to the next case; returning here would skip every remaining case.
+        continue;
+      }
+
+      Map<DisplayData.Identifier, DisplayData.Item> actual = DisplayData.from(dlq).asMap();
+      Map<DisplayData.Identifier, DisplayData.Item> expected = testCase.expectedDeadLetterQueue;
+
+      assertEquals(testCase.name, expected, actual);
+    }
+  }
+
+  @Test
+  public void testReadAvro() throws IOException {
+    PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
+    PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform transform =
+        schemaTransformWithClock("avro");
+    PubsubTestClient.PubsubTestClientFactory clientFactory =
+        clientFactory(incomingAvroMessagesOf(CLOCK.currentTimeMillis()));
+    transform.setClientFactory(clientFactory);
+    PCollectionRowTuple reads = begin.apply(transform.buildTransform());
+
+    PAssert.that(reads.get(PubsubSchemaTransformReadProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
+
+    p.run().waitUntilFinish();
+    clientFactory.close();
+  }
+
+  @Test
+  public void testReadJson() throws IOException {
+    PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
+    PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform transform =
+        schemaTransformWithClock("json");
+    PubsubTestClient.PubsubTestClientFactory clientFactory =
+        clientFactory(incomingJsonMessagesOf(CLOCK.currentTimeMillis()));
+    transform.setClientFactory(clientFactory);
+    PCollectionRowTuple reads = begin.apply(transform.buildTransform());
+
+    PAssert.that(reads.get(PubsubSchemaTransformReadProvider.OUTPUT_TAG)).containsInAnyOrder(ROWS);
+
+    p.run().waitUntilFinish();
+
+    clientFactory.close();
+  }
+
+  @Test
+  public void testBuildPubSubRead() {
+    for (TestCase testCase : cases) {
+      if (testCase.invalidConfigurationExpected) {
+        continue;
+      }
+      Map<DisplayData.Identifier, DisplayData.Item> actual =
+          DisplayData.from(testCase.pubsubReadSchemaTransform().buildPubsubRead()).asMap();
+
+      Map<DisplayData.Identifier, DisplayData.Item> expected = testCase.expectedPubsubRead;
+
+      assertEquals(testCase.name, expected, actual);
+    }
+  }
+
+  @Test
+  public void testInvalidConfiguration() {
+    for (TestCase testCase : cases) {
+      if (!testCase.invalidConfigurationExpected) {
+        continue;
+      }
+      PCollectionRowTuple begin = PCollectionRowTuple.empty(p);
+      assertThrows(
+          testCase.name,
+          RuntimeException.class,
+          () -> begin.apply(testCase.pubsubReadSchemaTransform().buildTransform()));
+    }
+  }
+
+  @Test
+  public void testInvalidInput() {
+    PCollectionRowTuple begin = PCollectionRowTuple.of("BadInput", p.apply(Create.of(ROWS)));
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            begin.apply(
+                new PubsubSchemaTransformReadProvider()
+                    .from(
+                        PubsubSchemaTransformReadConfiguration.builder()
+                            .setDataSchema(SCHEMA)
+                            .build())
+                    .buildTransform()));
+  }
+
+  /** Builds a subscription-based read transform for {@code format} that uses {@link #CLOCK}. */
+  private PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform schemaTransformWithClock(
+      String format) {
+    PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform transform =
+        (PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform)
+            new PubsubSchemaTransformReadProvider()
+                .from(
+                    PubsubSchemaTransformReadConfiguration.builder()
+                        .setDataSchema(SCHEMA)
+                        .setSubscription(SUBSCRIPTION)
+                        .setFormat(format)
+                        .build())
+                .buildTransform();
+
+    transform.setClock(CLOCK);
+
+    return transform;
+  }
+
+  private static PubsubTestClient.PubsubTestClientFactory clientFactory(
+      List<PubsubClient.IncomingMessage> messages) {
+    return PubsubTestClient.createFactoryForPull(
+        CLOCK, PubsubClient.subscriptionPathFromPath(SUBSCRIPTION), 60, messages);
+  }
+
+  private static List<PubsubClient.IncomingMessage> incomingAvroMessagesOf(long millisSinceEpoch) {
+    return ROWS.stream()
+        .map(row -> incomingAvroMessageOf(row, millisSinceEpoch))
+        .collect(Collectors.toList());
+  }
+
+  private static PubsubClient.IncomingMessage incomingAvroMessageOf(
+      Row row, long millisSinceEpoch) {
+    byte[] bytes = AVRO_PAYLOAD_SERIALIZER.serialize(row);
+    return incomingMessageOf(bytes, millisSinceEpoch);
+  }
+
+  private static List<PubsubClient.IncomingMessage> incomingJsonMessagesOf(long millisSinceEpoch) {
+    return ROWS.stream()
+        .map(row -> incomingJsonMessageOf(row, millisSinceEpoch))
+        .collect(Collectors.toList());
+  }
+
+  private static PubsubClient.IncomingMessage incomingJsonMessageOf(
+      Row row, long millisSinceEpoch) {
+    String name = Objects.requireNonNull(row.getString("name"));
+    long number = Objects.requireNonNull(row.getInt64("number"));
+    return incomingJsonMessageOf(name, number, millisSinceEpoch);
+  }
+
+  private static PubsubClient.IncomingMessage incomingJsonMessageOf(
+      String name, long number, long millisSinceEpoch) {
+    Gson gson = new Gson();
+    JsonObject obj = new JsonObject();
+    obj.add("name", new JsonPrimitive(name));
+    obj.add("number", new JsonPrimitive(number));
+    byte[] bytes = gson.toJson(obj).getBytes(StandardCharsets.UTF_8);
+    return incomingMessageOf(bytes, millisSinceEpoch);
+  }
+
+  /**
+   * Wraps {@code bytes} in an incoming Pub/Sub message published at {@code millisSinceEpoch}.
+   *
+   * <p>A protobuf {@link Timestamp} holds whole seconds plus a nanos remainder that must lie in
+   * [0, 999_999_999], so the epoch millis are split into those two parts. Floor division keeps the
+   * remainder non-negative even for pre-epoch values.
+   */
+  private static PubsubClient.IncomingMessage incomingMessageOf(
+      byte[] bytes, long millisSinceEpoch) {
+    long seconds = Math.floorDiv(millisSinceEpoch, 1000L);
+    int nanos = (int) Math.floorMod(millisSinceEpoch, 1000L) * 1_000_000;
+    Timestamp timestamp = Timestamp.newBuilder().setSeconds(seconds).setNanos(nanos).build();
+    return PubsubClient.IncomingMessage.of(
+        com.google.pubsub.v1.PubsubMessage.newBuilder()
+            .setData(ByteString.copyFrom(bytes))
+            .setPublishTime(timestamp)
+            .build(),
+        millisSinceEpoch,
+        0,
+        UUID.randomUUID().toString(),
+        UUID.randomUUID().toString());
+  }
+
+  static TestCase testCase(String name, PubsubSchemaTransformReadConfiguration configuration) {
+    return new TestCase(name, configuration);
+  }
+
+  /** A named configuration together with the outcome the tests expect for it. */
+  private static class TestCase {
+
+    private final String name;
+    private final PubsubSchemaTransformReadConfiguration configuration;
+
+    /** Expected dead letter queue display data; {@code null} when no DLQ is expected. */
+    private Map<DisplayData.Identifier, DisplayData.Item> expectedDeadLetterQueue;
+
+    private Map<DisplayData.Identifier, DisplayData.Item> expectedPubsubRead =
+        DisplayData.from(PubsubIO.readMessages()).asMap();
+
+    private boolean invalidConfigurationExpected = false;
+
+    TestCase(String name, PubsubSchemaTransformReadConfiguration configuration) {
+      this.name = name;
+      this.configuration = configuration;
+    }
+
+    /** Builds the transform through the provider's {@link Row}-based entry point. */
+    SchemaTransform schemaTransform() {
+      PubsubSchemaTransformReadProvider provider = new PubsubSchemaTransformReadProvider();
+      Row configurationRow = toBeamRow();
+      return provider.from(configurationRow);
+    }
+
+    PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform pubsubReadSchemaTransform() {
+      return (PubsubSchemaTransformReadProvider.PubsubReadSchemaTransform)
+          schemaTransform().buildTransform();
+    }
+
+    private Row toBeamRow() {
+      return ROW_SERIALIZABLE_FUNCTION.apply(configuration);
+    }
+
+    TestCase withExpectedDeadLetterQueue(PubsubIO.Write<PubsubMessage> value) {
+      this.expectedDeadLetterQueue = DisplayData.from(value).asMap();
+      return this;
+    }
+
+    TestCase withExpectedPubsubRead(PubsubIO.Read<PubsubMessage> value) {
+      this.expectedPubsubRead = DisplayData.from(value).asMap();
+      return this;
+    }
+
+    TestCase expectInvalidConfiguration() {
+      this.invalidConfigurationExpected = true;
+      return this;
+    }
+  }
+}

Reply via email to