[ 
https://issues.apache.org/jira/browse/BEAM-4199?focusedWorklogId=103108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103108
 ]

ASF GitHub Bot logged work on BEAM-4199:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/May/18 20:52
            Start Date: 17/May/18 20:52
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #5380: [BEAM-4199][SQL] 
Add support for DLQ to PubsubJsonTable
URL: https://github.com/apache/beam/pull/5380
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java
index 6ef666cf4cb..83f1bc3e88c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/JsonToRowUtils.java
@@ -17,10 +17,13 @@
  */
 package org.apache.beam.sdk.util;
 
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import java.io.IOException;
 import org.apache.beam.sdk.annotations.Internal;
+import 
org.apache.beam.sdk.util.RowJsonDeserializer.UnsupportedRowJsonException;
 import org.apache.beam.sdk.values.Row;
 
 /**
@@ -42,6 +45,8 @@ public static ObjectMapper 
newObjectMapperWith(RowJsonDeserializer deserializer)
   public static Row jsonToRow(ObjectMapper objectMapper, String jsonString) {
     try {
       return objectMapper.readValue(jsonString, Row.class);
+    } catch (JsonParseException | JsonMappingException jsonException) {
+      throw new UnsupportedRowJsonException("Unable to parse Row", 
jsonException);
     } catch (IOException e) {
       throw new IllegalArgumentException("Unable to parse json object: " + 
jsonString, e);
     }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
index 3a9902f06d2..a05a18106cb 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubIOJsonTable.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
 
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.DLQ_TAG;
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.MAIN_TAG;
+
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import javax.annotation.Nullable;
@@ -29,10 +32,13 @@
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTagList;
 
 /**
  * <i>Experimental</i>
@@ -96,6 +102,18 @@
    */
   @Nullable abstract String getTimestampAttribute();
 
