yihua commented on code in PR #11163:
URL: https://github.com/apache/hudi/pull/11163#discussion_r1595997546


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java:
##########
@@ -107,38 +106,35 @@ protected byte[] serializeRecords(List<HoodieRecord> records) throws IOException
     }
 
     Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (FSDataOutputStream outputStream = new FSDataOutputStream(baos, null)) {
-      HoodieFileWriter parquetWriter = null;
-      HoodieConfig config = new HoodieConfig();
-      config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name());
-      config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
-      config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
-      config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
-      config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
-      config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
-      HoodieRecordType recordType = records.iterator().next().getRecordType();
-      try {
-        parquetWriter = HoodieFileWriterFactory.getFileWriter(
-            HoodieFileFormat.PARQUET,
-            outputStream,
-            HoodieStorageUtils.getStorageConf(new Configuration()),
-            config,
-            writerSchema,
-            recordType);
-        for (HoodieRecord<?> record : records) {
-          String recordKey = getRecordKey(record).orElse(null);
-          parquetWriter.write(recordKey, record, writerSchema);
-        }
-        outputStream.flush();
-      } finally {
-        if (parquetWriter != null) {
-          parquetWriter.close();
-        }
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    HoodieFileWriter parquetWriter = null;
+    HoodieConfig config = new HoodieConfig();
+    config.setValue(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get().name());
+    config.setValue(PARQUET_BLOCK_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_BLOCK_SIZE));
+    config.setValue(PARQUET_PAGE_SIZE.key(), String.valueOf(ParquetWriter.DEFAULT_PAGE_SIZE));
+    config.setValue(PARQUET_MAX_FILE_SIZE.key(), String.valueOf(1024 * 1024 * 1024));
+    config.setValue(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
+    config.setValue(PARQUET_DICTIONARY_ENABLED, String.valueOf(useDictionaryEncoding.get()));
+    HoodieRecordType recordType = records.iterator().next().getRecordType();
+    try {
+      parquetWriter = HoodieFileWriterFactory.getFileWriter(
+          HoodieFileFormat.PARQUET,
+          outputStream,
+          HoodieStorageUtils.getStorageConf(new Configuration()),
+          config,
+          writerSchema,
+          recordType);

Review Comment:
   Use try with resources for `parquetWriter`?
   ```
   try (HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(
             HoodieFileFormat.PARQUET,
             outputStream,
             HoodieStorageUtils.getStorageConf(new Configuration()),
             config,
             writerSchema,
             recordType)) {
     ...
   }
   ```
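For reference, try-with-resources requires the resource variable to be declared in the try header (or, since Java 9, referenced as an effectively final variable). A minimal self-contained sketch of the close-on-exit behavior, using a hypothetical `FakeWriter` in place of `HoodieFileWriter` (illustrative, not Hudi code):

```java
// Illustrative sketch (not Hudi code): try-with-resources closes the
// resource automatically when the block exits, even on exception, so no
// explicit finally block is needed.
class TryWithResourcesDemo {

  // Hypothetical writer standing in for HoodieFileWriter.
  static class FakeWriter implements AutoCloseable {
    private final StringBuilder log;
    FakeWriter(StringBuilder log) { this.log = log; }
    void write(String s) { log.append(s); }
    @Override public void close() { log.append("|closed"); }
  }

  static String run() {
    StringBuilder log = new StringBuilder();
    // The resource must be declared (or be effectively final) in the
    // try header; close() is invoked when the block completes.
    try (FakeWriter writer = new FakeWriter(log)) {
      writer.write("data");
    }
    return log.toString();
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints data|closed
  }
}
```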



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java:
##########
@@ -46,14 +46,21 @@ public class HoodieFileReaderFactory {
   public static HoodieFileReaderFactory getReaderFactory(HoodieRecord.HoodieRecordType recordType) {
     switch (recordType) {
       case AVRO:
-        return new HoodieAvroFileReaderFactory();
+
+        try {
+          Class<?> clazz =
+              ReflectionUtils.getClass("org.apache.hudi.io.storage.hadoop.HoodieAvroFileReaderFactory");
+          return (HoodieFileReaderFactory) clazz.newInstance();
+        } catch (IllegalArgumentException | IllegalAccessException | InstantiationException e) {
+          throw new HoodieException("Unable to create hoodie avro file reader factory", e);

Review Comment:
   ```suggestion
          throw new HoodieException("Unable to create HoodieAvroFileReaderFactory.", e);
   ```
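As background on the pattern in the diff above: the factory is loaded by name so the common module avoids a compile-time dependency on the Hadoop-specific implementation. A self-contained sketch of that reflective instantiation, using a JDK class instead of the Hudi factory (and note that `Class#newInstance` is deprecated since Java 9 in favor of `getDeclaredConstructor().newInstance()`):

```java
import java.lang.reflect.InvocationTargetException;

// Illustrative sketch of the reflective-factory pattern: load a class by
// name and invoke its no-arg constructor. A JDK class is used here so the
// example is self-contained.
class ReflectiveFactoryDemo {

  static Object instantiate(String className) {
    try {
      Class<?> clazz = Class.forName(className);
      // Preferred over the deprecated Class#newInstance.
      return clazz.getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException | NoSuchMethodException
        | InstantiationException | IllegalAccessException
        | InvocationTargetException e) {
      // Mirrors the diff's wrapping of reflection failures in one exception.
      throw new RuntimeException("Unable to create " + className, e);
    }
  }

  public static void main(String[] args) {
    Object list = instantiate("java.util.ArrayList");
    System.out.println(list.getClass().getName()); // prints java.util.ArrayList
  }
}
```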



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java:
##########
@@ -18,10 +18,32 @@
 
 package org.apache.hudi.io.storage;
 
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+
+import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
+import java.io.IOException;
+
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+
 /**
- * Marker interface for every {@link HoodieFileReader} reading in Avro (ie
- * producing {@link IndexedRecord}s)
+ * Base class for every avro file reader

Review Comment:
   ```suggestion
    * Base class for every Avro file reader
   ```



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java:
##########
@@ -46,7 +46,12 @@ public class HoodieFileWriterFactory {
   private static HoodieFileWriterFactory getWriterFactory(HoodieRecord.HoodieRecordType recordType) {
     switch (recordType) {
       case AVRO:
-        return new HoodieAvroFileWriterFactory();
+        try {
+          Class<?> clazz = ReflectionUtils.getClass("org.apache.hudi.io.storage.hadoop.HoodieAvroFileWriterFactory");
+          return (HoodieFileWriterFactory) clazz.newInstance();
+        } catch (IllegalAccessException | IllegalArgumentException | InstantiationException e) {
+          throw new HoodieException("Unable to create hoodie avro file writer factory", e);

Review Comment:
   ```suggestion
          throw new HoodieException("Unable to create HoodieAvroFileWriterFactory", e);
   ```



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieTestReaderContext.java:
##########
@@ -78,7 +80,9 @@ public ClosableIterator<IndexedRecord> getFileRecordIterator(
       Schema requiredSchema,
       StorageConfiguration<?> conf
   ) throws IOException {
-    HoodieAvroParquetReader reader = new HoodieAvroParquetReader(conf, new StoragePath(filePath.toUri()));
+    HoodieAvroFileReader reader = (HoodieAvroFileReader) HoodieFileReaderFactory
+        .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(null,

Review Comment:
   Instead of `null`, should the empty `HoodieConfig` be passed in here, to 
avoid NPE if the underlying implementation is changed to leverage the Hudi 
configs?
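   The null-safety concern can be sketched with a hypothetical `Config` class standing in for `HoodieConfig` (names are illustrative, not the real Hudi API): an empty config object keeps lookups safe even if the reader implementation later starts consulting it.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the reviewer's point: passing an empty config
// object instead of null keeps later lookups safe if the implementation
// starts reading options. Config is a hypothetical stand-in for
// HoodieConfig, not the real class.
class EmptyConfigDemo {

  static class Config {
    private final Map<String, String> props = new HashMap<>();
    String getStringOrDefault(String key, String dflt) {
      return props.getOrDefault(key, dflt);
    }
  }

  static String openReader(Config config) {
    // With config == null this dereference would throw NullPointerException
    // as soon as the implementation consults the config.
    return config.getStringOrDefault("read.mode", "default");
  }

  public static void main(String[] args) {
    System.out.println(openReader(new Config())); // prints default
  }
}
```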



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
