This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 622c8a37caf [HUDI-8339] Avoid glob path loading while building functional index (#12087)
622c8a37caf is described below
commit 622c8a37cafea903b59c66fd4d40c80380df8e94
Author: Sagar Sumit <[email protected]>
AuthorDate: Sun Oct 13 09:11:21 2024 +0530
[HUDI-8339] Avoid glob path loading while building functional index (#12087)
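
This replaces the glob-path based loading of file records: instead of building a Hudi relation from the "hoodie.datasource.read.paths"/glob-path options, base file records are now read directly through a HoodieFileReader (obtained via HoodieIOFactory), log file records are collected with an unmerged HoodieUnMergedLogRecordScanner, and the collected Avro records are converted to a DataFrame with AvroConversionUtils. For reference, the log-file read in the new helper added below boils down to the following (simplified; same classes and builder calls as in the diff):

    List<HoodieRecord> records = new ArrayList<>();
    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
        .withStorage(metaClient.getStorage())
        .withBasePath(metaClient.getBasePath())
        .withLogFilePaths(logFilePaths)
        .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
        .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp())
        .withReaderSchema(readerSchema)
        .withTableMetaClient(metaClient)
        .withLogRecordScannerCallback(records::add) // keep every record as-is, no merging
        .build();
    scanner.scan(false); // scan the supplied log files and collect their records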
---
.../client/utils/SparkMetadataWriterUtils.java | 109 ++++++++++++++-------
1 file changed, 74 insertions(+), 35 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index ece6f479cd6..8e9f0696c8d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -19,36 +19,48 @@
package org.apache.hudi.client.utils;
-import org.apache.hudi.SparkAdapterSupport$;
+import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
+import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.functional.HoodieFunctionalIndex;
+import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions;
-import org.apache.spark.sql.sources.BaseRelation;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_DFS_STREAM_BUFFER_SIZE;
import static org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS;
+import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.metadata.HoodieMetadataPayload.createBloomFilterMetadataRecord;
import static org.apache.hudi.metadata.HoodieMetadataPayload.createColumnStatsRecords;
@@ -59,14 +71,6 @@ import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
*/
public class SparkMetadataWriterUtils {
- /**
- * Configs required to load records from paths as a dataframe
- */
- private static final String QUERY_TYPE_CONFIG = "hoodie.datasource.query.type";
- private static final String QUERY_TYPE_SNAPSHOT = "snapshot";
- private static final String READ_PATHS_CONFIG = "hoodie.datasource.read.paths";
- private static final String GLOB_PATHS_CONFIG = "glob.paths";
-
public static List<HoodieRecord> getFunctionalIndexRecordsUsingColumnStats(HoodieTableMetaClient metaClient,
Schema readerSchema,
List<FileSlice> fileSlices,
@@ -79,12 +83,12 @@ public class SparkMetadataWriterUtils {
if (fileSlice.getBaseFile().isPresent()) {
HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
long fileSize = baseFile.getFileSize();
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFile.getStoragePath());
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, baseFile.getStoragePath(), true);
}
// Handle log files
fileSlice.getLogFiles().forEach(logFile -> {
long fileSize = logFile.getFileSize();
- buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFile.getPath());
+ buildColumnRangeMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, columnRangeMetadataList, fileSize, logFile.getPath(), false);
});
}
return createColumnStatsRecords(partition, columnRangeMetadataList, false, functionalIndex.getIndexName(),
COLUMN_STATS.getRecordType()).collect(Collectors.toList());
@@ -103,12 +107,12 @@ public class SparkMetadataWriterUtils {
if (fileSlice.getBaseFile().isPresent()) {
HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, baseFile.getStoragePath(), metadataWriteConfig, partition,
- baseFile.getCommitTime());
+ baseFile.getCommitTime(), true);
}
// Handle log files
fileSlice.getLogFiles().forEach(
logFile -> buildBloomFilterMetadata(metaClient, readerSchema, functionalIndex, columnToIndex, sqlContext, bloomFilterMetadataList, logFile.getPath(), metadataWriteConfig, partition,
- logFile.getDeltaCommitTime()));
+ logFile.getDeltaCommitTime(), false));
}
return bloomFilterMetadataList;
}
@@ -120,9 +124,10 @@ public class SparkMetadataWriterUtils {
SQLContext sqlContext,
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList,
long fileSize,
- StoragePath filePath) {
- Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema);
- Column indexedColumn = functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
+ StoragePath filePath,
+ boolean isBaseFile) {
+ Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema, isBaseFile);
+ Column indexedColumn = functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
HoodieColumnRangeMetadata<Comparable> columnRangeMetadata = computeColumnRangeMetadata(fileDf, columnToIndex, filePath.toString(), fileSize);
columnRangeMetadataList.add(columnRangeMetadata);
@@ -137,9 +142,10 @@ public class SparkMetadataWriterUtils {
StoragePath filePath,
HoodieWriteConfig writeConfig,
String partitionName,
- String instantTime) {
- Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema);
- Column indexedColumn = functionalIndex.apply(Arrays.asList(fileDf.col(columnToIndex)));
+ String instantTime,
+ boolean isBaseFile) {
+ Dataset<Row> fileDf = readRecordsAsRow(new StoragePath[] {filePath}, sqlContext, metaClient, readerSchema, isBaseFile);
+ Column indexedColumn = functionalIndex.apply(Collections.singletonList(fileDf.col(columnToIndex)));
fileDf = fileDf.withColumn(columnToIndex, indexedColumn);
BloomFilter bloomFilter = HoodieFileWriterFactory.createBloomFilter(writeConfig);
fileDf.foreach(row -> {
@@ -151,20 +157,53 @@ public class SparkMetadataWriterUtils {
}
private static Dataset<Row> readRecordsAsRow(StoragePath[] paths, SQLContext sqlContext,
- HoodieTableMetaClient metaClient, Schema schema) {
- String readPathString =
- String.join(",", Arrays.stream(paths).map(StoragePath::toString).toArray(String[]::new));
- String globPathString = String.join(",", Arrays.stream(paths).map(StoragePath::getParent).map(StoragePath::toString).distinct().toArray(String[]::new));
- HashMap<String, String> params = new HashMap<>();
- params.put(QUERY_TYPE_CONFIG, QUERY_TYPE_SNAPSHOT);
- params.put(READ_PATHS_CONFIG, readPathString);
- // Building HoodieFileIndex needs this param to decide query path
- params.put(GLOB_PATHS_CONFIG, globPathString);
- // Let Hudi relations to fetch the schema from the table itself
- BaseRelation relation = SparkAdapterSupport$.MODULE$.sparkAdapter()
- .createRelation(sqlContext, metaClient, schema, paths, params);
-
- return dropMetaFields(sqlContext.baseRelationToDataFrame(relation));
+ HoodieTableMetaClient metaClient, Schema schema,
+ boolean isBaseFile) {
+ List<HoodieRecord> records = isBaseFile ? getBaseFileRecords(new HoodieBaseFile(paths[0].toString()), metaClient, schema)
+ : getUnmergedLogFileRecords(Arrays.stream(paths).map(StoragePath::toString).collect(Collectors.toList()), metaClient, schema);
+ return dropMetaFields(toDataset(records, schema, sqlContext));
+ }
+
+ private static List<HoodieRecord> getUnmergedLogFileRecords(List<String> logFilePaths, HoodieTableMetaClient metaClient, Schema readerSchema) {
+ List<HoodieRecord> records = new ArrayList<>();
+ HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
+ .withStorage(metaClient.getStorage())
+ .withBasePath(metaClient.getBasePath())
+ .withLogFilePaths(logFilePaths)
+ .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+ .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp())
+ .withReaderSchema(readerSchema)
+ .withTableMetaClient(metaClient)
+ .withLogRecordScannerCallback(records::add)
+ .build();
+ scanner.scan(false);
+ return records;
+ }
+
+ private static List<HoodieRecord> getBaseFileRecords(HoodieBaseFile baseFile, HoodieTableMetaClient metaClient, Schema readerSchema) {
+ List<HoodieRecord> records = new ArrayList<>();
+ HoodieRecordMerger recordMerger =
+ HoodieRecordUtils.createRecordMerger(metaClient.getBasePath().toString(), EngineType.SPARK, Collections.emptyList(), metaClient.getTableConfig().getRecordMergerStrategy());
+ try (HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(metaClient.getStorage()).getReaderFactory(recordMerger.getRecordType())
+ .getFileReader(getReaderConfigs(metaClient.getStorageConf()), baseFile.getStoragePath())) {
+ baseFileReader.getRecordIterator(readerSchema).forEachRemaining((record) -> records.add((HoodieRecord) record));
+ return records;
+ } catch (IOException e) {
+ throw new HoodieIOException("Error reading base file " + baseFile.getFileName(), e);
+ }
+ }
+
+ private static Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema, SQLContext sqlContext) {
+ List<GenericRecord> avroRecords = records.stream()
+ .map(r -> (GenericRecord) r.getData())
+ .collect(Collectors.toList());
+ if (avroRecords.isEmpty()) {
+ return sqlContext.emptyDataFrame().toDF();
+ }
+ try (JavaSparkContext jsc = new JavaSparkContext(sqlContext.sparkContext())) {
+ JavaRDD<GenericRecord> javaRDD = jsc.parallelize(avroRecords);
+ return AvroConversionUtils.createDataFrame(javaRDD.rdd(), schema.toString(), sqlContext.sparkSession());
+ }
}
private static <T extends Comparable<T>> HoodieColumnRangeMetadata<Comparable> computeColumnRangeMetadata(Dataset<Row> rowDataset,