+  /**
+   * Optional topic path which will be used as a dead letter queue.
+   *
+   * <p>Messages that cannot be processed will be sent to this topic. If it is 
not specified then
+   * exception will be thrown for errors during processing causing the 
pipeline to crash.
+   */
+  @Nullable abstract String getDeadLetterQueue();
+
+  private boolean useDlq() {
+    return getDeadLetterQueue() != null;
+  }
+
   /**
    * Pubsub topic name.
    *
@@ -123,12 +141,32 @@ public BeamIOType getSourceType() {
 
   @Override
   public PCollection<Row> buildIOReader(Pipeline pipeline) {
-    return
+    PCollectionTuple rowsWithDlq =
         PBegin
             .in(pipeline)
             .apply("readFromPubsub", readMessagesWithAttributes())
-            .apply("parseMessageToRow", 
PubsubMessageToRow.forSchema(getSchema()))
-            .setCoder(getSchema().getRowCoder());
+            .apply("parseMessageToRow", createParserParDo());
+
+    if (useDlq()) {
+      rowsWithDlq.get(DLQ_TAG).apply(writeMessagesToDlq());
+    }
+
+    return rowsWithDlq.get(MAIN_TAG).setCoder(getSchema().getRowCoder());
+  }
+
+  private ParDo.MultiOutput<PubsubMessage, Row> createParserParDo() {
+    return
+        ParDo
+            .of(PubsubMessageToRow
+                    .builder()
+                    .messageSchema(getSchema())
+                    .useDlq(getDeadLetterQueue() != null)
+                    .build())
+            .withOutputTags(
+                MAIN_TAG,
+                useDlq()
+                    ? TupleTagList.of(DLQ_TAG)
+                    : TupleTagList.empty());
   }
 
   private PubsubIO.Read<PubsubMessage> readMessagesWithAttributes() {
@@ -141,6 +179,16 @@ public BeamIOType getSourceType() {
         : read.withTimestampAttribute(getTimestampAttribute());
   }
 
+  private PubsubIO.Write<PubsubMessage> writeMessagesToDlq() {
+    PubsubIO.Write<PubsubMessage> write = PubsubIO
+        .writeMessages()
+        .to(getDeadLetterQueue());
+
+    return (getTimestampAttribute() == null)
+        ? write
+        : write.withTimestampAttribute(getTimestampAttribute());
+  }
+
   @Override
   public PTransform<? super PCollection<Row>, POutput> buildIOWriter() {
     throw new UnsupportedOperationException("Writing to a Pubsub topic is not 
supported");
@@ -150,6 +198,7 @@ public BeamIOType getSourceType() {
   abstract static class Builder {
     abstract Builder setSchema(Schema schema);
     abstract Builder setTimestampAttribute(String timestampAttribute);
+    abstract Builder setDeadLetterQueue(String deadLetterQueue);
     abstract Builder setTopic(String topic);
 
     abstract PubsubIOJsonTable build();
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
index b814f9e5740..a8de35e6a0f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonTableProvider.java
@@ -54,12 +54,15 @@ public BeamSqlTable buildBeamSqlTable(Table tableDefintion) 
{
 
     JSONObject tableProperties = tableDefintion.getProperties();
     String timestampAttributeKey = 
tableProperties.getString("timestampAttributeKey");
+    String deadLetterQueue = tableProperties.getString("deadLetterQueue");
+    validateDlq(deadLetterQueue);
 
     return
         PubsubIOJsonTable
             .builder()
             .setSchema(tableDefintion.getSchema())
             .setTimestampAttribute(timestampAttributeKey)
+            .setDeadLetterQueue(deadLetterQueue)
             .setTopic(tableDefintion.getLocation())
             .build();
   }
@@ -85,4 +88,10 @@ private void validatePubsubMessageSchema(Table 
tableDefinition) {
   private boolean fieldPresent(Schema schema, String field, Schema.FieldType 
expectedType) {
     return schema.hasField(field) && 
expectedType.equals(schema.getField(field).getType());
   }
+
+  private void validateDlq(String deadLetterQueue) {
+    if (deadLetterQueue != null && deadLetterQueue.isEmpty()) {
+      throw new IllegalArgumentException("Dead letter queue topic name is not 
specified");
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
index 760defc70e9..2215f24df0f 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRow.java
@@ -21,6 +21,7 @@
 import static org.apache.beam.sdk.util.JsonToRowUtils.newObjectMapperWith;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.value.AutoValue;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -31,10 +32,11 @@
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.JsonToRowUtils;
 import org.apache.beam.sdk.util.RowJsonDeserializer;
+import 
org.apache.beam.sdk.util.RowJsonDeserializer.UnsupportedRowJsonException;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
 /**
@@ -42,13 +44,15 @@
  */
 @Internal
 @Experimental
