This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch revert-27021-more-dlq in repository https://gitbox.apache.org/repos/asf/beam.git
commit 2c3afccc5e26ddcc3de4cd18a539491eb838ccf2 Author: Yi Hu <[email protected]> AuthorDate: Wed Jun 21 11:32:28 2023 -0400 Revert "Adding error tag and metrics in SpannerWriteSchemaTransformProvider (#27021)" This reverts commit 7ecd8e514448406a15bedc85a680b1e7153774f9. --- .../SpannerWriteSchemaTransformProvider.java | 37 ++-------------------- 1 file changed, 2 insertions(+), 35 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java index a21be9be671..43e0de3b903 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java @@ -21,13 +21,10 @@ import com.google.auto.service.AutoService; import com.google.auto.value.AutoValue; import com.google.cloud.spanner.Mutation; import java.io.Serializable; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; @@ -35,11 +32,9 @@ import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionRowTuple; @@ -74,31 +69,6 @@ public class SpannerWriteSchemaTransformProvider this.configuration = configuration; } - // A generic counter for PCollection of Row. Will be initialized with the given - // name argument. Performs element-wise counter of the input PCollection. - private static class ElementCounterFn extends DoFn<Row, Row> { - - private Counter spannerGenericElementCounter; - private Long elementsInBundle = 0L; - - ElementCounterFn(String name) { - this.spannerGenericElementCounter = - Metrics.counter(SpannerSchemaTransformWrite.class, name); - } - - @ProcessElement - public void process(ProcessContext c) { - this.elementsInBundle += 1; - c.output(c.element()); - } - - @FinishBundle - public void finish(FinishBundleContext c) { - this.spannerGenericElementCounter.inc(this.elementsInBundle); - this.elementsInBundle = 0L; - } - } - @Override public @UnknownKeyFor @NonNull @Initialized PTransform< @UnknownKeyFor @NonNull @Initialized PCollectionRowTuple, @@ -156,11 +126,8 @@ public class SpannerWriteSchemaTransformProvider mutation.getValues().iterator())) .build()) .collect(Collectors.toList()))) - .apply( - "error-count", ParDo.of(new ElementCounterFn("Spanner-write-error-counter"))) .setRowSchema(failureSchema); - - return PCollectionRowTuple.of("failures", failures).and("errors", failures); + return PCollectionRowTuple.of("failures", failures); } }; } @@ -180,7 +147,7 @@ public class SpannerWriteSchemaTransformProvider @Override public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputCollectionNames() { - return Arrays.asList("failures", "errors"); + return Collections.singletonList("failures"); } @AutoValue
