This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 622c8a37caf [HUDI-8339] Avoid glob path loading while building 
functional index (#12087)
622c8a37caf is described below

commit 622c8a37cafea903b59c66fd4d40c80380df8e94
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Oct 13 09:11:21 2024 +0530

    [HUDI-8339] Avoid glob path loading while building functional index (#12087)
---
 .../client/utils/SparkMetadataWriterUtils.java     | 109 ++++++++++++++-------
 1 file changed, 74 insertions(+), 35 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index ece6f479cd6..8e9f0696c8d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -19,36 +19,48 @@
 
 package org.apache.hudi.client.utils;
 
-import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.index.functional.HoodieFunctionalIndex;
+import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.functions;
-import org.apache.spark.sql.sources.BaseRelation;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
 import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
@@ -59,14 +71,6 @@ import static 
org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
  */
 public class SparkMetadataWriterUtils {
 
-  /**
-   * Configs required to load records from paths as a dataframe
-   */
-  private static final String QUERY_TYPE_CONFIG = 
"hoodie.datasource.query.type";
-  private static final String QUERY_TYPE_SNAPSHOT = "snapshot";
-  private static final String READ_PATHS_CONFIG = 
"hoodie.datasource.read.paths";
-  private static final String GLOB_PATHS_CONFIG = "glob.paths";
-
   public static List<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
                                                                              
Schema readerSchema,
                                                                              
List<FileSlice> fileSlices,
@@ -79,12 +83,12 @@ public class SparkMetadataWriterUtils {
       if (fileSlice.getBaseFile().isPresent()) {
         HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
         long fileSize = baseFile.getFileSize();
-        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
baseFile.getStoragePath());
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
baseFile.getStoragePath(), true);
       }
       // Handle log files
       fileSlice.getLogFiles().forEach(logFile -> {
         long fileSize = logFile.getFileSize();
-        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
logFile.getPath());
+        buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, columnRangeMetadataList, fileSize, 
logFile.getPath(), false);
       });
     }
     return createColumnStatsRecords(partition, columnRangeMetadataList, false, 
functionalIndex.getIndexName(), 
COLUMN_STATS.getRecordType()).collect(Collectors.toList());
@@ -103,12 +107,12 @@ public class SparkMetadataWriterUtils {
       if (fileSlice.getBaseFile().isPresent()) {
         HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
         buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, 
columnToIndex, sqlContext, bloomFilterMetadataList, baseFile.getStoragePath(), 
metadataWriteConfig, partition,
-            baseFile.getCommitTime());
+            baseFile.getCommitTime(), true);
       }
       // Handle log files
       fileSlice.getLogFiles().forEach(
           logFile -> buildBloomFilterMetadata(metaClient, readerSchema, 
functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, 
logFile.getPath(), metadataWriteConfig, partition,
-              logFile.getDeltaCommitTime()));
+              logFile.getDeltaCommitTime(), false));
     }
     return bloomFilterMetadataList;
   }
@@ -120,9 +124,10 @@ public class SparkMetadataWriterUtils {
                                                SQLContext sqlContext,
                                                
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
                                                long fileSize,
-                                               StoragePath filePath) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema);
-    Column indexedColumn = 
functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
+                                               StoragePath filePath,
+                                               boolean isBaseFile) {
+    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema, isBaseFile);
+    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
     fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
     HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = 
computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), 
fileSize);
     columnRangeMetadataList.add(columnRangeMetadata);
@@ -137,9 +142,10 @@ public class SparkMetadataWriterUtils {
                                                StoragePath filePath,
                                                HoodieWriteConfig writeConfig,
                                                String partitionName,
-                                               String instantTime) {
-    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema);
-    Column indexedColumn = 
functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
+                                               String instantTime,
+                                               boolean isBaseFile) {
+    Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, 
sqlContext, metaClient, readerSchema, isBaseFile);
+    Column indexedColumn = 
functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
     fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
     BloomFilter bloomFilter = 
