[ https://issues.apache.org/jira/browse/BEAM-214?focusedWorklogId=103010&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103010 ]

ASF GitHub Bot logged work on BEAM-214:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/May/18 17:52
            Start Date: 17/May/18 17:52
    Worklog Time Spent: 10m 
      Work Description: jkff commented on a change in pull request #5242: [BEAM-214] ParquetIO
URL: https://github.com/apache/beam/pull/5242#discussion_r188731168
 
 

 ##########
 File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
 ##########
 @@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+
+import com.google.auto.value.AutoValue;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
+import org.joda.time.Duration;
+
+/**
+ * IO to read and write Parquet files.
+ *
+ * <h3>Reading Parquet files</h3>
+ *
+ * <p>The {@link ParquetIO} source returns a {@link PCollection} read from Parquet files. The
+ * elements of the {@link PCollection} are Avro {@link GenericRecord}s.
+ *
+ * <p>To configure the {@link Read}, you have to provide the file pattern of the Parquet files
+ * (via {@code from()}) and the Avro schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ *  pipeline.apply(ParquetIO.read().from("/foo/bar").withSchema(schema))
+ *  ...
+ * }
+ * </pre>
+ *
+ * <p>As {@link Read} is based on {@link FileIO}, it supports any filesystem (HDFS, ...).
+ *
+ * <h3>Writing Parquet files</h3>
+ *
+ * <p>{@link Write} allows you to write a {@link PCollection} of {@link GenericRecord} into a
+ * Parquet file.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ *  pipeline
+ *    .apply(...) // PCollection<GenericRecord>
+ *    .apply(ParquetIO.write().to("/foo/bar").withSchema(schema));
+ * }</pre>
+ */
+public class ParquetIO {
+
+  /**
+   * Reads {@link GenericRecord} from a Parquet file (or multiple Parquet files matching
+   * the pattern).
+   */
+  public static Read read() {
+    return new AutoValue_ParquetIO_Read.Builder()
+        .setHintMatchesManyFiles(false)
+        .setMatchConfiguration(FileIO.MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+        .build();
+  }
+
+  /**
+   * Like {@link #read()}, but reads each filepattern in the input {@link PCollection}.
+   */
+  public static ReadAll readAll() {
+    return new AutoValue_ParquetIO_ReadAll.Builder()
+        .setMatchConfiguration(FileIO.MatchConfiguration
+            .create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+        .build();
+  }
+
+  /**
+   * Like {@link #read()}, but reads each file in a {@link PCollection}
+   * of {@link org.apache.beam.sdk.io.FileIO.ReadableFile},
+   * which allows more flexible usage.
+   */
+  public static ReadFiles readFiles() {
+    return new AutoValue_ParquetIO_ReadFiles.Builder().build();
+  }
+
+  /**
+   * Writes a {@link PCollection} of {@link GenericRecord} to a Parquet file.
+   */
+  public static Write write() {
+    return new AutoValue_ParquetIO_Write.Builder().build();
+  }
+
+  /**
+   * Implementation of {@link #read()}.
+   */
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<GenericRecord>> {
+
+    @Nullable abstract ValueProvider<String> filepattern();
+    abstract FileIO.MatchConfiguration matchConfiguration();
+    @Nullable abstract Schema schema();
+    abstract boolean hintMatchesManyFiles();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder setMatchConfiguration(FileIO.MatchConfiguration matchConfiguration);
+      abstract Builder setSchema(Schema schema);
+      abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
+
+      abstract Read build();
+    }
+
+    /**
+     * Reads from the given filename or filepattern.
+     *
+     * <p>If it is known that the filepattern will match a very large number of files (at least
+     * tens of thousands), use {@link #withHintMatchesManyFiles} for better performance and
+     * scalability.
+     */
+    public Read from(ValueProvider<String> filepattern) {
+      return builder().setFilepattern(filepattern).build();
+    }
+
+    /** Like {@link #from(ValueProvider)}. */
+    public Read from(String filepattern) {
+      return from(ValueProvider.StaticValueProvider.of(filepattern));
+    }
+
+    /**
+     * Sets the Avro schema of the records to read from the Parquet file.
+     */
+    public Read withSchema(Schema schema) {
+      return builder().setSchema(schema).build();
+    }
+
+    /** Sets the {@link FileIO.MatchConfiguration}. */
+    public Read withMatchConfiguration(FileIO.MatchConfiguration matchConfiguration) {
+      return builder().setMatchConfiguration(matchConfiguration).build();
+    }
+
+    /** Configures whether or not a filepattern matching no files is allowed. */
+    public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(matchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned
+     * {@link PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}.
+     */
+    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+    public Read watchForNewFiles(
+        Duration pollInterval, Watch.Growth.TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          matchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    /**
+     * Hints that the filepattern specified in {@link #from(String)} matches a very large number
+     * of files.
+     *
+     * <p>This hint may cause a runner to execute the transform differently, in a way that
+     * improves performance for this case, but it may worsen performance if the filepattern
+     * matches only a small number of files (e.g., in a runner that supports dynamic work
+     * rebalancing, it will happen less efficiently within individual files).
+     */
+    public Read withHintMatchesManyFiles() {
+      return builder().setHintMatchesManyFiles(true).build();
+    }
+
+    @Override
+    public PCollection<GenericRecord> expand(PBegin input) {
+      checkNotNull(filepattern(), "filepattern");
+      checkNotNull(schema(), "schema");
+
+      ReadAll readAll =
+          readAll().withMatchConfiguration(matchConfiguration()).withSchema(schema());
+      return input
+          .apply("Create filepattern", Create.ofProvider(filepattern(),
+              StringUtf8Coder.of()))
+          .apply("Via ReadAll", readAll);
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", filepattern()).withLabel("Input File Pattern"))
+          .include("matchConfiguration", matchConfiguration());
+    }
+  }
+
+  /**
+   * Implementation of {@link #readAll()}.
+   */
+  @AutoValue
+  public abstract static class ReadAll extends PTransform<PCollection<String>,
+      PCollection<GenericRecord>> {
+
+    abstract FileIO.MatchConfiguration getMatchConfiguration();
+    @Nullable abstract Schema getSchema();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setMatchConfiguration(FileIO.MatchConfiguration matchConfiguration);
+      abstract Builder setSchema(Schema schema);
+
+      abstract ReadAll build();
+    }
+
+    /**
+     * Sets the {@link org.apache.beam.sdk.io.FileIO.MatchConfiguration}.
+     */
+    public ReadAll withMatchConfiguration(FileIO.MatchConfiguration configuration) {
+      return builder().setMatchConfiguration(configuration).build();
+    }
+
+    /**
+     * Sets the {@link EmptyMatchTreatment}.
+     */
+    public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+    }
+
+    /**
+     * Sets the schema of the records.
+     */
+    public ReadAll withSchema(Schema schema) {
+      return builder().setSchema(schema).build();
+    }
+
+    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+    public ReadAll watchForNewFiles(
+        Duration pollInterval, Watch.Growth.TerminationCondition<String, ?> terminationCondition) {
+      return withMatchConfiguration(
+          getMatchConfiguration().continuously(pollInterval, terminationCondition));
+    }
+
+    @Override
+    public PCollection<GenericRecord> expand(PCollection<String> input) {
+      checkNotNull(getSchema(), "schema");
+      return input
+          .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+          .apply(FileIO.readMatches())
+          .apply(readFiles().withSchema(getSchema()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder.include("matchConfiguration", getMatchConfiguration());
+    }
+
+  }
+
+  /**
+   * Implementation of {@link #readFiles()}.
+   */
+  @AutoValue
+  public abstract static class ReadFiles extends PTransform<PCollection<FileIO.ReadableFile>,
+      PCollection<GenericRecord>> {
+
+    @Nullable abstract Schema schema();
+
+    abstract Builder builder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSchema(Schema schema);
+      abstract ReadFiles build();
+    }
+
+    /**
+     * Sets the Avro schema of the records to read from the Parquet file.
+     */
+    public ReadFiles withSchema(Schema schema) {
+      checkArgument(schema != null, "schema cannot be null");
+      return builder().setSchema(schema).build();
+    }
+
+    @Override
+    public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
+      return input
+          .apply(ParDo.of(new ReadFn()))
+          .setCoder(AvroCoder.of(schema()));
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("schema", schema().toString()));
+    }
+
+    static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
+
+      @ProcessElement
+      public void processElement(ProcessContext processContext) throws Exception {
 
 Review comment:
   Oh, I didn't realize the coder needed to be set. Then we do need the schema: per
   https://beam.apache.org/contribute/ptransform-style-guide/#setting-coders-on-output-collections,
   it is the responsibility of the transform to ensure that its returned collections have a
   coder set.
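
   For illustration, a minimal sketch of that rule in use (assuming the API in this PR; the
   pipeline, filepattern, and schema variables are hypothetical placeholders). Because
   readFiles() sets the coder on its own output, the caller never has to call setCoder():

       // Hypothetical usage; "schema" is an org.apache.avro.Schema defined elsewhere.
       PCollection<GenericRecord> records =
           pipeline
               .apply(FileIO.match().filepattern("/tmp/input-*.parquet"))
               .apply(FileIO.readMatches())
               .apply(ParquetIO.readFiles().withSchema(schema));
       // "records" already carries AvroCoder.of(schema), set inside the transform itself.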



Issue Time Tracking
-------------------

    Worklog Id:     (was: 103010)
    Time Spent: 10.5h  (was: 10h 20m)

> Create Parquet IO
> -----------------
>
>                 Key: BEAM-214
>                 URL: https://issues.apache.org/jira/browse/BEAM-214
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-ideas
>            Reporter: Neville Li
>            Assignee: Jean-Baptiste Onofré
>            Priority: Minor
>          Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> Would be nice to support Parquet files with projection and predicates.
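
For reference, a minimal sketch of what projection and predicate pushdown look like with plain
parquet-avro outside Beam (the file path, column name, and projection schema below are
hypothetical):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.avro.AvroReadSupport;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.io.api.Binary;

    public class ParquetProjectionSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Projection: only materialize the "name" column (hypothetical schema).
        Schema projection = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                + "[{\"name\":\"name\",\"type\":\"string\"}]}");
        AvroReadSupport.setRequestedProjection(conf, projection);

        // Predicate: lets Parquet skip row groups where name can never be "alice".
        ParquetReader<GenericRecord> reader =
            AvroParquetReader.<GenericRecord>builder(new Path("/tmp/users.parquet"))
                .withConf(conf)
                .withFilter(FilterCompat.get(
                    FilterApi.eq(FilterApi.binaryColumn("name"), Binary.fromString("alice"))))
                .build();
        GenericRecord record;
        while ((record = reader.read()) != null) {
          System.out.println(record);
        }
        reader.close();
      }
    }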


