This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f72f6ce0e81 Remove CsvIOParseResult (#31819)
f72f6ce0e81 is described below
commit f72f6ce0e813ce1189f723714ec319a5a0431419
Author: Damon <[email protected]>
AuthorDate: Tue Jul 9 17:26:59 2024 -0700
Remove CsvIOParseResult (#31819)
---
.../apache/beam/sdk/io/csv/CsvIOParseResult.java | 86 ----------------------
.../org/apache/beam/sdk/io/csv/CsvIOReadFiles.java | 20 ++---
2 files changed, 7 insertions(+), 99 deletions(-)
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
deleted file mode 100644
index 5d4d4c8c02e..00000000000
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.csv;
-
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * The {@link T} and {@link org.apache.beam.sdk.io.csv.CsvIOParseError} {@link PCollection} results
- * of parsing CSV records. Use {@link #getOutput()} and {@link #getErrors()} to apply these results
- * in a pipeline.
- */
-public class CsvIOParseResult<T> implements POutput {
-
- static <T> CsvIOParseResult<T> of(
- TupleTag<T> outputTag, TupleTag<CsvIOParseError> errorTag, PCollectionTuple pct) {
- return new CsvIOParseResult<>(outputTag, errorTag, pct);
- }
-
- private final Pipeline pipeline;
- private final TupleTag<T> outputTag;
- private final PCollection<T> output;
- private final TupleTag<CsvIOParseError> errorTag;
- private final PCollection<CsvIOParseError> errors;
-
- private CsvIOParseResult(
- TupleTag<T> outputTag, TupleTag<CsvIOParseError> errorTag, PCollectionTuple pct) {
- this.outputTag = outputTag;
- this.errorTag = errorTag;
- this.pipeline = pct.getPipeline();
- this.output = pct.get(outputTag);
- this.errors = pct.get(errorTag);
- }
-
- /** The {@link T} {@link PCollection} as a result of successfully parsing CSV records. */
- public PCollection<T> getOutput() {
- return output;
- }
-
- /**
- * The {@link org.apache.beam.sdk.io.csv.CsvIOParseError} {@link PCollection} as a result of
- * errors associated with parsing CSV records.
- */
- public PCollection<CsvIOParseError> getErrors() {
- return errors;
- }
-
- @Override
- public Pipeline getPipeline() {
- return pipeline;
- }
-
- @Override
- public Map<TupleTag<?>, PValue> expand() {
- return ImmutableMap.of(
- outputTag, output,
- errorTag, errors);
- }
-
- @Override
- public void finishSpecifyingOutput(
- String transformName, PInput input, PTransform<?, ?> transform) {}
-}
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java
index 0f6267c6b34..3e0b36b85c2 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java
@@ -17,20 +17,19 @@
*/
package org.apache.beam.sdk.io.csv;
-import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
/**
* Skeleton for error handling in CsvIO that transforms a {@link FileIO.ReadableFile} into the
* result of parsing.
*/
// TODO(https://github.com/apache/beam/issues/31736): Plan completion in future PR after
-// dependencies are completed.
-class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, CsvIOParseResult<T>> {
+// dependencies are completed.
+class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
/** Stores required parameters for parsing. */
private final CsvIOParseConfiguration.Builder configBuilder;
@@ -39,16 +38,11 @@ class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, Csv
}
/** {@link PTransform} that parses and relays the filename associated with each error. */
- // TODO: complete expand method to unsure parsing from FileIO.ReadableFile to CsvIOParseResult.
@Override
- public CsvIOParseResult<T> expand(PCollection<FileIO.ReadableFile> input) {
+ public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
// TODO(https://github.com/apache/beam/issues/31736): Needed to prevent check errors, will
- // remove with future PR.
+ // remove with future PR.
configBuilder.build();
- TupleTag<T> outputTag = new TupleTag<>();
- TupleTag<CsvIOParseError> errorTag = new TupleTag<>();
- Pipeline p = input.getPipeline();
- PCollectionTuple tuple = PCollectionTuple.empty(p);
- return CsvIOParseResult.of(outputTag, errorTag, tuple);
+ return input.apply(ParDo.of(new DoFn<FileIO.ReadableFile, T>() {}));
}
}