HoodieFileWriterFactory.createBloomFilter(writeConfig);
     fileDf.foreach(row -> {
@@ -151,20 +157,53 @@ public class SparkMetadataWriterUtils {
   }
 
   private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext 
sqlContext,
-                                               HoodieTableMetaClient 
metaClient, Schema schema) {
-    String readPathString =
-        String.join(",", 
Arrays.stream(paths).map(StoragePath::toString).toArray(String[]::new));
-    String globPathString = String.join(",", 
Arrays.stream(paths).map(StoragePath::getParent).map(StoragePath::toString).distinct().toArray(String[]::new));
-    HashMap<String, String> params = new HashMap<>();
-    params.put(QUERY_TYPE_CONFIG, QUERY_TYPE_SNAPSHOT);
-    params.put(READ_PATHS_CONFIG, readPathString);
-    // Building HoodieFileIndex needs this param to decide query path
-    params.put(GLOB_PATHS_CONFIG, globPathString);
-    // Let Hudi relations to fetch the schema from the table itself
-    BaseRelation relation = SparkAdapterSupport$.MODULE$.sparkAdapter()
-        .createRelation(sqlContext, metaClient, schema, paths, params);
-
-    return dropMetaFields(sqlContext.baseRelationToDataFrame(relation));
+                                               HoodieTableMetaClient 
metaClient, Schema schema,
+                                               boolean isBaseFile) {
+    List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new 
HoodieBaseFile(paths[0].toString()), metaClient, schema)
+        : 
getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()),
 metaClient, schema);
+    return dropMetaFields(toDataset(records, schema, sqlContext));
+  }
+
+  private static List<HoodieRecord> getUnmergedLogFileRecords(List<String> 
logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
+    List<HoodieRecord> records = new ArrayList<>();
+    HoodieUnMergedLogRecordScanner scanner = 
HoodieUnMergedLogRecordScanner.newBuilder()
+        .withStorage(metaClient.getStorage())
+        .withBasePath(metaClient.getBasePath())
+        .withLogFilePaths(logFilePaths)
+        .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+        
.withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp())
+        .withReaderSchema(readerSchema)
+        .withTableMetaClient(metaClient)
+        .withLogRecordScannerCallback(records::add)
+        .build();
+    scanner.scan(false);
+    return records;
+  }
+
+  private static List<HoodieRecord> getBaseFileRecords(HoodieBaseFile 
baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) {
+    List<HoodieRecord> records = new ArrayList<>();
+    HoodieRecordMerger recordMerger =
+        
HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), 
EngineType.SPARK, Collections.emptyList(), 
metaClient.getTableConfig().getRecordMergerStrategy());
+    try (HoodieFileReader baseFileReader = 
HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType())
+        .getFileReader(getReaderConfigs(metaClient.getStorageConf()), 
baseFile.getStoragePath())) {
+      baseFileReader.getRecordIterator(readerSchema).forEachRemaining((record) 
-> records.add((HoodieRecord) record));
+      return records;
+    } catch (IOException e) {
+      throw new HoodieIOException("Error reading base file " + 
baseFile.getFileName(), e);
+    }
+  }
+
+  private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema 
schema, SQLContext sqlContext) {
+    List<GenericRecord> avroRecords = records.stream()
+        .map(r -> (GenericRecord) r.getData())
+        .collect(Collectors.toList());
+    if (avroRecords.isEmpty()) {
+      return sqlContext.emptyDataFrame().toDF();
+    }
+    try (JavaSparkContext jsc = new 
JavaSparkContext(sqlContext.sparkContext())) {
+      JavaRDD<GenericRecord> javaRDD = jsc.parallelize(avroRecords);
+      return AvroConversionUtils.createDataFrame(javaRDD.rdd(), 
schema.toString(), sqlContext.sparkSession());
+    }
   }
 
   private static <T extends Comparable<T>> 
HoodieColumnRangeMetadata<Comparable> computeColumnRangeMetadata(Dataset<Row> 
rowDataset,

Reply via email to