This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bb296e46c06 Create CsvIOParseError data class (#31700)
bb296e46c06 is described below
commit bb296e46c06082098bc8cb0757356002d0fc8bee
Author: Damon <[email protected]>
AuthorDate: Thu Jun 27 12:04:16 2024 -0700
Create CsvIOParseError data class (#31700)
---
sdks/java/io/csv/build.gradle | 1 +
.../apache/beam/sdk/io/csv/CsvIOParseError.java | 75 +++++++++++++++
.../beam/sdk/io/csv/CsvIOParseErrorTest.java | 101 +++++++++++++++++++++
3 files changed, 177 insertions(+)
diff --git a/sdks/java/io/csv/build.gradle b/sdks/java/io/csv/build.gradle
index 2be8f59d1f3..92c66ff0140 100644
--- a/sdks/java/io/csv/build.gradle
+++ b/sdks/java/io/csv/build.gradle
@@ -28,6 +28,7 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.commons_csv
implementation library.java.vendored_guava_32_1_2_jre
+ implementation library.java.joda_time
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
new file mode 100644
index 00000000000..ad7d05912fa
--- /dev/null
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseError.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * {@link CsvIOParseError} is a data class to store errors from CSV record processing. It is {@link
+ * org.apache.beam.sdk.schemas.Schema} mapped for compatibility with writing to Beam Schema-aware
+ * I/O connectors.
+ *
+ * <p>Instances are assembled with the package-private {@code builder()}. The message, observed
+ * timestamp and stack trace are non-nullable properties; the CSV record and filename are optional
+ * and may be left unset or set to {@code null}.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class CsvIOParseError {
+
+  /** Returns a new {@link Builder} backed by the AutoValue-generated implementation. */
+  static Builder builder() {
+    return new AutoValue_CsvIOParseError.Builder();
+  }
+
+  /** The caught {@link Exception#getMessage()}. */
+  public abstract String getMessage();
+
+  /**
+   * The CSV record associated with the caught {@link Exception}. Annotated {@link Nullable} as not
+   * all processing errors are associated with a CSV record.
+   */
+  public abstract @Nullable String getCsvRecord();
+
+  /**
+   * The filename associated with the caught {@link Exception}. Annotated {@link Nullable} as not
+   * all processing errors are associated with a file.
+   */
+  public abstract @Nullable String getFilename();
+
+  /** The date and time when the {@link Exception} occurred. */
+  public abstract Instant getObservedTimestamp();
+
+  /** The caught {@link Exception#getStackTrace()}. */
+  public abstract String getStackTrace();
+
+  /** Builder for {@link CsvIOParseError}; setters are package-private, {@link #build()} is public. */
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    /** Sets the value returned by {@link CsvIOParseError#getMessage()}. */
+    abstract Builder setMessage(String message);
+
+    /**
+     * Sets the value returned by {@link CsvIOParseError#getCsvRecord()}. The parameter is {@link
+     * Nullable} to match the getter, so callers holding a possibly-absent record can pass it
+     * through without branching.
+     */
+    abstract Builder setCsvRecord(@Nullable String csvRecord);
+
+    /**
+     * Sets the value returned by {@link CsvIOParseError#getFilename()}. The parameter is {@link
+     * Nullable} to match the getter, so callers holding a possibly-absent filename can pass it
+     * through without branching.
+     */
+    abstract Builder setFilename(@Nullable String filename);
+
+    /** Sets the value returned by {@link CsvIOParseError#getObservedTimestamp()}. */
+    abstract Builder setObservedTimestamp(Instant observedTimestamp);
+
+    /** Sets the value returned by {@link CsvIOParseError#getStackTrace()}. */
+    abstract Builder setStackTrace(String stackTrace);
+
+    /** Creates the {@link CsvIOParseError} from the values set on this builder. */
+    public abstract CsvIOParseError build();
+  }
+}
diff --git a/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java
new file mode 100644
index 00000000000..8e746c00605
--- /dev/null
+++ b/sdks/java/io/csv/src/test/java/org/apache/beam/sdk/io/csv/CsvIOParseErrorTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.csv;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaProvider;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+
+/** Tests that {@link CsvIOParseError} can be used as a pipeline element and has a schema. */
+public class CsvIOParseErrorTest {
+  private static final SchemaProvider SCHEMA_PROVIDER = new DefaultSchema.DefaultSchemaProvider();
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  /** Checks that errors placed in a single {@link PCollection} are observed unchanged. */
+  @Test
+  public void usableInSingleOutput() {
+    List<CsvIOParseError> expected = sampleErrors();
+
+    PCollection<CsvIOParseError> actual = pipeline.apply(Create.of(expected));
+    PAssert.that(actual).containsInAnyOrder(expected);
+
+    pipeline.run();
+  }
+
+  /** Checks that errors can be carried in a {@link PCollectionTuple} next to another output. */
+  @Test
+  public void usableInMultiOutput() {
+    List<CsvIOParseError> expected = sampleErrors();
+
+    TupleTag<CsvIOParseError> parseErrorTag = new TupleTag<CsvIOParseError>() {};
+    TupleTag<String> stringTag = new TupleTag<String>() {};
+
+    PCollection<CsvIOParseError> parseErrors = pipeline.apply("createWant", Create.of(expected));
+    PCollection<String> strings = pipeline.apply("createAnother", Create.of("a", "b", "c"));
+
+    PCollectionTuple outputs =
+        PCollectionTuple.of(parseErrorTag, parseErrors).and(stringTag, strings);
+    PAssert.that(outputs.get(parseErrorTag)).containsInAnyOrder(expected);
+
+    pipeline.run();
+  }
+
+  /** Checks that the default schema provider yields a non-null schema for the class. */
+  @Test
+  public void canDeriveSchema() {
+    TypeDescriptor<CsvIOParseError> descriptor = TypeDescriptor.of(CsvIOParseError.class);
+    Schema derived = SCHEMA_PROVIDER.schemaFor(descriptor);
+    assertNotNull(derived);
+    pipeline.run();
+  }
+
+  /**
+   * Builds the two fixture errors shared by the pipeline tests: one with only the non-nullable
+   * properties set, and one that also sets the filename and CSV record.
+   */
+  private static List<CsvIOParseError> sampleErrors() {
+    CsvIOParseError withoutOptionalFields =
+        CsvIOParseError.builder()
+            .setMessage("error message")
+            .setObservedTimestamp(Instant.now())
+            .setStackTrace("stack trace")
+            .build();
+    CsvIOParseError withAllFields =
+        CsvIOParseError.builder()
+            .setMessage("error message")
+            .setObservedTimestamp(Instant.now())
+            .setStackTrace("stack trace")
+            .setFilename("filename")
+            .setCsvRecord("csv record")
+            .build();
+    return Arrays.asList(withoutOptionalFields, withAllFields);
+  }
+}