danny0405 commented on code in PR #13240:
URL: https://github.com/apache/hudi/pull/13240#discussion_r2070976872


##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java:
##########
@@ -71,93 +65,75 @@ protected HoodieFileWriter newParquetFileWriter(
       OutputStream outputStream,
       HoodieConfig config,
       Schema schema) throws IOException {
-    final DataType rowDataType = AvroSchemaConverter.convertToDataType(schema);
-    final RowType rowType = (RowType) rowDataType.getLogicalType();
+    final RowType rowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(schema).getRowType().getLogicalType();
     HoodieRowDataParquetWriteSupport writeSupport =
         new HoodieRowDataParquetWriteSupport(
             storage.getConf().unwrapAs(Configuration.class), rowType, null);
     return new HoodieRowDataParquetOutputStreamWriter(
-        new FSDataOutputStream(outputStream, null),
-        writeSupport,
-        new HoodieParquetConfig<>(
-            writeSupport,
-            getCompressionCodecName(config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME)),
-            config.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
-            config.getIntOrDefault(HoodieStorageConfig.PARQUET_PAGE_SIZE),
-            config.getLongOrDefault(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE),
-            new HadoopStorageConfiguration(writeSupport.getHadoopConf()),
-            config.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
-            config.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED)));
+        new FSDataOutputStream(outputStream, null), writeSupport, getParquetConfig(config, writeSupport));
   }
 
   /**
    * Create a parquet RowData writer on a given storage path.
    *
    * @param instantTime instant time to write
    * @param storagePath file storage path
-   * @param config Hoodie configuration
+   * @param config hoodie configuration
    * @param schema write schema
    * @param taskContextSupplier task context supplier
-   * @return A RowData parquet writer
+   * @return a RowData parquet writer
    */
   @Override
-  protected HoodieFileWriter newParquetFileWriter(
+  public HoodieFileWriter newParquetFileWriter(
       String instantTime,
       StoragePath storagePath,
       HoodieConfig config,
       Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
-    final DataType rowDataType = AvroSchemaConverter.convertToDataType(schema);
-    final RowType rowType = (RowType) rowDataType.getLogicalType();
-    Configuration conf = storage.getConf().unwrapAs(Configuration.class);
-    return newParquetInternalRowFileWriter(new Path(storagePath.toUri()), (HoodieWriteConfig) config, rowType, conf);
+    final RowType rowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(schema).getRowType().getLogicalType();
+    return newParquetFileWriter(instantTime, storagePath, config, rowType, taskContextSupplier);
   }
 
   /**
-   * Factory method to assist in instantiating an instance of {@link HoodieRowDataFileWriter}.
+   * Create a parquet RowData writer on a given storage path.
    *
-   * @param path        path of the RowFileWriter.

Review Comment:
   Revert the indentation changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to