nsivabalan commented on a change in pull request #1687:
URL: https://github.com/apache/hudi/pull/1687#discussion_r443261442
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -115,7 +115,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
 oldFilePath = new Path(config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
 String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
-    + FSUtils.makeDataFileName(instantTime, writeToken, fileId)).toString();
+    + FSUtils.makeDataFileName(instantTime, writeToken, fileId,
+        hoodieTable.getBaseFileFormat().getFileExtension())).toString();
Review comment:
Might as well expose a getBaseFileExtension() helper in HoodieTable.
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
##########
@@ -34,29 +34,28 @@
 import java.io.IOException;
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
-public class HoodieStorageWriterFactory {
+public class HoodieFileWriterFactory {
-  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieStorageWriter<R> getStorageWriter(
+  public static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(
Review comment:
I did go through the Abstract Factory pattern. Is this what you are suggesting, @bvaradar?
```
class FileStorageFactoryProducer {
  FileStorageFactory getFileStorageFactory(Config config) {
    if (config.getFileStorageType() == PARQUET) {
      return new ParquetFileStorageFactory(....);
    } else if (config.getFileStorageType() == HFILE) {
      return new HFileFileStorageFactory(....);
    } else {
      // throw exception
    }
  }
}

interface FileStorageFactory {
  <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileWriter<R> getFileWriter(
      String instantTime, Path path, HoodieTable<T> hoodieTable, HoodieWriteConfig config,
      Schema schema, SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException;

  <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFileReader<R> getFileReader(
      Configuration conf, Path path) throws IOException;
}

class ParquetFileStorageFactory implements FileStorageFactory {
  // exposes Writer and Reader for Parquet
}

class HFileFileStorageFactory implements FileStorageFactory {
  // exposes Writer and Reader for HFile
}
```
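For reference, here is a minimal, compilable sketch of the same abstract-factory shape, using stand-in types (FileFormat, the String return values, and all names below are invented for illustration and are not the actual Hudi classes):

```java
// Stand-in for Hudi's HoodieFileFormat.
enum FileFormat { PARQUET, HFILE }

// Each concrete factory knows how to hand out the reader/writer pair
// for one storage format.
interface FileStorageFactory {
    String newWriter(String path);   // stand-in for getFileWriter(...)
    String newReader(String path);   // stand-in for getFileReader(...)
}

class ParquetFileStorageFactory implements FileStorageFactory {
    public String newWriter(String path) { return "ParquetWriter:" + path; }
    public String newReader(String path) { return "ParquetReader:" + path; }
}

class HFileFileStorageFactory implements FileStorageFactory {
    public String newWriter(String path) { return "HFileWriter:" + path; }
    public String newReader(String path) { return "HFileReader:" + path; }
}

// The producer is the single place that branches on the configured format;
// callers only ever see the FileStorageFactory interface.
class FileStorageFactoryProducer {
    static FileStorageFactory getFactory(FileFormat format) {
        switch (format) {
            case PARQUET: return new ParquetFileStorageFactory();
            case HFILE:   return new HFileFileStorageFactory();
            default:      throw new IllegalArgumentException("Unsupported format: " + format);
        }
    }
}
```

The payoff of this arrangement is that adding a third format touches only the producer plus one new factory class, not every call site.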
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -132,7 +133,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
 // Create the writer for writing the new version file
 storageWriter =
-    HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+    HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
Review comment:
Similar to HoodieReadHandle, how about we expose this writer instantiation in HoodieWriteHandle?
```
HoodieFileWriter<IndexedRecord> getFileWriter(String instantTime, Path newFilePath) {
  return HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable, config,
      writerSchema, sparkTaskContextSupplier);
}
```
##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
##########
@@ -115,7 +115,11 @@ public CommitActionExecutor(JavaSparkContext jsc,
 }
 protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
-  return new HoodieMergeHandle<>(config, instantTime, (HoodieTable<T>) table, recordItr, partitionPath, fileId, sparkTaskContextSupplier);
+  if (table.requireSortedRecords()) {
+ if (table.requireSortedRecords()) {
Review comment:
Not sure if my way of thinking is right; correct me if I am wrong. We are making our file formats flexible, so that either the base file or the log file can be of any format. I understand that for the indexing use case both the base file and the log file will be HFile, but I wonder whether tagging this config to the table is the right thing to do, or whether we should have two configs: one for the base file and one for the log file. What if we want to experiment with HFile for log files and Parquet for base files, since we might then get indexing capability for log files too?
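To make the two-config idea concrete, here is a hypothetical sketch (all names below, including StorageFormatConfig and requireSortedRecords, are invented stand-ins for illustration, not Hudi's actual config classes):

```java
// Stand-in for Hudi's HoodieFileFormat.
enum FileFormat { PARQUET, HFILE }

// Hypothetical: split the single table-level format setting into two
// independent knobs, one per file kind.
class StorageFormatConfig {
    private final FileFormat baseFileFormat;
    private final FileFormat logFileFormat;

    StorageFormatConfig(FileFormat baseFileFormat, FileFormat logFileFormat) {
        this.baseFileFormat = baseFileFormat;
        this.logFileFormat = logFileFormat;
    }

    FileFormat getBaseFileFormat() { return baseFileFormat; }
    FileFormat getLogFileFormat()  { return logFileFormat; }

    // Illustrative: sorted writing would be driven by the base file format
    // alone, since HFile requires keys in sorted order.
    boolean requireSortedRecords() { return baseFileFormat == FileFormat.HFILE; }
}
```

With this shape, the experiment mentioned above is just `new StorageFormatConfig(FileFormat.PARQUET, FileFormat.HFILE)`, and no table-wide flag is needed.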
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieReadHandle.java
##########
@@ -56,4 +61,9 @@ protected HoodieBaseFile getLatestDataFile() {
   return hoodieTable.getBaseFileOnlyView()
       .getLatestBaseFile(partitionPathFilePair.getLeft(), partitionPathFilePair.getRight()).get();
 }
+
+protected HoodieFileReader getStorageReader() throws IOException {
Review comment:
Minor: do you think we should name this getNewStorageReader()? Just to be cautious and avoid a caller treating it like a plain getter for the reader. It does not return a singleton; it creates a new reader every time.
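A toy illustration of the naming distinction being raised (names are illustrative, not Hudi's): a "new"-prefixed method signals that every call constructs a fresh object, unlike a getter that returns a cached instance.

```java
class ReaderHolder {
    static class Reader { }

    private Reader cached;

    // Getter semantics: lazily creates one instance and always returns it.
    Reader getReader() {
        if (cached == null) {
            cached = new Reader();
        }
        return cached;
    }

    // Factory semantics: a brand-new instance on every call; the "new"
    // prefix makes that explicit to callers.
    Reader newReader() {
        return new Reader();
    }
}
```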
##########
File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##########
@@ -132,7 +133,8 @@ private void init(String fileId, String partitionPath, HoodieBaseFile dataFileTo
 // Create the writer for writing the new version file
 storageWriter =
-    HoodieStorageWriterFactory.getStorageWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
+    HoodieFileWriterFactory.getFileWriter(instantTime, newFilePath, hoodieTable, config, writerSchema, sparkTaskContextSupplier);
Review comment:
Similar to my suggestion on the reader side, consider whether this should be named getNewFileWriter().
##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/CommitActionExecutor.java
##########
@@ -89,11 +87,12 @@ public CommitActionExecutor(JavaSparkContext jsc,
   throw new HoodieUpsertException(
       "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
 } else {
-  AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), upsertHandle.getWriterSchema());
   BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-  try (ParquetReader<IndexedRecord> reader =
-      AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(table.getHadoopConf()).build()) {
-    wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader),
+  try {
+    HoodieStorageReader<IndexedRecord> storageReader =
+        HoodieStorageReaderFactory.getStorageReader(table.getHadoopConf(), upsertHandle.getOldFilePath());
+    wrapper =
+        new SparkBoundedInMemoryExecutor(config, storageReader.getRecordIterator(upsertHandle.getWriterSchema()),
Review comment:
@vinothchandar: I guess it's taken care of. Here is getRecordIterator for Parquet:
```
public Iterator<R> getRecordIterator(Schema schema) throws IOException {
  AvroReadSupport.setAvroReadSchema(conf, schema);
  ParquetReader<IndexedRecord> reader =
      AvroParquetReader.<IndexedRecord>builder(path).withConf(conf).build();
  return new ParquetReaderIterator(reader);
}
```
We set the schema before instantiating the reader.
Let me know if my understanding is wrong.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]