nsivabalan commented on a change in pull request #1687:
URL: https://github.com/apache/hudi/pull/1687#discussion_r443261442



##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -115,7 +115,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
 
       oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
       String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
-          + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString();
+          + FSUtils.makeDataFileName(instantTime, writeToken, fileId,
+              hoodieTable.getBaseFileFormat().getFileExtension())).toString();

Review comment:
       Might as well expose getBaseFileExtension() in HoodieTable, so call sites
   don't need to chain getBaseFileFormat().getFileExtension().
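
A minimal sketch of that convenience accessor, using toy stand-ins rather than the real Hudi classes. `SketchFileFormat` and `SketchTable` are hypothetical names and the extension strings are illustrative; only `getBaseFileFormat()`, `getFileExtension()`, and the proposed `getBaseFileExtension()` are taken from this thread.

```java
// Toy stand-in for HoodieFileFormat; the extension values are illustrative.
enum SketchFileFormat {
  PARQUET(".parquet"),
  HFILE(".hfile");

  private final String fileExtension;

  SketchFileFormat(String fileExtension) {
    this.fileExtension = fileExtension;
  }

  String getFileExtension() {
    return fileExtension;
  }
}

// Toy stand-in for HoodieTable.
class SketchTable {
  private final SketchFileFormat baseFileFormat;

  SketchTable(SketchFileFormat baseFileFormat) {
    this.baseFileFormat = baseFileFormat;
  }

  SketchFileFormat getBaseFileFormat() {
    return baseFileFormat;
  }

  // Proposed shortcut: call sites ask the table directly for the extension.
  String getBaseFileExtension() {
    return getBaseFileFormat().getFileExtension();
  }
}
```

With this in place, the call in the hunk above could presumably pass `hoodieTable.getBaseFileExtension()` as the last argument.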

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -34,29 +34,28 @@
 
 import java.io.IOException;
 
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
 
-public class HoodieStorageWriterFactory {
+public class HoodieFileWriterFactory {
 
-  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
+  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(

Review comment:
       I did go through the Abstract Factory pattern. Is this what you are
   suggesting, @bvaradar?
   
   ```
   class FileStorageFactoryProducer {

     // Picks the concrete factory for the configured file format.
     FileStorageFactory getFileStorageFactory(Config config) {
       if (config.getFileStorageType() == PARQUET) {
         return new ParquetFileStorageFactory(....);
       } else if (config.getFileStorageType() == HFILE) {
         return new HFileFileStorageFactory(....);
       } else {
         // throw exception
       }
     }
   }

   interface FileStorageFactory {

     // Instance methods: a static interface method would need a body.
     <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(
         String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config,
         Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException;

     <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileReader<R> getFileReader(
         Configuration conf, Path path) throws IOException;
   }

   class ParquetFileStorageFactory implements FileStorageFactory {
     // exposes Writer and Reader for Parquet
   }

   class HFileFileStorageFactory implements FileStorageFactory {
     // exposes Writer and Reader for HFile
   }
   ```
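
For reference, here is a compilable toy version of the same shape, as a sketch only. Every type in it (`StorageType`, `SketchWriter`, `SketchReader`, `SketchStorageFactory`, and the rest) is a hypothetical stand-in, not a Hudi class; the point is just that the producer is the only place that branches on the format.

```java
// Hypothetical stand-ins for the file format, writer, and reader types.
enum StorageType { PARQUET, HFILE }

interface SketchWriter { String format(); }

interface SketchReader { String format(); }

// Abstract factory: each implementation yields a writer/reader pair for one format.
interface SketchStorageFactory {
  SketchWriter newWriter();

  SketchReader newReader();
}

class ParquetSketchFactory implements SketchStorageFactory {
  public SketchWriter newWriter() { return () -> "parquet"; }

  public SketchReader newReader() { return () -> "parquet"; }
}

class HFileSketchFactory implements SketchStorageFactory {
  public SketchWriter newWriter() { return () -> "hfile"; }

  public SketchReader newReader() { return () -> "hfile"; }
}

// Producer: the single place that inspects the configured format.
class SketchFactoryProducer {
  static SketchStorageFactory forType(StorageType type) {
    switch (type) {
      case PARQUET:
        return new ParquetSketchFactory();
      case HFILE:
        return new HFileSketchFactory();
      default:
        throw new UnsupportedOperationException(type + " format not supported");
    }
  }
}
```

Callers would obtain a factory once via `SketchFactoryProducer.forType(...)` and then ask it for a writer or reader, so the writer and reader for a given file always come from the same format.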
    

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -132,7 +133,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
 
       // Create the writer for writing the new version file
       storageWriter =
-          HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+          HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);

Review comment:
       Similar to HoodieReadHandle, how about we expose this writer
   instantiation in HoodieWriteHandle?
   
   ```
   HoodieFileWriter<IndexedRecord> getFileWriter(String instantTime, Path newFilePath) throws IOException {
     // The handle already holds everything else the factory needs.
     return HoodieFileWriterFactory.getFileWriter(
         instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
   }
   ```
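
To illustrate the intent with something compilable, here is a toy sketch of a base handle that owns writer creation. All names (`SketchFileWriter`, `SketchWriterFactory`, `SketchWriteHandle`, `SketchMergeHandle`) are hypothetical and strings stand in for the real path and schema types.

```java
// Toy writer that simply records what it was created with.
class SketchFileWriter {
  final String instantTime;
  final String path;
  final String schema;

  SketchFileWriter(String instantTime, String path, String schema) {
    this.instantTime = instantTime;
    this.path = path;
    this.schema = schema;
  }
}

// Toy factory standing in for the static factory call in the hunk above.
final class SketchWriterFactory {
  static SketchFileWriter getFileWriter(String instantTime, String path, String schema) {
    return new SketchFileWriter(instantTime, path, schema);
  }
}

// Base handle: the one place that knows about the factory and the shared state.
abstract class SketchWriteHandle {
  protected final String writerSchema;

  SketchWriteHandle(String writerSchema) {
    this.writerSchema = writerSchema;
  }

  protected SketchFileWriter getFileWriter(String instantTime, String newFilePath) {
    return SketchWriterFactory.getFileWriter(instantTime, newFilePath, writerSchema);
  }
}

// Subclass only supplies what varies per handle.
class SketchMergeHandle extends SketchWriteHandle {
  final SketchFileWriter fileWriter;

  SketchMergeHandle(String instantTime, String newFilePath, String writerSchema) {
    super(writerSchema);
    this.fileWriter = getFileWriter(instantTime, newFilePath);
  }
}
```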
   

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
##########
@@ -115,7 +115,11 @@ public CommitActionExecutor(JavaSparkContext jsc,
   }
 
   protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>)table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+    if (table.requireSortedRecords()) {

Review comment:
       Not sure if my way of thinking is right; correct me if I am wrong. We
   are making our file formats flexible, wherein either the base file or the
   log file can be of any format. I do understand that for index purposes both
   the base file and the log file will be HFile. But I am wondering whether
   tagging this config to the table is the right thing to do, or whether we
   should have two configs, one for the base file and one for the log file.
   What if we want to experiment with the HFile format for log files and
   Parquet for base files, since we might get indexing capability for log
   files too?
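
A rough sketch of the two-config idea, under loud assumptions: the property keys, the defaults, and the `SketchFormat` / `SketchFormatConfig` names are all made up for illustration and are not existing Hudi configs.

```java
import java.util.Properties;

// Toy stand-in for the file format enum.
enum SketchFormat { PARQUET, HFILE, HOODIE_LOG }

class SketchFormatConfig {
  // Hypothetical property keys, invented for this sketch.
  static final String BASE_FILE_FORMAT_KEY = "sketch.base.file.format";
  static final String LOG_FILE_FORMAT_KEY = "sketch.log.file.format";

  private final Properties props;

  SketchFormatConfig(Properties props) {
    this.props = props;
  }

  // Base file format is read on its own; the default here is an assumption.
  SketchFormat getBaseFileFormat() {
    return SketchFormat.valueOf(props.getProperty(BASE_FILE_FORMAT_KEY, "PARQUET"));
  }

  // Log file format is configured independently of the base file format.
  SketchFormat getLogFileFormat() {
    return SketchFormat.valueOf(props.getProperty(LOG_FILE_FORMAT_KEY, "HOODIE_LOG"));
  }
}
```

With separate keys, a table could keep Parquet base files while switching only its log files to HFile, which is the experiment described above. A check like `requireSortedRecords()` would then have to decide which of the two formats it depends on.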
   

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
##########
@@ -56,4 +61,9 @@ protected HoodieBaseFile getLatestDataFile() {
     return hoodieTable.getBaseFileOnlyView()
         .getLatestBaseFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get();
   }
+
+  protected HoodieFileReader getStorageReader() throws IOException {

Review comment:
       Minor: do you think we should name this getNewStorageReader()? Just to
   be cautious, so that no caller treats this as a plain getter for the reader.
   It does not return a singleton; it creates a new reader every time.

##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -132,7 +133,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
 
       // Create the writer for writing the new version file
       storageWriter =
-          HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+          HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);

Review comment:
       Similar to my suggestion for the reader, see if we should name this
   getNewFileWriter().

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
##########
@@ -89,11 +87,12 @@ public CommitActionExecutor(JavaSparkContext jsc,
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), upsertHandle.getWriterSchema());
       BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-      try (ParquetReader<IndexedRecord> reader =
-          AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build()) {
-        wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
+      try {
+        HoodieStorageReader<IndexedRecord> storageReader =
+            HoodieStorageReaderFactory.getStorageReader(table.getHadoopConf(), upsertHandle.getOldFilePath());
+        wrapper =
+            new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),

Review comment:
       @vinothchandar: I guess it's taken care of. Here is getRecordIterator()
   for Parquet:
   ```
   public Iterator<R> getRecordIterator(Schema schema) throws IOException {
     AvroReadSupport.setAvroReadSchema(conf, schema);
     ParquetReader<IndexedRecord> reader =
         AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
     return new ParquetReaderIterator(reader);
   }
   ```
   We set the schema before instantiating the reader.
   Let me know if my understanding is wrong.




