danny0405 commented on a change in pull request #4837:
URL: https://github.com/apache/hudi/pull/4837#discussion_r809692854



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
##########
@@ -171,6 +172,33 @@ public BloomFilter 
readBloomFilterFromMetadata(Configuration configuration, Path
    */
   public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration 
configuration, Path filePath);
 
+  /**
+   * Fetch {@link HoodieKey}s from the given data file.
+   * @param reader        The file reader
+   * @param filePath      The data file path
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   */
+  public abstract List<HoodieKey> fetchRecordKeyPartitionPath(BaseFileReader 
reader, Path filePath, int batchSize);
+
+  /**
+   * Open File Reader.
+   * @param configuration        configuration to build file reader
+   * @param filePath      The data file path
+   * @param keyGeneratorOpt instance of KeyGenerator
+   * @return file reader
+   */
+  public abstract BaseFileReader getReader(Configuration configuration, Path 
filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
+
+  /**
+   * Open File Reader.
+   * @param configuration        configuration to build file reader
+   * @param filePath      The data file path
+   * @return file reader
+   */
+  public BaseFileReader getReader(Configuration configuration, Path filePath) {
+    return getReader(configuration, filePath, Option.empty());

Review comment:
       ditto

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
##########
@@ -134,17 +162,30 @@
   @Override
   public List<GenericRecord> readAvroRecords(Configuration configuration, Path 
filePath, Schema avroSchema) {
     List<GenericRecord> records = new ArrayList<>();
+    Reader reader = null;
+    RecordReader recordReader = null;
     try {

Review comment:
       ```java
   final Reader reader = null;
   final RecordReader recordReader = null;
   ```

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
##########
@@ -171,6 +172,33 @@ public BloomFilter 
readBloomFilterFromMetadata(Configuration configuration, Path
    */
   public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration 
configuration, Path filePath);
 
+  /**
+   * Fetch {@link HoodieKey}s from the given data file.
+   * @param reader        The file reader
+   * @param filePath      The data file path
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+   */
+  public abstract List<HoodieKey> fetchRecordKeyPartitionPath(BaseFileReader 
reader, Path filePath, int batchSize);
+
+  /**
+   * Open File Reader.
+   * @param configuration        configuration to build file reader
+   * @param filePath      The data file path
+   * @param keyGeneratorOpt instance of KeyGenerator
+   * @return file reader
+   */
+  public abstract BaseFileReader getReader(Configuration configuration, Path 
filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
+

Review comment:
       `getReader` => `getRecordKeyPartitionPathReader`

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
##########
@@ -239,11 +291,30 @@ public Schema readAvroSchema(Configuration conf, Path 
orcFilePath) {
 
   @Override
   public long getRowCount(Configuration conf, Path orcFilePath) {
-    try {
-      Reader reader = OrcFile.createReader(orcFilePath, 
OrcFile.readerOptions(conf));
+    try (Reader reader = OrcFile.createReader(orcFilePath, 
OrcFile.readerOptions(conf))) {
       return reader.getNumberOfRows();
     } catch (IOException io) {
       throw new HoodieIOException("Unable to get row count for ORC file:" + 
orcFilePath, io);
     }
   }
+
+  private class OrcFileInnerReader extends BaseFileReader {
+    Reader reader;

Review comment:
       `OrcFileInnerReader` => `OrcReaderWrapper`

##########
File path: 
hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java
##########
@@ -211,16 +213,29 @@ protected void loadRecords(String partitionPath) throws 
Exception {
             return;
           }
 
-          final List<HoodieKey> hoodieKeys;
-          try {
-            hoodieKeys =
-                fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new 
Path(baseFile.getPath()));
-          } catch (Exception e) {
-            throw new HoodieException(String.format("Error when loading record 
keys from file: %s", baseFile), e);
-          }
-
-          for (HoodieKey hoodieKey : hoodieKeys) {
-            output.collect(new StreamRecord(new 
IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
+          if (this.batchSize > 0) {
+            List<HoodieKey> hoodieKeys;
+            Path filePath = new Path(baseFile.getPath());
+            try (BaseFileUtils.BaseFileReader reader = 
fileUtils.getReader(this.hadoopConf, filePath)) {
+              do {
+                hoodieKeys = fileUtils.fetchRecordKeyPartitionPath(reader, 
filePath, batchSize);

Review comment:
       1. Can we have method `BaseFileReader#hasNextBatch` and 
`BaseFileReader#NextBatch` then iterator over that ?
   2. Can we also have a constant batch size `1024` and remove the config 
option then ?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
##########
@@ -100,6 +126,9 @@
           String rowKey = rowKeys.toString(i);
           String partitionPath = partitionPaths.toString(i);
           hoodieKeys.add(new HoodieKey(rowKey, partitionPath));
+          if (batchSize > 0 && hoodieKeys.size() >= batchSize) {
+            break;
+          }

Review comment:
       1. I see that there is already a batchSize param in `VectorizedRowBatch` 
and the default val is 1024, can we use that
   2. We should not skip the for-loop early in while reading a batch, which 
would cause data lost, instead we can put it out of the for-loop, that means we 
must read a full `VectorizedRowBatch`
   3. Considering there is already a batch there, the more reasonable way to 
control the read buffer is a param like `int batches`, which means the number 
of read batches once a time. Or you can divide the `batchSize` by the 
`VectorizedRowBatch` size to get the vector batch number to read.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -143,11 +148,20 @@
         fields.addAll(keyGenerator.getPartitionPathFields());
         return HoodieAvroUtils.getSchemaForFields(readAvroSchema(conf, 
filePath), fields);
       })
-          .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema());
+              .orElse(HoodieAvroUtils.getRecordKeyPartitionPathSchema());
       AvroReadSupport.setAvroReadSchema(conf, readSchema);

Review comment:
       Fix the indentation.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
##########
@@ -171,6 +172,33 @@ public BloomFilter 
readBloomFilterFromMetadata(Configuration configuration, Path
    */
   public abstract List<HoodieKey> fetchRecordKeyPartitionPath(Configuration 
configuration, Path filePath);
 
+  /**
+   * Fetch {@link HoodieKey}s from the given data file.
+   * @param reader        The file reader
+   * @param filePath      The data file path
+   * @return {@link List} of {@link HoodieKey}s fetched from the parquet file

Review comment:
       1. Please align the comments of the param document, please fix all the 
documents
   2. add document for `batchSize`

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
##########
@@ -424,4 +435,17 @@ private static BigDecimal extractDecimal(Object val, 
DecimalMetadata decimalMeta
       throw new UnsupportedOperationException(String.format("Unsupported value 
type (%s)", val.getClass().getName()));
     }
   }
+
+  private class ParquetFileInnerReader extends BaseFileReader {
+    ParquetReader reader;

Review comment:
       `ParquetFileInnerReader` => `ParquetReaderWrapper`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to