This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f72f6ce0e81 Remove CsvIOParseResult (#31819)
f72f6ce0e81 is described below

commit f72f6ce0e813ce1189f723714ec319a5a0431419
Author: Damon <[email protected]>
AuthorDate: Tue Jul 9 17:26:59 2024 -0700

    Remove CsvIOParseResult (#31819)
---
 .../apache/beam/sdk/io/csv/CsvIOParseResult.java   | 86 ----------------------
 .../org/apache/beam/sdk/io/csv/CsvIOReadFiles.java | 20 ++---
 2 files changed, 7 insertions(+), 99 deletions(-)

diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
deleted file mode 100644
index 5d4d4c8c02e..00000000000
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOParseResult.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.csv;
-
-import java.util.Map;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-
-/**
- * The {@link T} and {@link org.apache.beam.sdk.io.csv.CsvIOParseError} {@link PCollection} results
- * of parsing CSV records. Use {@link #getOutput()} and {@link #getErrors()} to apply these results
- * in a pipeline.
- */
-public class CsvIOParseResult<T> implements POutput {
-
-  static <T> CsvIOParseResult<T> of(
-      TupleTag<T> outputTag, TupleTag<CsvIOParseError> errorTag, PCollectionTuple pct) {
-    return new CsvIOParseResult<>(outputTag, errorTag, pct);
-  }
-
-  private final Pipeline pipeline;
-  private final TupleTag<T> outputTag;
-  private final PCollection<T> output;
-  private final TupleTag<CsvIOParseError> errorTag;
-  private final PCollection<CsvIOParseError> errors;
-
-  private CsvIOParseResult(
-      TupleTag<T> outputTag, TupleTag<CsvIOParseError> errorTag, PCollectionTuple pct) {
-    this.outputTag = outputTag;
-    this.errorTag = errorTag;
-    this.pipeline = pct.getPipeline();
-    this.output = pct.get(outputTag);
-    this.errors = pct.get(errorTag);
-  }
-
-  /** The {@link T} {@link PCollection} as a result of successfully parsing CSV records. */
-  public PCollection<T> getOutput() {
-    return output;
-  }
-
-  /**
-   * The {@link org.apache.beam.sdk.io.csv.CsvIOParseError} {@link PCollection} as a result of
-   * errors associated with parsing CSV records.
-   */
-  public PCollection<CsvIOParseError> getErrors() {
-    return errors;
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  @Override
-  public Map<TupleTag<?>, PValue> expand() {
-    return ImmutableMap.of(
-        outputTag, output,
-        errorTag, errors);
-  }
-
-  @Override
-  public void finishSpecifyingOutput(
-      String transformName, PInput input, PTransform<?, ?> transform) {}
-}
diff --git a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java
index 0f6267c6b34..3e0b36b85c2 100644
--- a/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java
+++ b/sdks/java/io/csv/src/main/java/org/apache/beam/sdk/io/csv/CsvIOReadFiles.java
@@ -17,20 +17,19 @@
  */
 package org.apache.beam.sdk.io.csv;
 
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Skeleton for error handling in CsvIO that transforms a {@link FileIO.ReadableFile} into the
  * result of parsing.
  */
 // TODO(https://github.com/apache/beam/issues/31736): Plan completion in future PR after
-// dependencies are completed.
-class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, CsvIOParseResult<T>> {
+//  dependencies are completed.
+class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
   /** Stores required parameters for parsing. */
   private final CsvIOParseConfiguration.Builder configBuilder;
 
@@ -39,16 +38,11 @@ class CsvIOReadFiles<T> extends PTransform<PCollection<FileIO.ReadableFile>, Csv
   }
 
   /** {@link PTransform} that parses and relays the filename associated with each error. */
-  // TODO: complete expand method to unsure parsing from FileIO.ReadableFile to CsvIOParseResult.
   @Override
-  public CsvIOParseResult<T> expand(PCollection<FileIO.ReadableFile> input) {
+  public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
     // TODO(https://github.com/apache/beam/issues/31736): Needed to prevent check errors, will
-    // remove with future PR.
+    //  remove with future PR.
     configBuilder.build();
-    TupleTag<T> outputTag = new TupleTag<>();
-    TupleTag<CsvIOParseError> errorTag = new TupleTag<>();
-    Pipeline p = input.getPipeline();
-    PCollectionTuple tuple = PCollectionTuple.empty(p);
-    return CsvIOParseResult.of(outputTag, errorTag, tuple);
+    return input.apply(ParDo.of(new DoFn<FileIO.ReadableFile, T>() {}));
   }
 }

Reply via email to