[ https://issues.apache.org/jira/browse/BEAM-7925?focusedWorklogId=480491&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-480491 ]

ASF GitHub Bot logged work on BEAM-7925:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Sep/20 23:59
            Start Date: 08/Sep/20 23:59
    Worklog Time Spent: 10m 
      Work Description: ihji commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485238785



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.

Review comment:
       Please also mention that `withSplit()` will be enabled automatically.
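
   (A minimal usage sketch to make the documented flow concrete; `withProjection(projectionSchema, encoderSchema)` is inferred from the builder fields elsewhere in this diff, so treat the method name as an assumption rather than the confirmed final API.)

   ```
   // Hypothetical usage; withProjection(...) is assumed from this PR's builders.
   // projectionSchema keeps only the wanted columns; encoderSchema is the full
   // schema with the unwanted columns made nullable so records stay encodable.
   Schema projectionSchema = new Schema.Parser().parse(projectionSchemaJson);
   Schema encoderSchema = new Schema.Parser().parse(encoderSchemaJson);

   PCollection<GenericRecord> records =
       pipeline.apply(
           ParquetIO.read(encoderSchema)
               .withProjection(projectionSchema, encoderSchema)
               .from("/path/to/input.parquet"));
   // Per the comment above, withSplit() would be enabled automatically here.
   ```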

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -194,6 +204,10 @@ public static ReadFiles readFiles(Schema schema) {
 
     abstract @Nullable Schema getSchema();
 
+    abstract @Nullable Schema getProjection();

Review comment:
       I think `getProjectionSchema` would describe this field better.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -312,12 +362,14 @@ public ReadFiles withSplit() {
     static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
       private Class<? extends GenericData> modelClass;
      private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
+      private String requestSchemaString;

Review comment:
       Is there any reason to use `String` instead of `Schema`? It looks like this field is referred to only once.
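
   (A plausible reason, sketched below: DoFn fields must be Java-serializable, and Avro's `Schema` was not `Serializable` in older Avro releases, so schemas are often carried as their JSON string and re-parsed on the worker. This is an assumption about the intent here, not something stated in the PR.)

   ```
   // Sketch: carrying an Avro schema through a DoFn as a String, since a
   // String field survives Java serialization even where Schema may not.
   static class SchemaCarryingFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
     private final String requestSchemaString;
     private transient Schema requestSchema;

     SchemaCarryingFn(Schema schema) {
       this.requestSchemaString = schema == null ? null : schema.toString();
     }

     @Setup
     public void setup() {
       if (requestSchemaString != null) {
         // Re-parse once per worker instead of serializing the Schema itself.
         requestSchema = new Schema.Parser().parse(requestSchemaString);
       }
     }
   }
   ```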

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));
+        }
         FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
         MessageType fileSchema = parquetFileMetadata.getSchema();
         Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
-
         ReadSupport.ReadContext readContext =
             readSupport.init(
                 new InitContext(
                     hadoopConf, Maps.transformValues(fileMetadata, ImmutableSet::of), fileSchema));
         ColumnIOFactory columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
-        MessageType requestedSchema = readContext.getRequestedSchema();
+
         RecordMaterializer<GenericRecord> recordConverter =
             readSupport.prepareForRead(hadoopConf, fileMetadata, fileSchema, readContext);
-        reader.setRequestedSchema(requestedSchema);

Review comment:
       Is it okay for this call to be skipped?
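
   (For context, a sketch of the behavior this diff removes, taken from the deleted lines above; whether it becomes redundant once the projection is set on `conf` via `AvroReadSupport.setRequestedProjection` is exactly the open question.)

   ```
   // Pre-PR behavior: push the requested schema from the read context down
   // to the file reader so only the projected columns are materialized.
   MessageType requestedSchema = readContext.getRequestedSchema();
   reader.setRequestedSchema(requestedSchema);
   ```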

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -209,6 +223,10 @@ public static ReadFiles readFiles(Schema schema) {
 
       abstract Builder setSchema(Schema schema);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -279,6 +312,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       abstract Builder setAvroDataModel(GenericData model);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -269,6 +298,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
     abstract @Nullable GenericData getAvroDataModel();
 
+    abstract @Nullable Schema getProjectionEncoder();
+
+    abstract @Nullable Schema getProjection();

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -299,9 +344,14 @@ public ReadFiles withSplit() {
     public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
       checkNotNull(getSchema(), "Schema can not be null");
       if (isSplittable()) {
+        if (getProjection() == null) {

Review comment:
       To minimize duplication:
   
   ```
   Schema coderSchema = getProjection() == null ? getSchema() : getProjectionEncoder();
   return input
       .apply(ParDo.of(new SplitReadFn(getAvroDataModel(), getProjection())))
       .setCoder(AvroCoder.of(coderSchema));
   ```

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));

Review comment:
       Just out of curiosity: since `hadoopConf` comes from `options`, is it necessary to set the properties from `options` again?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as following. The
+ * projection_schema contains only the column that we would like to read and encoder_schema contains
+ * all field but with the unwanted columns changed to nullable.
+ *

Review comment:
       It would also be great to mention the expected improvement from projecting columns, such as lower memory usage or faster read times.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 480491)
    Time Spent: 40m  (was: 0.5h)

> ParquetIO supports neither column projection nor filter predicate
> -----------------------------------------------------------------
>
>                 Key: BEAM-7925
>                 URL: https://issues.apache.org/jira/browse/BEAM-7925
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-parquet
>    Affects Versions: 2.14.0
>            Reporter: Neville Li
>            Priority: P2
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Current {{ParquetIO}} supports neither column projection nor filter predicate 
> which defeats the performance motivation of using Parquet in the first place. 
> That's why we have our own implementation of 
> [ParquetIO|https://github.com/spotify/scio/tree/master/scio-parquet/src] in 
> Scio.
> Reading Parquet as Avro with column projection has some complications, 
> namely, the resulting Avro records may be incomplete and will not survive 
> ser/de. A workaround may be to provide a {{TypedRead}} interface that takes a 
> {{Function<A, B>}} that maps invalid Avro {{A}} into a user-defined type {{B}}.
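
(To make the {{TypedRead}} idea above concrete, a rough sketch follows. `withProjection(...)` and `UserType` are hypothetical placeholders rather than an existing Beam API; the point is that projected records are mapped to a user-defined type before they would ever need to round-trip through an Avro coder.)

```
// Hypothetical sketch: convert possibly-incomplete projected GenericRecords
// into a user-defined type with its own coder so elements survive ser/de.
PCollection<UserType> rows =
    pipeline
        .apply(ParquetIO.read(encoderSchema)
            .withProjection(projectionSchema, encoderSchema)  // assumed API
            .from("/path/to/input.parquet"))
        .apply(MapElements
            .into(TypeDescriptor.of(UserType.class))
            .via((GenericRecord r) -> new UserType((long) r.get("id"))));
```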



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
