[ https://issues.apache.org/jira/browse/BEAM-214?focusedWorklogId=103010&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-103010 ]
ASF GitHub Bot logged work on BEAM-214: --------------------------------------- Author: ASF GitHub Bot Created on: 17/May/18 17:52 Start Date: 17/May/18 17:52 Worklog Time Spent: 10m Work Description: jkff commented on a change in pull request #5242: [BEAM-214] ParquetIO URL: https://github.com/apache/beam/pull/5242#discussion_r188731168 ########## File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java ########## @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.parquet; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; + +import com.google.auto.value.AutoValue; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.SeekableByteChannel; +import java.nio.channels.WritableByteChannel; +import javax.annotation.Nullable; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.options.ValueProvider; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Watch; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; +import org.apache.parquet.io.SeekableInputStream; +import 
org.joda.time.Duration; + +/** + * IO to read and write Parquet files. + * + * <h3>Reading Parquet files</h3> + * + * <p>{@link ParquetIO} source returns a {@link PCollection} for + * Parquet files. The elements in the {@link PCollection} are Avro {@link GenericRecord}. + * + * <p>To configure the {@link Read}, you have to provide the file patterns (from) of the + * Parquet files and the Avro schema. + * + * <p>For example: + * + * <pre>{@code + * pipeline.apply(ParquetIO.read().from("/foo/bar").withSchema(schema)) + * ... + * } + * </pre> + * + * <p>As {@link Read} is based on {@link FileIO}, it supports any filesystem (hdfs, ...). + * + * <h3>Writing Parquet files</h3> + * + * <p>{@link Write} allows you to write a {@link PCollection} of {@link GenericRecord} into a + * Parquet file. + * + * <p>For example: + * + * <pre>{@code + * pipeline + * .apply(...) // PCollection<GenericRecord> + * .apply(ParquetIO.write().to("/foo/bar").withSchema(schema)); + * }</pre> + */ +public class ParquetIO { + + /** + * Reads {@link GenericRecord} from a Parquet file (or multiple Parquet files matching + * the pattern). + */ + public static Read read() { + return new AutoValue_ParquetIO_Read.Builder().setHintMatchesManyFiles(false) + .setMatchConfiguration(FileIO.MatchConfiguration.create(EmptyMatchTreatment.DISALLOW)) + .build(); + } + + /** + * Like {@link #read()}, but reads each filepattern in the input {@link PCollection}. + */ + public static ReadAll readAll() { + return new AutoValue_ParquetIO_ReadAll.Builder() + .setMatchConfiguration(FileIO.MatchConfiguration + .create(EmptyMatchTreatment.ALLOW_IF_WILDCARD)) + .build(); + } + + /** + * Like {@link #read()}, but reads each file in a {@link PCollection} + * of {@link org.apache.beam.sdk.io.FileIO.ReadableFile}, + * which allows more flexible usage. + */ + public static ReadFiles readFiles() { + return new AutoValue_ParquetIO_ReadFiles.Builder().build(); + } + + /** + * Writes a {@link PCollection} to an Parquet file. 
+ */ + public static Write write() { + return new AutoValue_ParquetIO_Write.Builder().build(); + } + + /** + * Implementation of {@link #read()}. + */ + @AutoValue + public abstract static class Read extends PTransform<PBegin, PCollection<GenericRecord>> { + + @Nullable abstract ValueProvider<String> filepattern(); + abstract FileIO.MatchConfiguration matchConfiguration(); + @Nullable abstract Schema schema(); + abstract boolean hintMatchesManyFiles(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setFilepattern(ValueProvider<String> filepattern); + abstract Builder setMatchConfiguration(FileIO.MatchConfiguration matchConfiguration); + abstract Builder setSchema(Schema schema); + abstract Builder setHintMatchesManyFiles(boolean hintManyFiles); + + abstract Read build(); + } + + /** + * Reads from the given filename or filepattern. + * + * <p>If it is known that the filepattern will match a very large number of files (at least tens + * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability. + */ + public Read from(ValueProvider<String> filepattern) { + return builder().setFilepattern(filepattern).build(); + } + + /** Like {@link #from(ValueProvider)}. */ + public Read from(String filepattern) { + return from(ValueProvider.StaticValueProvider.of(filepattern)); + } + + /** + * Schema of the record in the Parquet file. + */ + public Read withSchema(Schema schema) { + return builder().setSchema(schema).build(); + } + + /** Sets the {@link FileIO.MatchConfiguration}. */ + public Read withMatchConfiguration(FileIO.MatchConfiguration matchConfiguration) { + return builder().setMatchConfiguration(matchConfiguration).build(); + } + + /** Configures whether or not a filepattern matching no files is allowed. 
*/ + public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withMatchConfiguration(matchConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** + * Continuously watches for new files matching the filepattern, polling it at the given + * interval, until the given termination condition is reached. The returned {@link PCollection} + * is unbounded. + * + * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}. + */ + @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) + public Read watchForNewFiles( + Duration pollInterval, Watch.Growth.TerminationCondition<String, ?> terminationCondition) { + return withMatchConfiguration( + matchConfiguration().continuously(pollInterval, terminationCondition)); + } + + /** + * Hints that the filepattern specified in {@link #from(String)} matches a very large number of + * files. + * + * <p>This hint may cause a runner to execute the transform differently, in a way that improves + * performance for this case, but it may worsen performance if the filepattern matches only a + * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will + * happen less efficiently within individual files). 
+ */ + public Read withHintMatchesManyFiles() { + return builder().setHintMatchesManyFiles(true).build(); + } + + @Override + public PCollection<GenericRecord> expand(PBegin input) { + checkNotNull(filepattern(), "filepattern"); + checkNotNull(schema(), "schema"); + + ReadAll readAll = readAll().withMatchConfiguration(matchConfiguration()).withSchema(schema()); + return input + .apply("Create filepattern", Create.ofProvider(filepattern(), + StringUtf8Coder.of())) + .apply("Via ReadAll", readAll); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotNull( + DisplayData.item("filePattern", filepattern()).withLabel("Input File Pattern")) + .include("matchConfiguration", matchConfiguration()); + } + } + + /** + * Implementation of {@link #readAll()}. + */ + @AutoValue + public abstract static class ReadAll extends PTransform<PCollection<String>, + PCollection<GenericRecord>> { + + abstract FileIO.MatchConfiguration getMatchConfiguration(); + @Nullable abstract Schema getSchema(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setMatchConfiguration(FileIO.MatchConfiguration matchConfiguration); + abstract Builder setSchema(Schema schema); + + abstract ReadAll build(); + } + + /** + * Sets the {@link org.apache.beam.sdk.io.FileIO.MatchConfiguration}. + */ + public ReadAll withMatchConfiguration(FileIO.MatchConfiguration configuration) { + return builder().setMatchConfiguration(configuration).build(); + } + + /** + * Sets the {@link EmptyMatchTreatment}. + */ + public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) { + return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment)); + } + + /** + * Sets the schema of the records. 
+ */ + public ReadAll withSchema(Schema schema) { + return builder().setSchema(schema).build(); + } + + @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) + public ReadAll watchForNewFiles( + Duration pollInterval, Watch.Growth.TerminationCondition<String, ?> terminationCondition) { + return withMatchConfiguration( + getMatchConfiguration().continuously(pollInterval, terminationCondition)); + } + + @Override + public PCollection<GenericRecord> expand(PCollection<String> input) { + checkNotNull(getSchema(), "schema"); + return input + .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration())) + .apply(FileIO.readMatches()) + .apply(readFiles().withSchema(getSchema())); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.include("matchConfiguration", getMatchConfiguration()); + } + + } + + /** + * Implementation of {@link #readFiles()}. + */ + @AutoValue + public abstract static class ReadFiles extends PTransform<PCollection<FileIO.ReadableFile>, + PCollection<GenericRecord>> { + + @Nullable abstract Schema schema(); + + abstract Builder builder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setSchema(Schema schema); + abstract ReadFiles build(); + } + + /** + * Define the Avro schema of the record to read from the Parquet file. 
+ */ + public ReadFiles withSchema(Schema schema) { + checkArgument(schema != null, + "schema can not be null"); + return builder().setSchema(schema).build(); + } + + @Override + public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) { + return input + .apply(ParDo.of(new ReadFn())) + .setCoder(AvroCoder.of(schema())); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add(DisplayData.item("schema", schema().toString())); + } + + static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> { + + @ProcessElement + public void processElement(ProcessContext processContext) throws Exception { Review comment: Oh I didn't realize it was needed to set the coder. Then we do need the schema - per https://beam.apache.org/contribute/ptransform-style-guide/#setting-coders-on-output-collections it's the responsibility of the transform to ensure that its returned collections have a coder set. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 103010) Time Spent: 10.5h (was: 10h 20m) > Create Parquet IO > ----------------- > > Key: BEAM-214 > URL: https://issues.apache.org/jira/browse/BEAM-214 > Project: Beam > Issue Type: Improvement > Components: io-ideas > Reporter: Neville Li > Assignee: Jean-Baptiste Onofré > Priority: Minor > Time Spent: 10.5h > Remaining Estimate: 0h > > Would be nice to support Parquet files with projection and predicates. -- This message was sent by Atlassian JIRA (v7.6.3#76005)