This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bb296e46c06 Create CsvIOParseError data class (#31700)
bb296e46c06 is described below

commit bb296e46c06082098bc8cb0757356002d0fc8bee
Author: Damon <[email protected]>
AuthorDate: Thu Jun 27 12:04:16 2024 -0700

    Create CsvIOParseError data class (#31700)
---
 sdks/java/io/csv/build.gradle                      |   1 +
 .../apache/beam/sdk/io/csv/CsvIOParseError.java    |  75 +++++++++++++++
 .../beam/sdk/io/csv/CsvIOParseErrorTest.java       | 101 +++++++++++++++++++++
 3 files changed, 177 insertions(+)

diff --git a/sdks/java/io/csv/build.gradle b/sdks/java/io/csv/build.gradle
index 2be8f59d1f3..92c66ff0140 100644
--- a/sdks/java/io/csv/build.gradle
+++ b/sdks/java/io/csv/build.gradle
@@ -28,6 +28,7 @@ dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
     implementation library.java.commons_csv
     implementation library.java.vendored_guava_32_1_2_jre
+    implementation library.java.joda_time
     testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
     testImplementation library.java.junit
     testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
new file mode 100644
index 00000000000..ad7d05912fa
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * {@link CsvIOParseError} is a data class to store errors from CSV record processing. It is {@link
+ * org.apache.beam.sdk.schemas.Schema} mapped for compatibility with writing to Beam Schema-aware
+ * I/O connectors.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class CsvIOParseError {
+
+  static Builder builder() {
+    return new AutoValue_CsvIOParseError.Builder();
+  }
+
+  /** The caught {@link Exception#getMessage()}. */
+  public abstract String getMessage();
+
+  /**
+   * The CSV record associated with the caught {@link Exception}. Annotated {@link Nullable} as not
+   * all processing errors are associated with a CSV record.
+   */
+  public abstract @Nullable String getCsvRecord();
+
+  /**
+   * The filename associated with the caught {@link Exception}. Annotated {@link Nullable} as not
+   * all processing errors are associated with a file.
+   */
+  public abstract @Nullable String getFilename();
+
+  /** The date and time when the {@link Exception} occurred. */
+  public abstract Instant getObservedTimestamp();
+
+  /** The caught {@link Exception#getStackTrace()}. */
+  public abstract String getStackTrace();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    abstract Builder setMessage(String message);
+
+    abstract Builder setCsvRecord(String csvRecord);
+
+    abstract Builder setFilename(String filename);
+
+    abstract Builder setObservedTimestamp(Instant observedTimestamp);
+
+    abstract Builder setStackTrace(String stackTrace);
+
+    public abstract CsvIOParseError build();
+  }
+}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java
new file mode 100644
index 00000000000..8e746c00605
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class CsvIOParseErrorTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  private static final SchemaProvider SCHEMA_PROVIDER = new DefaultSchema.DefaultSchemaProvider();
+
+  @Test
+  public void usableInSingleOutput() {
+    List<CsvIOParseError> want =
+        Arrays.asList(
+            CsvIOParseError.builder()
+                .setMessage("error message")
+                .setObservedTimestamp(Instant.now())
+                .setStackTrace("stack trace")
+                .build(),
+            CsvIOParseError.builder()
+                .setMessage("error message")
+                .setObservedTimestamp(Instant.now())
+                .setStackTrace("stack trace")
+                .setFilename("filename")
+                .setCsvRecord("csv record")
+                .build());
+
+    PCollection<CsvIOParseError> errors = pipeline.apply(Create.of(want));
+    PAssert.that(errors).containsInAnyOrder(want);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void usableInMultiOutput() {
+    List<CsvIOParseError> want =
+        Arrays.asList(
+            CsvIOParseError.builder()
+                .setMessage("error message")
+                .setObservedTimestamp(Instant.now())
+                .setStackTrace("stack trace")
+                .build(),
+            CsvIOParseError.builder()
+                .setMessage("error message")
+                .setObservedTimestamp(Instant.now())
+                .setStackTrace("stack trace")
+                .setFilename("filename")
+                .setCsvRecord("csv record")
+                .build());
+
+    TupleTag<CsvIOParseError> errorTag = new TupleTag<CsvIOParseError>() {};
+    TupleTag<String> anotherTag = new TupleTag<String>() {};
+
+    PCollection<CsvIOParseError> errors = pipeline.apply("createWant", Create.of(want));
+    PCollection<String> anotherPCol = pipeline.apply("createAnother", Create.of("a", "b", "c"));
+    PCollectionTuple pct = PCollectionTuple.of(errorTag, errors).and(anotherTag, anotherPCol);
+    PAssert.that(pct.get(errorTag)).containsInAnyOrder(want);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void canDeriveSchema() {
+    TypeDescriptor<CsvIOParseError> type = TypeDescriptor.of(CsvIOParseError.class);
+    Schema schema = SCHEMA_PROVIDER.schemaFor(type);
+    assertNotNull(schema);
+    pipeline.run();
+  }
+}

Reply via email to