robertwb commented on code in PR #29164:
URL: https://github.com/apache/beam/pull/29164#discussion_r1389714424


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouter.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Failure;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Record;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface BadRecordRouter extends Serializable {
+
+  BadRecordRouter THROWING_ROUTER = new ThrowingBadRecordRouter();
+
+  BadRecordRouter RECORDING_ROUTER = new RecordingBadRecordRouter();
+
+  TupleTag<BadRecord> BAD_RECORD_TAG = new TupleTag<>();
+
+  <RecordT> void route(
+      MultiOutputReceiver outputReceiver,
+      RecordT record,
+      @Nullable Coder<RecordT> coder,
+      @Nullable Exception exception,
+      String description,
+      String failingTransform)
+      throws Exception;
+
+  class ThrowingBadRecordRouter implements BadRecordRouter {
+
+    @Override
+    public <RecordT> void route(
+        MultiOutputReceiver outputReceiver,
+        RecordT record,
+        @Nullable Coder<RecordT> coder,
+        @Nullable Exception exception,
+        String description,
+        String failingTransform)
+        throws Exception {
+      if (exception != null) {
+        throw exception;
+      }
+    }
+  }
+
+  class RecordingBadRecordRouter implements BadRecordRouter {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RecordingBadRecordRouter.class);
+
+    @Override
+    public <RecordT> void route(
+        MultiOutputReceiver outputReceiver,
+        RecordT record,
+        @Nullable Coder<RecordT> coder,
+        @Nullable Exception exception,
+        String description,
+        String failingTransform)
+        throws Exception {
+      Preconditions.checkArgumentNotNull(record);
+      ObjectWriter objectWriter = new 
ObjectMapper().writer().withDefaultPrettyPrinter();
+
+      // Build up record information
+      BadRecord.Record.Builder recordBuilder = Record.builder();
+      try {
+        recordBuilder.setJsonRecord(objectWriter.writeValueAsString(record));
+      } catch (Exception e) {
+        LOG.error("Unable to serialize record as JSON. Human readable record 
will be null", e);

Review Comment:
   Should we fall back to toString()? 
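
   A minimal sketch of that fallback in the catch block above (illustrative only, not part of the PR as written):

   ```java
   try {
     recordBuilder.setJsonRecord(objectWriter.writeValueAsString(record));
   } catch (Exception e) {
     LOG.error("Unable to serialize record as JSON, falling back to toString()", e);
     // Suggested fallback: still give the DLQ a human readable form of the record.
     recordBuilder.setJsonRecord(record.toString());
   }
   ```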



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouter.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Failure;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Record;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface BadRecordRouter extends Serializable {
+
+  BadRecordRouter THROWING_ROUTER = new ThrowingBadRecordRouter();
+
+  BadRecordRouter RECORDING_ROUTER = new RecordingBadRecordRouter();
+
+  TupleTag<BadRecord> BAD_RECORD_TAG = new TupleTag<>();
+
+  <RecordT> void route(
+      MultiOutputReceiver outputReceiver,
+      RecordT record,
+      @Nullable Coder<RecordT> coder,
+      @Nullable Exception exception,
+      String description,
+      String failingTransform)
+      throws Exception;
+
+  class ThrowingBadRecordRouter implements BadRecordRouter {
+
+    @Override
+    public <RecordT> void route(
+        MultiOutputReceiver outputReceiver,
+        RecordT record,
+        @Nullable Coder<RecordT> coder,
+        @Nullable Exception exception,
+        String description,
+        String failingTransform)
+        throws Exception {
+      if (exception != null) {
+        throw exception;
+      }
+    }
+  }
+
+  class RecordingBadRecordRouter implements BadRecordRouter {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RecordingBadRecordRouter.class);
+
+    @Override
+    public <RecordT> void route(
+        MultiOutputReceiver outputReceiver,
+        RecordT record,
+        @Nullable Coder<RecordT> coder,
+        @Nullable Exception exception,
+        String description,
+        String failingTransform)
+        throws Exception {
+      Preconditions.checkArgumentNotNull(record);
+      ObjectWriter objectWriter = new 
ObjectMapper().writer().withDefaultPrettyPrinter();
+
+      // Build up record information
+      BadRecord.Record.Builder recordBuilder = Record.builder();
+      try {
+        recordBuilder.setJsonRecord(objectWriter.writeValueAsString(record));
+      } catch (Exception e) {
+        LOG.error("Unable to serialize record as JSON. Human readable record 
will be null", e);
+      }
+
+      // We will sometimes not have a coder for a failing record, for example 
if it has already been
+      // modified within the dofn.
+      if (coder != null) {
+        recordBuilder.setCoder(coder.toString());
+
+        try {
+          ByteArrayOutputStream stream = new ByteArrayOutputStream();

Review Comment:
   IIRC, there's something in CoderUtils that can make this nicer. 
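
   A sketch of what that could look like with CoderUtils (assuming org.apache.beam.sdk.util.CoderUtils#encodeToByteArray, which throws CoderException):

   ```java
   if (coder != null) {
     recordBuilder.setCoder(coder.toString());
     try {
       // CoderUtils handles the ByteArrayOutputStream plumbing internally.
       recordBuilder.setEncodedRecord(CoderUtils.encodeToByteArray(coder, record));
     } catch (CoderException e) {
       LOG.error("Unable to encode failing record with the provided coder", e);
     }
   }
   ```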



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BRHEnabledPTransform.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import 
org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.NoOpErrorHandler;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Dummy PTransform that is configurable with a Bad Record Handler. */
+public class BRHEnabledPTransform extends PTransform<PCollection<Integer>, 
PCollection<Integer>> {
+
+  private ErrorHandler<BadRecord, ?> errorHandler = new NoOpErrorHandler<>();
+
+  private BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;
+
+  private static final TupleTag<Integer> RECORDS = new TupleTag<>();
+
+  public BRHEnabledPTransform() {}
+
+  public BRHEnabledPTransform withBadRecordHandler(ErrorHandler<BadRecord, ?> 
errorHandler) {
+    this.errorHandler = errorHandler;
+    this.badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
+    return this;
+  }
+
+  @Override
+  public PCollection<Integer> expand(PCollection<Integer> input) {
+    PCollectionTuple pCollectionTuple =
+        input.apply(
+            "NoOpDoFn",
+            ParDo.of(new OddIsBad(badRecordRouter))
+                .withOutputTags(RECORDS, 
TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)));
+
+    Coder<BadRecord> badRecordCoder;
+
+    try {
+      SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
+      badRecordCoder =
+          SchemaCoder.of(
+              schemaRegistry.getSchema(BadRecord.class),
+              TypeDescriptor.of(BadRecord.class),
+              schemaRegistry.getToRowFunction(BadRecord.class),
+              schemaRegistry.getFromRowFunction(BadRecord.class));
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+
+    errorHandler.addErrorCollection(

Review Comment:
   Let's at least drop a TODO to add a more native ParDo.withBadRecordHandler() 
to reduce this boilerplate. 
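
   Purely as an illustration of that TODO, the hypothetical API might read something like this (no such method exists in this PR; the names are made up):

   ```java
   // Hypothetical fluent API sketched from the suggestion above; not real Beam API.
   PCollection<Integer> output =
       input.apply(
           "ParseRecords",
           ParDo.of(new OddIsBad())
               .withBadRecordHandler(errorHandler)); // would wire BAD_RECORD_TAG and the coder internally
   ```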



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlerTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ErrorHandlerTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGoodErrorHandlerUsage() throws Exception {
+    try (ErrorHandler<String, PCollection<String>> eh =
+        pipeline.registerErrorHandler(new DummySinkTransform<>())) {}
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testBadErrorHandlerUsage() {
+
+    pipeline.registerErrorHandler(new 
DummySinkTransform<PCollection<String>>());
+
+    thrown.expect(IllegalStateException.class);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testBRHEnabledPTransform() {
+    PCollection<Integer> record = pipeline.apply(Create.of(1, 2, 3, 4));
+    record.apply(new BRHEnabledPTransform());
+
+    thrown.expect(RuntimeException.class);

Review Comment:
   Similarly, no error handler was registered here, so the pipeline is expected to fail.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Error Handler is a utility object used for plumbing error PCollections 
to a configured sink
+ * Error Handlers must be closed before a pipeline is run to properly pipe 
error collections to the
+ * sink, and the pipeline will be rejected if any handlers aren't closed.
+ *
+ * @param <ErrorT> The type of the error object. This will usually be a {@link 
BadRecord}, but can
+ *     be any type
+ * @param <OutputT> The return type of the sink PTransform.
+ *     <p>Usage of Error Handlers:
+ *     <p>Simple usage with one DLQ
+ *     <pre>{@code
+ * PCollection<?> records = ...;
+ * try (ErrorHandler<E,T> errorHandler = 
pipeline.registerErrorHandler(SomeSink.write())) {
+ *  PCollection<?> results = 
records.apply(SomeIO.write().withDeadLetterQueue(errorHandler));
+ * }
+ * results.apply(SomeOtherTransform);
+ * }</pre>
+ *     Usage with multiple DLQ stages
+ *     <pre>{@code
+ * PCollection<?> records = ...;
+ * try (ErrorHandler<E,T> errorHandler = 
pipeline.registerErrorHandler(SomeSink.write())) {
+ *  PCollection<?> results = 
records.apply(SomeIO.write().withDeadLetterQueue(errorHandler))
+ *                        
.apply(OtherTransform.builder().withDeadLetterQueue(errorHandler));
+ * }
+ * results.apply(SomeOtherTransform);
+ * }</pre>
+ */
+public interface ErrorHandler<ErrorT, OutputT extends POutput> extends 
AutoCloseable {
+
+  void addErrorCollection(PCollection<ErrorT> errorCollection);
+
+  boolean isClosed();
+
+  OutputT getOutput();
+
+  class PTransformErrorHandler<ErrorT, OutputT extends POutput>
+      implements ErrorHandler<ErrorT, OutputT> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PTransformErrorHandler.class);
+    private final PTransform<PCollection<ErrorT>, OutputT> sinkTransform;
+
+    private final List<PCollection<ErrorT>> errorCollections = new 
ArrayList<>();
+
+    private @Nullable OutputT sinkOutput = null;
+
+    private boolean closed = false;
+
+    /**
+     * Constructs a new ErrorHandler, but should not be called directly. 
Instead, call
+     * pipeline.registerErrorHandler to ensure safe pipeline construction
+     */
+    @Internal
+    public PTransformErrorHandler(PTransform<PCollection<ErrorT>, OutputT> 
sinkTransform) {
+      this.sinkTransform = sinkTransform;
+    }
+
+    @Override
+    public void addErrorCollection(PCollection<ErrorT> errorCollection) {
+      errorCollections.add(errorCollection);
+    }
+
+    @Override
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public OutputT getOutput() {
+      if (!this.isClosed()) {
+        throw new IllegalStateException(
+            "ErrorHandler must be finalized before the output can be 
returned");
+      }
+      // make the static analysis checker happy
+      Preconditions.checkArgumentNotNull(sinkOutput);
+      return sinkOutput;
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+      if (errorCollections.isEmpty()) {
+        LOG.warn("Empty list of error pcollections passed to ErrorHandler.");
+        return;
+      }
+      LOG.debug(
+          "{} error collections are being sent to {}",
+          errorCollections.size(),
+          sinkTransform.getName());
+      sinkOutput =
+          PCollectionList.of(errorCollections)
+              .apply(Flatten.pCollections())
+              .apply(new WriteErrorMetrics<ErrorT>(sinkTransform))
+              .apply(
+                  sinkTransform.addAnnotation(
+                      "FeatureMetric", 
"ErrorHandler".getBytes(StandardCharsets.UTF_8)));
+    }
+
+    public static class WriteErrorMetrics<ErrorT>
+        extends PTransform<PCollection<ErrorT>, PCollection<ErrorT>> {
+
+      private final Counter errorCounter;
+
+      public WriteErrorMetrics(PTransform<?, ?> sinkTransform) {

Review Comment:
   Why is sinkTransform being passed here? (Creating a whole Transform for this 
seems overkill--perhaps apply the ParDo directly in the close method above. 
Alternatively, perhaps the Flatten + WriteErrorMetrics + ApplySink should be 
nested in a single transform.)
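
   A sketch of the "nest it all in a single transform" alternative, reusing the names from the PR (the composite class itself is hypothetical):

   ```java
   // Hypothetical composite that would keep close() down to a single apply().
   class WriteErrors<ErrorT, OutputT extends POutput>
       extends PTransform<PCollectionList<ErrorT>, OutputT> {

     private final PTransform<PCollection<ErrorT>, OutputT> sinkTransform;

     WriteErrors(PTransform<PCollection<ErrorT>, OutputT> sinkTransform) {
       this.sinkTransform = sinkTransform;
     }

     @Override
     public OutputT expand(PCollectionList<ErrorT> input) {
       return input
           .apply(Flatten.pCollections())
           .apply(new WriteErrorMetrics<>(sinkTransform))
           .apply(sinkTransform);
     }
   }
   ```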



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouter.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Failure;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Record;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface BadRecordRouter extends Serializable {
+
+  BadRecordRouter THROWING_ROUTER = new ThrowingBadRecordRouter();
+
+  BadRecordRouter RECORDING_ROUTER = new RecordingBadRecordRouter();
+
+  TupleTag<BadRecord> BAD_RECORD_TAG = new TupleTag<>();

Review Comment:
   static final? (Or is this implied?)



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Error Handler is a utility object used for plumbing error PCollections 
to a configured sink
+ * Error Handlers must be closed before a pipeline is run to properly pipe 
error collections to the
+ * sink, and the pipeline will be rejected if any handlers aren't closed.
+ *
+ * @param <ErrorT> The type of the error object. This will usually be a {@link 
BadRecord}, but can
+ *     be any type
+ * @param <OutputT> The return type of the sink PTransform.
+ *     <p>Usage of Error Handlers:
+ *     <p>Simple usage with one DLQ
+ *     <pre>{@code
+ * PCollection<?> records = ...;
+ * try (ErrorHandler<E,T> errorHandler = 
pipeline.registerErrorHandler(SomeSink.write())) {
+ *  PCollection<?> results = 
records.apply(SomeIO.write().withDeadLetterQueue(errorHandler));
+ * }
+ * results.apply(SomeOtherTransform);
+ * }</pre>
+ *     Usage with multiple DLQ stages
+ *     <pre>{@code
+ * PCollection<?> records = ...;
+ * try (ErrorHandler<E,T> errorHandler = 
pipeline.registerErrorHandler(SomeSink.write())) {
+ *  PCollection<?> results = 
records.apply(SomeIO.write().withDeadLetterQueue(errorHandler))
+ *                        
.apply(OtherTransform.builder().withDeadLetterQueue(errorHandler));
+ * }
+ * results.apply(SomeOtherTransform);
+ * }</pre>
+ */
+public interface ErrorHandler<ErrorT, OutputT extends POutput> extends 
AutoCloseable {
+
+  void addErrorCollection(PCollection<ErrorT> errorCollection);
+
+  boolean isClosed();
+
+  OutputT getOutput();
+
+  class PTransformErrorHandler<ErrorT, OutputT extends POutput>
+      implements ErrorHandler<ErrorT, OutputT> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PTransformErrorHandler.class);
+    private final PTransform<PCollection<ErrorT>, OutputT> sinkTransform;
+
+    private final List<PCollection<ErrorT>> errorCollections = new 
ArrayList<>();
+
+    private @Nullable OutputT sinkOutput = null;
+
+    private boolean closed = false;
+
+    /**
+     * Constructs a new ErrorHandler, but should not be called directly. 
Instead, call
+     * pipeline.registerErrorHandler to ensure safe pipeline construction
+     */
+    @Internal
+    public PTransformErrorHandler(PTransform<PCollection<ErrorT>, OutputT> 
sinkTransform) {
+      this.sinkTransform = sinkTransform;
+    }
+
+    @Override
+    public void addErrorCollection(PCollection<ErrorT> errorCollection) {
+      errorCollections.add(errorCollection);
+    }
+
+    @Override
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public OutputT getOutput() {
+      if (!this.isClosed()) {
+        throw new IllegalStateException(
+            "ErrorHandler must be finalized before the output can be 
returned");
+      }
+      // make the static analysis checker happy
+      Preconditions.checkArgumentNotNull(sinkOutput);
+      return sinkOutput;
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+      if (errorCollections.isEmpty()) {
+        LOG.warn("Empty list of error pcollections passed to ErrorHandler.");
+        return;

Review Comment:
   Shouldn't we still write an empty set of errors in this case? (Perhaps we 
would have to pass the pipeline object in close to do that.)
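
   A sketch of what writing an empty error collection could look like; it assumes close() gained access to the Pipeline and an error coder, neither of which is available in the current code:

   ```java
   if (errorCollections.isEmpty()) {
     LOG.warn("No error pcollections passed to ErrorHandler; writing an empty error collection.");
     // Hypothetical: both `pipeline` and `errorCoder` would need to be passed in.
     errorCollections.add(pipeline.apply("EmptyErrors", Create.empty(errorCoder)));
   }
   ```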



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecordRouter.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Failure;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord.Record;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface BadRecordRouter extends Serializable {
+
+  BadRecordRouter THROWING_ROUTER = new ThrowingBadRecordRouter();
+
+  BadRecordRouter RECORDING_ROUTER = new RecordingBadRecordRouter();
+
+  TupleTag<BadRecord> BAD_RECORD_TAG = new TupleTag<>();
+
+  <RecordT> void route(
+      MultiOutputReceiver outputReceiver,
+      RecordT record,
+      @Nullable Coder<RecordT> coder,
+      @Nullable Exception exception,
+      String description,
+      String failingTransform)
+      throws Exception;
+
+  class ThrowingBadRecordRouter implements BadRecordRouter {
+
+    @Override
+    public <RecordT> void route(
+        MultiOutputReceiver outputReceiver,
+        RecordT record,
+        @Nullable Coder<RecordT> coder,
+        @Nullable Exception exception,
+        String description,
+        String failingTransform)
+        throws Exception {
+      if (exception != null) {
+        throw exception;
+      }
+    }
+  }
+
+  class RecordingBadRecordRouter implements BadRecordRouter {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RecordingBadRecordRouter.class);
+
+    @Override
+    public <RecordT> void route(
+        MultiOutputReceiver outputReceiver,
+        RecordT record,
+        @Nullable Coder<RecordT> coder,
+        @Nullable Exception exception,
+        String description,
+        String failingTransform)
+        throws Exception {
+      Preconditions.checkArgumentNotNull(record);
+      ObjectWriter objectWriter = new 
ObjectMapper().writer().withDefaultPrettyPrinter();
+
+      // Build up record information
+      BadRecord.Record.Builder recordBuilder = Record.builder();
+      try {
+        recordBuilder.setJsonRecord(objectWriter.writeValueAsString(record));
+      } catch (Exception e) {
+        LOG.error("Unable to serialize record as JSON. Human readable record 
will be null", e);
+      }
+
+      // We will sometimes not have a coder for a failing record, for example 
if it has already been
+      // modified within the dofn.
+      if (coder != null) {
+        recordBuilder.setCoder(coder.toString());
+
+        try {
+          ByteArrayOutputStream stream = new ByteArrayOutputStream();
+          coder.encode(record, stream);
+          byte[] bytes = stream.toByteArray();
+          recordBuilder.setEncodedRecord(bytes);

Review Comment:
   So a common case we will want to handle is malformed input bytes when trying 
to read/parse a record. How is that handled here? 
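
   One way a parse step could use the router for that case, sketched with ByteArrayCoder so the raw payload can always be encoded (the `parse` helper and RECORDS tag here are assumptions, not code from the PR):

   ```java
   @ProcessElement
   public void processElement(@Element byte[] rawBytes, MultiOutputReceiver receiver)
       throws Exception {
     try {
       receiver.get(RECORDS).output(parse(rawBytes));
     } catch (Exception e) {
       // The failing element is the raw payload itself, so ByteArrayCoder.of()
       // can still encode it even though parsing failed.
       badRecordRouter.route(
           receiver, rawBytes, ByteArrayCoder.of(), e, "Malformed input bytes", "ParseRecords");
     }
   }
   ```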



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BRHEnabledPTransform.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import 
org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.NoOpErrorHandler;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Dummy PTransform that is configurable with a Bad Record Handler. */
+public class BRHEnabledPTransform extends PTransform<PCollection<Integer>, 
PCollection<Integer>> {
+
+  private ErrorHandler<BadRecord, ?> errorHandler = new NoOpErrorHandler<>();
+
+  private BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;
+
+  private static final TupleTag<Integer> RECORDS = new TupleTag<>();
+
+  public BRHEnabledPTransform() {}
+
+  public BRHEnabledPTransform withBadRecordHandler(ErrorHandler<BadRecord, ?> 
errorHandler) {
+    this.errorHandler = errorHandler;
+    this.badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
+    return this;
+  }
+
+  @Override
+  public PCollection<Integer> expand(PCollection<Integer> input) {
+    PCollectionTuple pCollectionTuple =
+        input.apply(
+            "NoOpDoFn",
+            ParDo.of(new OddIsBad(badRecordRouter))
+                .withOutputTags(RECORDS, 
TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)));
+
+    Coder<BadRecord> badRecordCoder;
+
+    try {
+      SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
+      badRecordCoder =
+          SchemaCoder.of(
+              schemaRegistry.getSchema(BadRecord.class),
+              TypeDescriptor.of(BadRecord.class),
+              schemaRegistry.getToRowFunction(BadRecord.class),
+              schemaRegistry.getFromRowFunction(BadRecord.class));
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+
+    errorHandler.addErrorCollection(
+        
pCollectionTuple.get(BadRecordRouter.BAD_RECORD_TAG).setCoder(badRecordCoder));
+
+    return pCollectionTuple.get(RECORDS).setCoder(BigEndianIntegerCoder.of());
+  }
+
+  public static class OddIsBad extends DoFn<Integer, Integer> {
+
+    private BadRecordRouter badRecordRouter;
+
+    public OddIsBad(BadRecordRouter badRecordRouter) {
+      this.badRecordRouter = badRecordRouter;
+    }
+
+    @ProcessElement
+    public void processElement(@Element Integer element, MultiOutputReceiver 
receiver)
+        throws Exception {
+      if (element % 2 == 0) {
+        receiver.get(RECORDS).output(element);
+      } else {
+        badRecordRouter.route(
+            receiver,
+            element,
+            BigEndianIntegerCoder.of(),
+            new RuntimeException(),
+            "Integer was odd",
+            "NoOpDoFn");

Review Comment:
   This last argument should be system provided. 



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BRHEnabledPTransform.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import 
org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.NoOpErrorHandler;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Dummy PTransform that is configurable with a Bad Record Handler. */
+public class BRHEnabledPTransform extends PTransform<PCollection<Integer>, 
PCollection<Integer>> {
+
+  private ErrorHandler<BadRecord, ?> errorHandler = new NoOpErrorHandler<>();
+
+  private BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;
+
+  private static final TupleTag<Integer> RECORDS = new TupleTag<>();
+
+  public BRHEnabledPTransform() {}
+
+  public BRHEnabledPTransform withBadRecordHandler(ErrorHandler<BadRecord, ?> 
errorHandler) {
+    this.errorHandler = errorHandler;
+    this.badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
+    return this;
+  }
+
+  @Override
+  public PCollection<Integer> expand(PCollection<Integer> input) {
+    PCollectionTuple pCollectionTuple =
+        input.apply(
+            "NoOpDoFn",
+            ParDo.of(new OddIsBad(badRecordRouter))
+                .withOutputTags(RECORDS, 
TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)));
+
+    Coder<BadRecord> badRecordCoder;
+
+    try {
+      SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
+      badRecordCoder =
+          SchemaCoder.of(
+              schemaRegistry.getSchema(BadRecord.class),
+              TypeDescriptor.of(BadRecord.class),
+              schemaRegistry.getToRowFunction(BadRecord.class),
+              schemaRegistry.getFromRowFunction(BadRecord.class));
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+
+    errorHandler.addErrorCollection(
+        
pCollectionTuple.get(BadRecordRouter.BAD_RECORD_TAG).setCoder(badRecordCoder));
+
+    return pCollectionTuple.get(RECORDS).setCoder(BigEndianIntegerCoder.of());
+  }
+
+  public static class OddIsBad extends DoFn<Integer, Integer> {
+
+    private BadRecordRouter badRecordRouter;
+
+    public OddIsBad(BadRecordRouter badRecordRouter) {
+      this.badRecordRouter = badRecordRouter;
+    }
+
+    @ProcessElement
+    public void processElement(@Element Integer element, MultiOutputReceiver 
receiver)
+        throws Exception {
+      if (element % 2 == 0) {
+        receiver.get(RECORDS).output(element);
+      } else {
+        badRecordRouter.route(
+            receiver,
+            element,
+            BigEndianIntegerCoder.of(),
+            new RuntimeException(),

Review Comment:
   Provide a message?



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.POutput;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An Error Handler is a utility object used for plumbing error PCollections 
to a configured sink
+ * Error Handlers must be closed before a pipeline is run to properly pipe 
error collections to the
+ * sink, and the pipeline will be rejected if any handlers aren't closed.
+ *
+ * @param <ErrorT> The type of the error object. This will usually be a {@link 
BadRecord}, but can
+ *     be any type
+ * @param <OutputT> The return type of the sink PTransform.
+ *     <p>Usage of Error Handlers:
+ *     <p>Simple usage with one DLQ
+ *     <pre>{@code
+ * PCollection<?> records = ...;
+ * try (ErrorHandler<E,T> errorHandler = 
pipeline.registerErrorHandler(SomeSink.write())) {
+ *  PCollection<?> results = 
records.apply(SomeIO.write().withDeadLetterQueue(errorHandler));
+ * }
+ * results.apply(SomeOtherTransform);
+ * }</pre>
+ *     Usage with multiple DLQ stages
+ *     <pre>{@code
+ * PCollection<?> records = ...;
+ * try (ErrorHandler<E,T> errorHandler = 
pipeline.registerErrorHandler(SomeSink.write())) {
+ *  PCollection<?> results = 
records.apply(SomeIO.write().withDeadLetterQueue(errorHandler))
+ *                        
.apply(OtherTransform.builder().withDeadLetterQueue(errorHandler));
+ * }
+ * results.apply(SomeOtherTransform);
+ * }</pre>
+ */
+public interface ErrorHandler<ErrorT, OutputT extends POutput> extends 
AutoCloseable {
+
+  void addErrorCollection(PCollection<ErrorT> errorCollection);
+
+  boolean isClosed();
+
+  OutputT getOutput();
+
+  class PTransformErrorHandler<ErrorT, OutputT extends POutput>
+      implements ErrorHandler<ErrorT, OutputT> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PTransformErrorHandler.class);
+    private final PTransform<PCollection<ErrorT>, OutputT> sinkTransform;
+
+    private final List<PCollection<ErrorT>> errorCollections = new 
ArrayList<>();
+
+    private @Nullable OutputT sinkOutput = null;
+
+    private boolean closed = false;
+
+    /**
+     * Constructs a new ErrorHandler, but should not be called directly. 
Instead, call
+     * pipeline.registerErrorHandler to ensure safe pipeline construction
+     */
+    @Internal
+    public PTransformErrorHandler(PTransform<PCollection<ErrorT>, OutputT> 
sinkTransform) {
+      this.sinkTransform = sinkTransform;
+    }
+
+    @Override
+    public void addErrorCollection(PCollection<ErrorT> errorCollection) {
+      errorCollections.add(errorCollection);
+    }
+
+    @Override
+    public boolean isClosed() {
+      return closed;
+    }
+
+    @Override
+    public OutputT getOutput() {
+      if (!this.isClosed()) {
+        throw new IllegalStateException(
+            "ErrorHandler must be finalized before the output can be 
returned");
+      }
+      // make the static analysis checker happy
+      Preconditions.checkArgumentNotNull(sinkOutput);
+      return sinkOutput;
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+      if (errorCollections.isEmpty()) {
+        LOG.warn("Empty list of error pcollections passed to ErrorHandler.");
+        return;
+      }
+      LOG.debug(
+          "{} error collections are being sent to {}",
+          errorCollections.size(),
+          sinkTransform.getName());
+      sinkOutput =
+          PCollectionList.of(errorCollections)
+              .apply(Flatten.pCollections())
+              .apply(new WriteErrorMetrics<ErrorT>(sinkTransform))
+              .apply(
+                  sinkTransform.addAnnotation(
+                      "FeatureMetric", 
"ErrorHandler".getBytes(StandardCharsets.UTF_8)));
+    }
+
+    public static class WriteErrorMetrics<ErrorT>
+        extends PTransform<PCollection<ErrorT>, PCollection<ErrorT>> {
+
+      private final Counter errorCounter;
+
+      public WriteErrorMetrics(PTransform<?, ?> sinkTransform) {
+        errorCounter = Metrics.counter("ErrorMetrics", sinkTransform.getName() 
+ "-input");
+      }
+
+      @Override
+      public PCollection<ErrorT> expand(PCollection<ErrorT> input) {
+        return input.apply(ParDo.of(new CountErrors<ErrorT>(errorCounter)));
+      }
+
+      public static class CountErrors<ErrorT> extends DoFn<ErrorT, ErrorT> {
+
+        private final Counter errorCounter;
+
+        public CountErrors(Counter errorCounter) {
+          this.errorCounter = errorCounter;
+        }
+
+        @ProcessElement
+        public void processElement(@Element ErrorT error, 
OutputReceiver<ErrorT> receiver) {
+          errorCounter.inc();
+          receiver.output(error);
+        }
+      }
+    }
+  }
+
+  @Internal
+  class NoOpErrorHandler<ErrorT, OutputT extends POutput> implements 
ErrorHandler<ErrorT, OutputT> {
+
+    @Override
+    public void addErrorCollection(PCollection<ErrorT> errorCollection) {}
+
+    @Override
+    public boolean isClosed() {
+      throw new IllegalArgumentException("No Op handler should not be closed");

Review Comment:
   Perhaps we should still allow closing this one? (It's unclear what the 
intended use is. If it's to ignore errors, let's make it a bit scarier 
sounding.)



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BRHEnabledPTransform.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import 
org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.NoOpErrorHandler;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Dummy PTransform that is configurable with a Bad Record Handler. */
+public class BRHEnabledPTransform extends PTransform<PCollection<Integer>, 
PCollection<Integer>> {
+
+  private ErrorHandler<BadRecord, ?> errorHandler = new NoOpErrorHandler<>();
+
+  private BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;
+
+  private static final TupleTag<Integer> RECORDS = new TupleTag<>();
+
+  public BRHEnabledPTransform() {}
+
+  public BRHEnabledPTransform withBadRecordHandler(ErrorHandler<BadRecord, ?> 
errorHandler) {
+    this.errorHandler = errorHandler;
+    this.badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
+    return this;
+  }
+
+  @Override
+  public PCollection<Integer> expand(PCollection<Integer> input) {
+    PCollectionTuple pCollectionTuple =
+        input.apply(
+            "NoOpDoFn",
+            ParDo.of(new OddIsBad(badRecordRouter))
+                .withOutputTags(RECORDS, 
TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)));
+
+    Coder<BadRecord> badRecordCoder;
+
+    try {
+      SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
+      badRecordCoder =
+          SchemaCoder.of(
+              schemaRegistry.getSchema(BadRecord.class),
+              TypeDescriptor.of(BadRecord.class),
+              schemaRegistry.getToRowFunction(BadRecord.class),
+              schemaRegistry.getFromRowFunction(BadRecord.class));
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+
+    errorHandler.addErrorCollection(
+        
pCollectionTuple.get(BadRecordRouter.BAD_RECORD_TAG).setCoder(badRecordCoder));

Review Comment:
   Why do we have to do this? Can't the coder be inferred from the BadRecord 
type automatically? If not, let's at least put the above boilerplate into a 
static Coder<BadRecord> getCoder() method on BadRecord. 
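
   The suggested helper could be essentially this boilerplate moved onto BadRecord (hypothetical method, mirroring the PR's code):

   ```java
   /** Hypothetical static helper on BadRecord wrapping the SchemaCoder boilerplate. */
   public static Coder<BadRecord> getCoder(Pipeline pipeline) {
     try {
       SchemaRegistry schemaRegistry = pipeline.getSchemaRegistry();
       return SchemaCoder.of(
           schemaRegistry.getSchema(BadRecord.class),
           TypeDescriptor.of(BadRecord.class),
           schemaRegistry.getToRowFunction(BadRecord.class),
           schemaRegistry.getFromRowFunction(BadRecord.class));
     } catch (NoSuchSchemaException e) {
       throw new RuntimeException(e);
     }
   }
   ```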



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlerTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ErrorHandlerTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGoodErrorHandlerUsage() throws Exception {
+    try (ErrorHandler<String, PCollection<String>> eh =
+        pipeline.registerErrorHandler(new DummySinkTransform<>())) {}
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testBadErrorHandlerUsage() {
+
+    pipeline.registerErrorHandler(new 
DummySinkTransform<PCollection<String>>());
+
+    thrown.expect(IllegalStateException.class);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testBRHEnabledPTransform() {
+    PCollection<Integer> record = pipeline.apply(Create.of(1, 2, 3, 4));
+    record.apply(new BRHEnabledPTransform());
+
+    thrown.expect(RuntimeException.class);
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testErrorHandlerWithBRHTransform() throws Exception {
+    PCollection<Integer> record = pipeline.apply(Create.of(1, 2, 3, 4));
+    try (ErrorHandler<BadRecord, PCollection<BadRecord>> eh =
+        pipeline.registerErrorHandler(new DummySinkTransform<>())) {
+      record.apply(new BRHEnabledPTransform().withBadRecordHandler(eh));

Review Comment:
   We should assert that the outputs and bad records are as we expect. 
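
   A sketch of those assertions (assumes the test declares throws Exception; the expected values follow from OddIsBad routing the odd elements):

   ```java
   ErrorHandler<BadRecord, PCollection<BadRecord>> eh =
       pipeline.registerErrorHandler(new DummySinkTransform<>());
   PCollection<Integer> output =
       record.apply(new BRHEnabledPTransform().withBadRecordHandler(eh));
   eh.close();

   // Even elements pass through; the two odd elements are routed as bad records.
   PAssert.that(output).containsInAnyOrder(2, 4);
   PAssert.thatSingleton(eh.getOutput().apply(Count.globally())).isEqualTo(2L);

   pipeline.run();
   ```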



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/ErrorHandlerTest.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ErrorHandlerTest {
+  @Rule public final TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGoodErrorHandlerUsage() throws Exception {
+    try (ErrorHandler<String, PCollection<String>> eh =
+        pipeline.registerErrorHandler(new DummySinkTransform<>())) {}
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testBadErrorHandlerUsage() {
+
+    pipeline.registerErrorHandler(new 
DummySinkTransform<PCollection<String>>());
+
+    thrown.expect(IllegalStateException.class);

Review Comment:
   Some comments here would be nice, e.g. that we expect this because the error 
handler is never closed.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/errorhandling/BadRecord.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@AutoValue
+@DefaultSchema(AutoValueSchema.class)
+public abstract class BadRecord implements Serializable {
+
+  /** Information about the record that failed. */
+  public abstract Record getRecord();
+
+  /** Information about why the record failed. */
+  public abstract Failure getFailure();
+
+  public static Builder builder() {
+    return new AutoValue_BadRecord.Builder();
+  }
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    public abstract Builder setRecord(Record record);
+
+    public abstract Builder setFailure(Failure error);
+
+    public abstract BadRecord build();
+  }
+
+  @AutoValue
+  @DefaultSchema(AutoValueSchema.class)
+  public abstract static class Record implements Serializable {
+
+    /** The failing record, encoded as JSON. Will be null if serialization as 
JSON fails. */
+    public abstract @Nullable String getJsonRecord();
+
+    /**
+     * Nullable to account for failing to encode, or if there is no coder for 
the record at the time
+     * of failure.
+     */
+    @SuppressWarnings("mutable")
+    public abstract byte @Nullable [] getEncodedRecord();
+
+    /** The coder for the record, or null if there is no coder. */
+    public abstract @Nullable String getCoder();
+
+    public static Builder builder() {
+      return new AutoValue_BadRecord_Record.Builder();
+    }
+
+    @AutoValue.Builder
+    public abstract static class Builder {
+
+      public abstract Builder setJsonRecord(@Nullable String jsonRecord);
+
+      @SuppressWarnings("mutable")
+      public abstract Builder setEncodedRecord(byte @Nullable [] 
encodedRecord);
+
+      public abstract Builder setCoder(@Nullable String coder);
+
+      public abstract Record build();
+    }
+  }
+
+  @AutoValue
+  @DefaultSchema(AutoValueSchema.class)
+  public abstract static class Failure implements Serializable {
+
+    /** The exception itself, e.g. IOException. Null if there is a failure 
without an exception. */

Review Comment:
   Including the exception message? What about the traceback (which can be very valuable, but perhaps should be a separate field)?
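
   For example, the Failure could carry the message and a rendered stack trace alongside the exception class name; the builder methods below are assumptions based on the getter above, and the stack trace is captured with plain JDK classes:

   ```java
   StringWriter stackTrace = new StringWriter();
   exception.printStackTrace(new PrintWriter(stackTrace));
   failureBuilder
       .setException(exception.getClass().getName())
       .setExceptionMessage(exception.getMessage()) // hypothetical field
       .setExceptionStacktrace(stackTrace.toString()); // hypothetical field
   ```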



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/errorhandling/BRHEnabledPTransform.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.errorhandling;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import 
org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.NoOpErrorHandler;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/** Dummy PTransform that is configurable with a Bad Record Handler. */
+public class BRHEnabledPTransform extends PTransform<PCollection<Integer>, 
PCollection<Integer>> {
+
+  private ErrorHandler<BadRecord, ?> errorHandler = new NoOpErrorHandler<>();
+
+  private BadRecordRouter badRecordRouter = BadRecordRouter.THROWING_ROUTER;
+
+  private static final TupleTag<Integer> RECORDS = new TupleTag<>();
+
+  public BRHEnabledPTransform() {}
+
+  public BRHEnabledPTransform withBadRecordHandler(ErrorHandler<BadRecord, ?> 
errorHandler) {
+    this.errorHandler = errorHandler;
+    this.badRecordRouter = BadRecordRouter.RECORDING_ROUTER;
+    return this;
+  }
+
+  @Override
+  public PCollection<Integer> expand(PCollection<Integer> input) {
+    PCollectionTuple pCollectionTuple =
+        input.apply(
+            "NoOpDoFn",
+            ParDo.of(new OddIsBad(badRecordRouter))
+                .withOutputTags(RECORDS, 
TupleTagList.of(BadRecordRouter.BAD_RECORD_TAG)));
+
+    Coder<BadRecord> badRecordCoder;
+
+    try {
+      SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
+      badRecordCoder =
+          SchemaCoder.of(
+              schemaRegistry.getSchema(BadRecord.class),
+              TypeDescriptor.of(BadRecord.class),
+              schemaRegistry.getToRowFunction(BadRecord.class),
+              schemaRegistry.getFromRowFunction(BadRecord.class));
+    } catch (NoSuchSchemaException e) {
+      throw new RuntimeException(e);
+    }
+
+    errorHandler.addErrorCollection(

Review Comment:
   Put another way, I think what we have has a decent API for users of the 
withBadRecordHandler API, but we'll need to make it nicer for implementors to 
promote widespread use. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
