[
https://issues.apache.org/jira/browse/BEAM-214?focusedWorklogId=104570&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-104570
]
ASF GitHub Bot logged work on BEAM-214:
---------------------------------------
Author: ASF GitHub Bot
Created on: 22/May/18 09:57
Start Date: 22/May/18 09:57
Worklog Time Spent: 10m
Work Description: lgajowy commented on a change in pull request #5242:
[BEAM-214] ParquetIO
URL: https://github.com/apache/beam/pull/5242#discussion_r189843180
##########
File path:
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.parquet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+
+import com.google.auto.value.AutoValue;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileIO;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
+import org.joda.time.Duration;
+
+/**
+ * IO to read and write Parquet files.
+ *
+ * <h3>Reading Parquet files</h3>
+ *
+ * <p>{@link ParquetIO} source returns a {@link PCollection} for
+ * Parquet files. The elements in the {@link PCollection} are Avro {@link
GenericRecord}.
+ *
+ * <p>To configure the {@link Read}, you have to provide the file patterns
(from) of the
+ * Parquet files and the Avro schema.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline.apply(ParquetIO.read().from("/foo/bar").withSchema(schema))
+ * ...
+ * }
+ * </pre>
+ *
+ * <p>As {@link Read} is based on {@link FileIO}, it supports any filesystem
(hdfs, ...).
+ *
+ * <h3>Writing Parquet files</h3>
+ *
+ * <p>{@link Write} allows you to write a {@link PCollection} of {@link
GenericRecord} into a
+ * Parquet file.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(...) // PCollection<GenericRecord>
+ * .apply(ParquetIO.write().to("/foo/bar").withSchema(schema));
+ * }</pre>
+ */
+public class ParquetIO {
+
+ /**
+ * Reads {@link GenericRecord} from a Parquet file (or multiple Parquet
files matching
+ * the pattern).
+ */
+ public static Read read() {
+ return new
AutoValue_ParquetIO_Read.Builder().setHintMatchesManyFiles(false)
+
.setMatchConfiguration(FileIO.MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
+ .build();
+ }
+
+ /**
+ * Like {@link #read()}, but reads each filepattern in the input {@link
PCollection}.
+ */
+ public static ReadAll readAll() {
+ return new AutoValue_ParquetIO_ReadAll.Builder()
+ .setMatchConfiguration(FileIO.MatchConfiguration
+ .create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
+ .build();
+ }
+
+ /**
+ * Like {@link #read()}, but reads each file in a {@link PCollection}
+ * of {@link org.apache.beam.sdk.io.FileIO.ReadableFile},
+ * which allows more flexible usage.
+ */
+ public static ReadFiles readFiles() {
+ return new AutoValue_ParquetIO_ReadFiles.Builder().build();
+ }
+
+ /**
+ * Writes a {@link PCollection} to an Parquet file.
+ */
+ public static Write write() {
+ return new AutoValue_ParquetIO_Write.Builder().build();
+ }
+
+ /**
+ * Implementation of {@link #read()}.
+ */
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin,
PCollection<GenericRecord>> {
+
+ @Nullable abstract ValueProvider<String> filepattern();
+ abstract FileIO.MatchConfiguration matchConfiguration();
+ @Nullable abstract Schema schema();
+ abstract boolean hintMatchesManyFiles();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setFilepattern(ValueProvider<String> filepattern);
+ abstract Builder setMatchConfiguration(FileIO.MatchConfiguration
matchConfiguration);
+ abstract Builder setSchema(Schema schema);
+ abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
+
+ abstract Read build();
+ }
+
+ /**
+ * Reads from the given filename or filepattern.
+ *
+ * <p>If it is known that the filepattern will match a very large number
of files (at least tens
+ * of thousands), use {@link #withHintMatchesManyFiles} for better
performance and scalability.
+ */
+ public Read from(ValueProvider<String> filepattern) {
+ return builder().setFilepattern(filepattern).build();
+ }
+
+ /** Like {@link #from(ValueProvider)}. */
+ public Read from(String filepattern) {
+ return from(ValueProvider.StaticValueProvider.of(filepattern));
+ }
+
+ /**
+ * Schema of the record in the Parquet file.
+ */
+ public Read withSchema(Schema schema) {
+ return builder().setSchema(schema).build();
+ }
+
+ /** Sets the {@link FileIO.MatchConfiguration}. */
+ public Read withMatchConfiguration(FileIO.MatchConfiguration
matchConfiguration) {
+ return builder().setMatchConfiguration(matchConfiguration).build();
+ }
+
+ /** Configures whether or not a filepattern matching no files is allowed.
*/
+ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return
withMatchConfiguration(matchConfiguration().withEmptyMatchTreatment(treatment));
+ }
+
+ /**
+ * Continuously watches for new files matching the filepattern, polling it
at the given
+ * interval, until the given termination condition is reached. The
returned {@link PCollection}
+ * is unbounded.
+ *
+ * <p>This works only in runners supporting {@link
Experimental.Kind#SPLITTABLE_DO_FN}.
+ */
+ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+ public Read watchForNewFiles(
+ Duration pollInterval, Watch.Growth.TerminationCondition<String, ?>
terminationCondition) {
+ return withMatchConfiguration(
+ matchConfiguration().continuously(pollInterval,
terminationCondition));
+ }
+
+ /**
+ * Hints that the filepattern specified in {@link #from(String)} matches a
very large number of
+ * files.
+ *
+ * <p>This hint may cause a runner to execute the transform differently,
in a way that improves
+ * performance for this case, but it may worsen performance if the
filepattern matches only a
+ * small number of files (e.g., in a runner that supports dynamic work
rebalancing, it will
+ * happen less efficiently within individual files).
+ */
+ public Read withHintMatchesManyFiles() {
+ return builder().setHintMatchesManyFiles(true).build();
+ }
+
+ @Override
+ public PCollection<GenericRecord> expand(PBegin input) {
+ checkNotNull(filepattern(), "filepattern");
+ checkNotNull(schema(), "schema");
+
+ ReadAll readAll =
readAll().withMatchConfiguration(matchConfiguration()).withSchema(schema());
+ return input
+ .apply("Create filepattern", Create.ofProvider(filepattern(),
+ StringUtf8Coder.of()))
+ .apply("Via ReadAll", readAll);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(
+ DisplayData.item("filePattern", filepattern()).withLabel("Input
File Pattern"))
+ .include("matchConfiguration", matchConfiguration());
+ }
+ }
+
+ /**
+ * Implementation of {@link #readAll()}.
+ */
+ @AutoValue
+ public abstract static class ReadAll extends PTransform<PCollection<String>,
+ PCollection<GenericRecord>> {
+
+ abstract FileIO.MatchConfiguration getMatchConfiguration();
+ @Nullable abstract Schema getSchema();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setMatchConfiguration(FileIO.MatchConfiguration
matchConfiguration);
+ abstract Builder setSchema(Schema schema);
+
+ abstract ReadAll build();
+ }
+
+ /**
+ * Sets the {@link org.apache.beam.sdk.io.FileIO.MatchConfiguration}.
+ */
+ public ReadAll withMatchConfiguration(FileIO.MatchConfiguration
configuration) {
+ return builder().setMatchConfiguration(configuration).build();
+ }
+
+ /**
+ * Sets the {@link EmptyMatchTreatment}.
+ */
+ public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return
withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
+ }
+
+ /**
+ * Sets the schema of the records.
+ */
+ public ReadAll withSchema(Schema schema) {
+ return builder().setSchema(schema).build();
+ }
+
+ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+ public ReadAll watchForNewFiles(
+ Duration pollInterval, Watch.Growth.TerminationCondition<String, ?>
terminationCondition) {
+ return withMatchConfiguration(
+ getMatchConfiguration().continuously(pollInterval,
terminationCondition));
+ }
+
+ @Override
+ public PCollection<GenericRecord> expand(PCollection<String> input) {
+ checkNotNull(getSchema(), "schema");
+ return input
+ .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
+ .apply(FileIO.readMatches())
+ .apply(readFiles().withSchema(getSchema()));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.include("matchConfiguration", getMatchConfiguration());
+ }
+
+ }
+
+ /**
+ * Implementation of {@link #readFiles()}.
+ */
+ @AutoValue
+ public abstract static class ReadFiles extends
PTransform<PCollection<FileIO.ReadableFile>,
+ PCollection<GenericRecord>> {
+
+ @Nullable abstract Schema schema();
+
+ abstract Builder builder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setSchema(Schema schema);
+ abstract ReadFiles build();
+ }
+
+ /**
+ * Define the Avro schema of the record to read from the Parquet file.
+ */
+ public ReadFiles withSchema(Schema schema) {
+ checkArgument(schema != null,
+ "schema can not be null");
+ return builder().setSchema(schema).build();
+ }
+
+ @Override
+ public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile>
input) {
+ return input
+ .apply(ParDo.of(new ReadFn()))
+ .setCoder(AvroCoder.of(schema()));
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("schema", schema().toString()));
+ }
+
+ static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
+
+ @ProcessElement
+ public void processElement(ProcessContext processContext) throws
Exception {
Review comment:
OK, I reverted the change. Thanks for the link! However, I've made the Schema
an argument of the factory methods, because it seems to be the "overwhelmingly
most important" one (as the guide puts it). This is done the same way as in
AvroIO for generic records. Is that OK?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 104570)
Time Spent: 11.5h (was: 11h 20m)
> Create Parquet IO
> -----------------
>
> Key: BEAM-214
> URL: https://issues.apache.org/jira/browse/BEAM-214
> Project: Beam
> Issue Type: Improvement
> Components: io-ideas
> Reporter: Neville Li
> Assignee: Jean-Baptiste Onofré
> Priority: Minor
> Time Spent: 11.5h
> Remaining Estimate: 0h
>
> Would be nice to support Parquet files with projection and predicates.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)