ihji commented on a change in pull request #12786:
URL: https://github.com/apache/beam/pull/12786#discussion_r485238785



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as follows. The
+ * projection_schema contains only the columns that we would like to read, and encoder_schema
+ * contains all fields, with the unwanted columns changed to nullable.

Review comment:
       Please also mention that `withSplit()` will be enabled automatically.
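   
       For illustration, a minimal usage sketch. The public method name here is an assumption
       inferred from the `setProjection`/`setProjectionEncoder` builders below, written as
       `withProjection(projectionSchema, encoderSchema)`:
   
   ```
   // Hypothetical usage; withProjection(...) is assumed from this PR's builders.
   // Reading with a projection also enables withSplit() automatically.
   PCollection<GenericRecord> records =
       pipeline.apply(
           ParquetIO.read(schema)
               .from("/foo/bar/*.parquet")
               .withProjection(projectionSchema, encoderSchema));
   ```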

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -194,6 +204,10 @@ public static ReadFiles readFiles(Schema schema) {
 
     abstract @Nullable Schema getSchema();
 
+    abstract @Nullable Schema getProjection();

Review comment:
       I think `getProjectionSchema` represents the field better.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -312,12 +362,14 @@ public ReadFiles withSplit() {
     static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
       private Class<? extends GenericData> modelClass;
       private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
+      private String requestSchemaString;

Review comment:
       Is there any reason to use `String` instead of `Schema`? It looks like this is referenced only once.
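   
       (One common reason, stated here as an assumption rather than something confirmed in
       this PR: with the Avro 1.8.x dependency, `org.apache.avro.Schema` is not
       `Serializable`, so it cannot be a plain field on a serialized `DoFn`. The usual
       workaround looks like the sketch below.)
   
   ```
   // Sketch of the workaround, assuming Schema is not Serializable:
   private final String requestSchemaString; // serialized with the DoFn
   private transient Schema requestSchema;   // rebuilt on each worker
   
   private Schema requestSchema() {
     if (requestSchema == null && requestSchemaString != null) {
       requestSchema = new Schema.Parser().parse(requestSchemaString);
     }
     return requestSchema;
   }
   ```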

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));
+        }
         FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
         MessageType fileSchema = parquetFileMetadata.getSchema();
         Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
-
         ReadSupport.ReadContext readContext =
             readSupport.init(
                 new InitContext(
                     hadoopConf, Maps.transformValues(fileMetadata, ImmutableSet::of), fileSchema));
         ColumnIOFactory columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
-        MessageType requestedSchema = readContext.getRequestedSchema();
+
         RecordMaterializer<GenericRecord> recordConverter =
             readSupport.prepareForRead(hadoopConf, fileMetadata, fileSchema, readContext);
-        reader.setRequestedSchema(requestedSchema);

Review comment:
       Is it okay for this call to be skipped?
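   
       For reference, the dropped call is reconstructed below from the deleted lines; keeping
       it would make the reader itself restrict reads to the projected columns, matching what
       the record converter materializes:
   
   ```
   // Reconstructed from the lines removed above (not new API):
   MessageType requestedSchema = readContext.getRequestedSchema();
   reader.setRequestedSchema(requestedSchema);
   ```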

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -209,6 +223,10 @@ public static ReadFiles readFiles(Schema schema) {
 
       abstract Builder setSchema(Schema schema);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -279,6 +312,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       abstract Builder setAvroDataModel(GenericData model);
 
+      abstract Builder setProjectionEncoder(Schema schema);
+
+      abstract Builder setProjection(Schema schema);

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -269,6 +298,10 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
     abstract @Nullable GenericData getAvroDataModel();
 
+    abstract @Nullable Schema getProjectionEncoder();
+
+    abstract @Nullable Schema getProjection();

Review comment:
       ditto.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -299,9 +344,14 @@ public ReadFiles withSplit() {
     public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input) {
       checkNotNull(getSchema(), "Schema can not be null");
       if (isSplittable()) {
+        if (getProjection() == null) {

Review comment:
       To minimize duplication:
   
   ```
   Schema coderSchema = getProjection() == null ? getSchema() : getProjectionEncoder();
   return input
       .apply(ParDo.of(new SplitReadFn(getAvroDataModel(), getProjection())))
       .setCoder(AvroCoder.of(coderSchema));
   ```

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -336,36 +388,41 @@ public void processElement(
                 + tracker.currentRestriction().getFrom()
                 + " to "
                 + tracker.currentRestriction().getTo());
-        ParquetReadOptions options = HadoopReadOptions.builder(getConfWithModelClass()).build();
-        ParquetFileReader reader =
-            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
+        Configuration conf = getConfWithModelClass();
         GenericData model = null;
         if (modelClass != null) {
           model = (GenericData) modelClass.getMethod("get").invoke(null);
         }
-        ReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
-
+        AvroReadSupport<GenericRecord> readSupport = new AvroReadSupport<GenericRecord>(model);
+        if (requestSchemaString != null) {
+          AvroReadSupport.setRequestedProjection(
+              conf, new Schema.Parser().parse(requestSchemaString));
+        }
+        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
+        ParquetFileReader reader =
+            ParquetFileReader.open(new BeamParquetInputFile(file.openSeekable()), options);
         Filter filter = checkNotNull(options.getRecordFilter(), "filter");
         Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
+        for (String property : options.getPropertyNames()) {
+          hadoopConf.set(property, options.getProperty(property));

Review comment:
       Just out of curiosity: since `hadoopConf` comes from `options`, is it necessary to set the properties from `options` again?
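   
       A sketch of why the loop may be a no-op here (this rests on an assumption about
       parquet-mr internals: `HadoopReadOptions.builder(conf)` keeps the passed-in
       `Configuration` and `getConf()` returns it, while properties added through the
       builder's `set(...)` live in a separate map exposed via `getPropertyNames()`):
   
   ```
   Configuration conf = getConfWithModelClass();
   ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
   // Assumed to be the same Configuration instance that was passed to the builder:
   Configuration hadoopConf = ((HadoopReadOptions) options).getConf();
   for (String property : options.getPropertyNames()) {
     // Nothing extra was set on the builder in this code path,
     // so this likely copies nothing back.
     hadoopConf.set(property, options.getProperty(property));
   }
   ```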

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -135,6 +135,16 @@
  * ...
  * }</pre>
  *
+ * <p>Reading with projection can be enabled with the projection schema as follows. The
+ * projection_schema contains only the columns that we would like to read, and encoder_schema
+ * contains all fields, with the unwanted columns changed to nullable.
+ *

Review comment:
       It would also be great to mention the expected improvement from projecting columns, such as lower memory usage or faster read times.
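   
       For example (hypothetical field names): since Parquet is a columnar format, a
       projection lets the reader skip the column chunks of every unselected field, which
       cuts both I/O and per-record decoding memory:
   
   ```
   // Hypothetical projection keeping only two of many columns:
   Schema projectionSchema =
       SchemaBuilder.record("Projection")
           .fields()
           .requiredString("id")
           .requiredLong("timestamp")
           .endRecord();
   ```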



