iemejia commented on a change in pull request #13619:
URL: https://github.com/apache/beam/pull/13619#discussion_r549345130



##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -311,6 +313,12 @@ public static ReadFiles readFiles(Schema schema) {
 
       abstract Builder setAvroDataModel(GenericData model);
 
+      abstract Builder setConfiguration(SerializableConfiguration configuration);
+
+      Builder setHadoopConfigurationFlags(Map<String, String> flags) {

Review comment:
       Can you please remove this method and replace its uses with `setConfiguration(makeHadoopConfiguration(...))`?
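       A minimal sketch of what a call site could look like after that change (the method and the `makeHadoopConfigurationUsingFlags` helper name are taken from this PR's diff, not a final API):

```java
public Read withConfiguration(Map<String, String> configuration) {
  return toBuilder()
      .setConfiguration(makeHadoopConfigurationUsingFlags(configuration))
      .build();
}
```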

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -388,6 +402,12 @@ public void populateDisplayData(DisplayData.Builder builder) {
 
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
 
+      abstract Builder<T> setConfiguration(SerializableConfiguration configuration);
+
+      Builder<T> setHadoopConfigurationFlags(Map<String, String> flags) {

Review comment:
       Please remove all definitions of this method and replace its uses with `setConfiguration(makeHadoopConfiguration(...))` in all classes where it appears.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -564,14 +623,20 @@ public ReadFiles withSplit() {
       // Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte.
       private static final long SPLIT_LIMIT = 64000000;
 
+      private final SerializableConfiguration hadoopBaseConfig;

Review comment:
       Rename to `configuration`.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -819,9 +884,15 @@ public Progress getProgress() {
 
       private final SerializableFunction<GenericRecord, T> parseFn;
 
-      ReadFn(GenericData model, SerializableFunction<GenericRecord, T> parseFn) {
+      private final SerializableConfiguration hadoopBaseConfig;

Review comment:
       Rename to `configuration`.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -920,13 +996,7 @@ public Sink withCompressionCodec(CompressionCodecName compressionCodecName) {
 
     /** Specifies configuration to be passed into the sink's writer. */
     public Sink withConfiguration(Map<String, String> configuration) {
-      Configuration hadoopConfiguration = new Configuration();
-      for (Map.Entry<String, String> entry : configuration.entrySet()) {
-        hadoopConfiguration.set(entry.getKey(), entry.getValue());
-      }
-      return toBuilder()
-          .setConfiguration(new SerializableConfiguration(hadoopConfiguration))
-          .build();
+      return toBuilder().setConfiguration(makeHadoopConfigurationUsingFlags(configuration)).build();

Review comment:
       :+1: 

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -332,6 +340,10 @@ public Read withProjection(Schema projectionSchema, Schema encoderSchema) {
           .build();
     }
 
+    public Read withConfiguration(Map<String, String> flags) {

Review comment:
       Can you please name the argument of the `withConfiguration` methods consistently everywhere as `configuration` instead of `flags` or `hadoopConfigFlags`?

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -835,13 +906,18 @@ public void processElement(ProcessContext processContext) throws Exception {
 
         SeekableByteChannel seekableByteChannel = file.openSeekable();
 
-        AvroParquetReader.Builder builder =
-            AvroParquetReader.<GenericRecord>builder(new BeamParquetInputFile(seekableByteChannel));
+        AvroParquetReader.Builder<GenericRecord> builder =
+            AvroParquetReader.builder(new BeamParquetInputFile(seekableByteChannel));
         if (modelClass != null) {
           // all GenericData implementations have a static get method
          builder = builder.withDataModel((GenericData) modelClass.getMethod("get").invoke(null));
         }
 
+        if (hadoopBaseConfig != null) {

Review comment:
       We should probably define a default value for `.setConfiguration(...)` inside the builders (read, readFiles, parseGenericRecords, parseFilesGenericRecords); with a default in place we won't need this `if`. See the sketch below.
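       A minimal sketch for `read`, assuming the other factory methods follow the same pattern (builder setter names other than `setConfiguration` are assumed, not taken from this diff):

```java
public static Read read(Schema schema) {
  return new AutoValue_ParquetIO_Read.Builder()
      .setSchema(schema)
      .setSplittable(false)
      // Default configuration so ReadFn/SplitReadFn never observe null.
      .setConfiguration(new SerializableConfiguration(new Configuration()))
      .build();
}
```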

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -1058,6 +1128,18 @@ public GenericRecord apply(GenericRecord input) {
     private GenericRecordPassthroughFn() {}
   }
 
+  /** Returns a new Hadoop {@link Configuration} instance with provided flags. */

Review comment:
       s/Hadoop {@link Configuration}/{@link SerializableConfiguration}
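       i.e. something like:

```java
/** Returns a new {@link SerializableConfiguration} instance with the provided flags. */
```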

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -682,7 +747,7 @@ public void processElement(
       }
 
       public Configuration getConfWithModelClass() throws Exception {
-        Configuration conf = new Configuration();
+        Configuration conf = SerializableConfiguration.newConfiguration(hadoopBaseConfig);

Review comment:
       :+1: 

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -1058,6 +1128,18 @@ public GenericRecord apply(GenericRecord input) {
     private GenericRecordPassthroughFn() {}
   }
 
+  /** Returns a new Hadoop {@link Configuration} instance with provided flags. */
+  private static SerializableConfiguration makeHadoopConfigurationUsingFlags(

Review comment:
       Can we move this method into the `SerializableConfiguration` class and make it `public static SerializableConfiguration fromMap(Map<String, String> entries)`?
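       A minimal sketch of the proposed helper, mirroring the loop this PR removes from `withConfiguration`:

```java
public static SerializableConfiguration fromMap(Map<String, String> entries) {
  Configuration configuration = new Configuration();
  for (Map.Entry<String, String> entry : entries.entrySet()) {
    configuration.set(entry.getKey(), entry.getValue());
  }
  return new SerializableConfiguration(configuration);
}
```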

##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -416,6 +416,9 @@ public void testWriteAndReadwithSplitUsingReflectDataSchemaWithDataModel() {
     readPipeline.run().waitUntilFinish();
   }
 
+  @Test
+  public void testConfigurationReadFile() {}

Review comment:
       Test or remove.
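       If it stays, a rough sketch of what it could cover (the pipeline rules, `SCHEMA`, `temporaryFolder` and `generateGenericRecords` are assumed to be this test class's existing helpers, and the configuration key is only an example):

```java
@Test
public void testConfigurationReadFile() {
  List<GenericRecord> records = generateGenericRecords(10);
  mainPipeline
      .apply(Create.of(records).withCoder(AvroCoder.of(SCHEMA)))
      .apply(
          FileIO.<GenericRecord>write()
              .via(ParquetIO.sink(SCHEMA))
              .to(temporaryFolder.getRoot().getAbsolutePath()));
  mainPipeline.run().waitUntilFinish();

  PCollection<GenericRecord> readBack =
      readPipeline.apply(
          ParquetIO.read(SCHEMA)
              .from(temporaryFolder.getRoot().getAbsolutePath() + "/*")
              .withConfiguration(ImmutableMap.of("parquet.avro.compatible", "true")));
  PAssert.that(readBack).containsInAnyOrder(records);
  readPipeline.run().waitUntilFinish();
}
```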

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -532,6 +581,12 @@ public ReadFiles withProjection(Schema projectionSchema, Schema encoderSchema) {
           .setSplittable(true)
           .build();
     }
+
+    /** Specify Hadoop configuration for ParquetReader. */
+    public ReadFiles withHadoopConfiguration(Map<String, String> configurationFlags) {

Review comment:
       Rename to `withConfiguration` to be consistent with the other methods + s/configurationFlags/configuration.

##########
File path: sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java
##########
@@ -564,14 +623,20 @@ public ReadFiles withSplit() {
       // Default initial splitting the file into blocks of 64MB. Unit of SPLIT_LIMIT is byte.
       private static final long SPLIT_LIMIT = 64000000;
 
+      private final SerializableConfiguration hadoopBaseConfig;
+
       private final SerializableFunction<GenericRecord, T> parseFn;
 
       SplitReadFn(
-          GenericData model, Schema requestSchema, SerializableFunction<GenericRecord, T> parseFn) {
+          GenericData model,
+          Schema requestSchema,
+          SerializableFunction<GenericRecord, T> parseFn,
+          SerializableConfiguration hadoopBaseConfig) {

Review comment:
       Rename to `configuration`.

##########
File path: sdks/java/io/parquet/src/test/java/org/apache/beam/sdk/io/parquet/ParquetIOTest.java
##########
@@ -147,7 +147,7 @@ public void testBlockTracker() throws Exception {
   public void testSplitBlockWithLimit() {
     ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
         new ParquetIO.ReadFiles.SplitReadFn<>(
-            null, null, ParquetIO.GenericRecordPassthroughFn.create());
+            null, null, ParquetIO.GenericRecordPassthroughFn.create(), null);

Review comment:
       Test with `new Configuration()`; this should not be nullable.
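       For example, using the constructor from the hunk above:

```java
ParquetIO.ReadFiles.SplitReadFn<GenericRecord> testFn =
    new ParquetIO.ReadFiles.SplitReadFn<>(
        null,
        null,
        ParquetIO.GenericRecordPassthroughFn.create(),
        new SerializableConfiguration(new Configuration()));
```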