-public class PubsubMessageToRow extends DoFn<PubsubMessage, Row> {
+@AutoValue
+public abstract class PubsubMessageToRow extends DoFn<PubsubMessage, Row> {
   static final String TIMESTAMP_FIELD = "event_timestamp";
   static final String ATTRIBUTES_FIELD = "attributes";
   static final String PAYLOAD_FIELD = "payload";
+  static final TupleTag<PubsubMessage> DLQ_TAG = new TupleTag<PubsubMessage>() 
{};
+  static final TupleTag<Row> MAIN_TAG = new TupleTag<Row>() {};
 
   private transient volatile @Nullable ObjectMapper objectMapper;
-  private Schema messageSchema;
 
   /**
    * Schema of the Pubsub message.
@@ -62,34 +66,47 @@
    *
    * <p>Only UTF-8 JSON objects are supported.
    */
-  public Schema getMessageSchema() {
-    return messageSchema;
-  }
+  public abstract Schema messageSchema();
+
+  public abstract boolean useDlq();
 
-  public static ParDo.SingleOutput<PubsubMessage, Row> forSchema(Schema 
messageSchema) {
-    return ParDo.of(new PubsubMessageToRow(messageSchema));
+  private Schema payloadSchema() {
+    return messageSchema().getField(PAYLOAD_FIELD).getType().getRowSchema();
   }
 
-  private PubsubMessageToRow(Schema messageSchema) {
-    this.messageSchema = messageSchema;
+  public static Builder builder() {
+    return new AutoValue_PubsubMessageToRow.Builder();
   }
 
   @DoFn.ProcessElement
   public void processElement(ProcessContext context) {
-    // get values for fields
-    // in the same order they're specified in schema
-    List<Object> values = getMessageSchema()
-        .getFields()
-        .stream()
-        .map(field ->
-                 getValueForField(
-                     field,
-                     context.timestamp(),
-                     context.element()))
-        .collect(toList());
-
-    context.output(
-        Row.withSchema(getMessageSchema()).addValues(values).build());
+    try {
+      List<Object> values = getFieldValues(context);
+      
context.output(Row.withSchema(messageSchema()).addValues(values).build());
+    } catch (UnsupportedRowJsonException jsonException) {
+      if (useDlq()) {
+        context.output(DLQ_TAG, context.element());
+      } else {
+        throw new RuntimeException("Error parsing message", jsonException);
+      }
+    }
+  }
+
+  /**
+   * Get values for fields in the same order they're specified in schema, 
including
+   * timestamp, payload, and attributes.
+   */
+  private List<Object> getFieldValues(ProcessContext context) {
+    return
+        messageSchema()
+            .getFields()
+            .stream()
+            .map(field ->
+                     getValueForField(
+                         field,
+                         context.timestamp(),
+                         context.element()))
+            .collect(toList());
   }
 
   private Object getValueForField(
@@ -117,13 +134,17 @@ private Row parsePayloadJsonRow(PubsubMessage 
pubsubMessage) {
 
     if (objectMapper == null) {
       objectMapper =
-          
newObjectMapperWith(RowJsonDeserializer.forSchema(getPayloadSchema()));
+          newObjectMapperWith(RowJsonDeserializer.forSchema(payloadSchema()));
     }
 
     return JsonToRowUtils.jsonToRow(objectMapper, payloadJson);
   }
 
-  private Schema getPayloadSchema() {
-    return getMessageSchema().getField(PAYLOAD_FIELD).getType().getRowSchema();
+  @AutoValue.Builder
+  abstract static class Builder {
+    public abstract Builder messageSchema(Schema messageSchema);
+    public abstract Builder useDlq(boolean useDlq);
+
+    public abstract PubsubMessageToRow build();
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index af1dfa92dce..4c864ac9244 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -23,11 +23,15 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Set;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
 import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
 import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
 import org.apache.beam.sdk.schemas.Schema;
@@ -57,9 +61,14 @@
           .withVarcharField("name")
           .build();
 
-  @Rule public transient TestPubsub pubsub = TestPubsub.create();
-  @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule
+  public transient TestPubsub eventsTopic = TestPubsub.create();
+  @Rule
+  public transient TestPubsub dlqTopic = TestPubsub.create();
+  @Rule
+  public transient TestPubsubSignal signal = TestPubsubSignal.create();
+  @Rule
+  public transient TestPipeline pipeline = TestPipeline.create();
 
   @Test
   public void testSelectsPayloadContent() throws Exception {
@@ -73,7 +82,7 @@ public void testSelectsPayloadContent() throws Exception {
         + "           > \n"
         + ") \n"
         + "TYPE 'pubsub' \n"
-        + "LOCATION '" + pubsub.eventsTopicPath() + "' \n"
+        + "LOCATION '" + eventsTopic.topicPath() + "' \n"
         + "TBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'";
 
     String queryString = "SELECT message.payload.id, message.payload.name from 
message";
@@ -100,10 +109,72 @@ public void testSelectsPayloadContent() throws Exception {
                         row(PAYLOAD_SCHEMA, 7, "baz")))));
 
     pipeline.run();
-    pubsub.publish(messages);
+    eventsTopic.publish(messages);
     signal.waitForSuccess(Duration.standardSeconds(120));
   }
 
+  @Test
+  public void testUsesDlq() throws Exception {
+    String createTableString =
+        "CREATE TABLE message (\n"
+        + "event_timestamp TIMESTAMP, \n"
+        + "attributes MAP<VARCHAR, VARCHAR>, \n"
+        + "payload ROW< \n"
+        + "             id INTEGER, \n"
+        + "             name VARCHAR \n"
+        + "           > \n"
+        + ") \n"
+        + "TYPE 'pubsub' \n"
+        + "LOCATION '" + eventsTopic.topicPath() + "' \n"
+        + "TBLPROPERTIES "
+        + "    '{ "
+        + "       \"timestampAttributeKey\" : \"ts\", "
+        + "       \"deadLetterQueue\" : \"" + dlqTopic.topicPath() + "\""
+        + "     }'";
+
+    String queryString = "SELECT message.payload.id, message.payload.name from 
message";
+
+    List<PubsubMessage> messages = ImmutableList.of(
+        message(ts(1), 3, "foo"),
+        message(ts(2), 5, "bar"),
+        message(ts(3), 7, "baz"),
+        message(ts(4), "{ - }"),   // invalid
+        message(ts(5), "{ + }"));  // invalid
+
+    BeamSqlEnv sqlEnv = newSqlEnv();
+
+    createTable(sqlEnv, createTableString);
+    query(sqlEnv, pipeline, queryString);
+
+    PCollection<PubsubMessage> dlq =
+        
pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic(dlqTopic.topicPath()));
+
+    dlq.apply(
+        "waitForDlq",
+        signal.signalSuccessWhen(
+            PubsubMessageWithAttributesCoder.of(),
+            dlqMessages -> containsAll(dlqMessages,
+                                       message(ts(4), "{ - }"),
+                                       message(ts(5), "{ + }"))));
+
+    pipeline.run();
+    eventsTopic.publish(messages);
+    signal.waitForSuccess(Duration.standardSeconds(120));
+  }
+
+  private static Boolean containsAll(Set<PubsubMessage> set, PubsubMessage... 
subsetCandidate) {
+    return
+        Arrays
+            .stream(subsetCandidate)
+            .allMatch(
+                candidate -> set.stream().anyMatch(element -> 
messagesEqual(element, candidate)));
+  }
+
+  private static boolean messagesEqual(PubsubMessage message1, PubsubMessage 
message2) {
+    return message1.getAttributeMap().equals(message2.getAttributeMap())
+           && Arrays.equals(message1.getPayload(), message2.getPayload());
+  }
+
   private BeamSqlEnv newSqlEnv() {
     InMemoryMetaStore metaStore = new InMemoryMetaStore();
     metaStore.registerProvider(new PubsubJsonTableProvider());
@@ -129,9 +200,13 @@ private Row row(Schema schema, Object... values) {
   }
 
   private PubsubMessage message(Instant timestamp, int id, String name) {
+    return message(timestamp, jsonString(id, name));
+  }
+
+  private PubsubMessage message(Instant timestamp, String jsonPayload) {
     return
         new PubsubMessage(
-            jsonString(id, name).getBytes(UTF_8),
+            jsonPayload.getBytes(UTF_8),
             ImmutableMap.of("ts", String.valueOf(timestamp.getMillis())));
   }
 
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java
index 63932def64d..3bdec91d616 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubMessageToRowTest.java
@@ -17,21 +17,33 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
 
+import static com.google.common.collect.Iterables.size;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toSet;
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.DLQ_TAG;
+import static 
org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubMessageToRow.MAIN_TAG;
 import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
+import static org.junit.Assert.assertEquals;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -69,7 +81,13 @@ public void testConvertsMessages() {
                    message(2, map("bttr", "vbl"), "{ \"name\" : \"baz\", 
\"id\" : 5 }"),
                    message(3, map("cttr", "vcl"), "{ \"id\" : 7, \"name\" : 
\"bar\" }"),
                    message(4, map("dttr", "vdl"), "{ \"name\" : \"qaz\", 
\"id\" : 8 }")))
-        .apply("convert", PubsubMessageToRow.forSchema(messageSchema));
+        .apply("convert",
+               ParDo.of(
+                   PubsubMessageToRow
+                            .builder()
+                            .messageSchema(messageSchema)
+                            .useDlq(false)
+                            .build()));
 
     PAssert
         .that(rows)
@@ -91,6 +109,69 @@ public void testConvertsMessages() {
     pipeline.run();
   }
 
+  @Test
+  public void testSendsInvalidToDLQ() {
+    Schema payloadSchema =
+        RowSqlTypes
+            .builder()
+            .withIntegerField("id")
+            .withVarcharField("name")
+            .build();
+
+    Schema messageSchema =
+        RowSqlTypes
+            .builder()
+            .withTimestampField("event_timestamp")
+            .withMapField("attributes", VARCHAR, VARCHAR)
+            .withRowField("payload", payloadSchema)
+            .build();
+
+    PCollectionTuple outputs = pipeline
+        .apply("create",
+               Create.timestamped(
+                   message(1, map("attr1", "val1"), "{ \"invalid1\" : 
\"sdfsd\" }"),
+                   message(2, map("attr2", "val2"), "{ \"invalid2"),
+                   message(3, map("attr", "val"), "{ \"id\" : 3, \"name\" : 
\"foo\" }"),
+                   message(4, map("bttr", "vbl"), "{ \"name\" : \"baz\", 
\"id\" : 5 }")))
+        .apply("convert",
+               ParDo.of(
+                   PubsubMessageToRow
+                       .builder()
+                       .messageSchema(messageSchema)
+                       .useDlq(true)
+                       .build())
+                    .withOutputTags(MAIN_TAG, TupleTagList.of(DLQ_TAG)));
+
+    PCollection<Row> rows = outputs.get(MAIN_TAG);
+    PCollection<PubsubMessage> dlqMessages = outputs.get(DLQ_TAG);
+
+    PAssert
+        .that(dlqMessages)
+        .satisfies(messages -> {
+          assertEquals(2, size(messages));
+          assertEquals(
+              ImmutableSet.of(map("attr1", "val1"), map("attr2", "val2")),
+              convertToSet(messages, m -> m.getAttributeMap()));
+
+          assertEquals(
+              ImmutableSet.of("{ \"invalid1\" : \"sdfsd\" }", "{ \"invalid2"),
+              convertToSet(messages, m -> new String(m.getPayload(), UTF_8)));
+          return null;
+        });
+
+    PAssert
+        .that(rows)
+        .containsInAnyOrder(
+            Row.withSchema(messageSchema)
+               .addValues(ts(3), map("attr", "val"), row(payloadSchema, 3, 
"foo"))
+               .build(),
+            Row.withSchema(messageSchema)
+               .addValues(ts(4), map("bttr", "vbl"), row(payloadSchema, 5, 
"baz"))
+               .build());
+
+    pipeline.run();
+  }
+
   private Row row(Schema schema, int id, String name) {
     return Row.withSchema(schema).addValues(id, name).build();
   }
@@ -105,11 +186,18 @@ private Row row(Schema schema, int id, String name) {
       String payload) {
 
     return TimestampedValue.of(
-        new PubsubMessage(payload.getBytes(StandardCharsets.UTF_8), 
attributes),
+        new PubsubMessage(payload.getBytes(UTF_8), attributes),
         ts(timestamp));
   }
 
   private Instant ts(long timestamp) {
     return new DateTime(timestamp).toInstant();
   }
+
+  private static <V> Set<V> convertToSet(
+      Iterable<PubsubMessage> messages,
+      Function<? super PubsubMessage, V> mapper) {
+
+    return StreamSupport.stream(messages.spliterator(), 
false).map(mapper).collect(toSet());
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index 731570695ee..e0a7e752d75 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -145,12 +145,12 @@ static String createTopicName(Description description) 
throws IOException {
   /**
    * Topic path where events will be published to.
    */
-  public String eventsTopicPath() {
+  public String topicPath() {
     return eventsTopicPath;
   }
 
   /**
-   * Publish messages to {@link #eventsTopicPath()}.
+   * Publish messages to {@link #topicPath()}.
    */
   public void publish(List<PubsubMessage> messages) throws IOException {
     List<PubsubClient.OutgoingMessage> outgoingMessages =


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 103108)
    Time Spent: 2.5h  (was: 2h 20m)

> [SQL] Add a DLQ support for Pubsub tables
> -----------------------------------------
>
>                 Key: BEAM-4199
>                 URL: https://issues.apache.org/jira/browse/BEAM-4199
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Currently we crash the pipeline if there's any error processing a message 
> from Pubsub, including when it has an incorrect JSON format, such as missing 
> fields.
> The correct solution would be for the user to specify a way to handle the 
> errors, and ideally to point to a dead-letter queue where Beam should send 
> the messages it could not process.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to