nsivabalan commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1906234472
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2206,44 +2306,158 @@ public static boolean
validateDataTypeForSecondaryIndex(List<String> sourceField
});
}
- public static HoodieData<HoodieRecord>
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
-
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
- int
secondaryIndexMaxParallelism,
- String
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
-
HoodieIndexDefinition indexDefinition) {
- if (partitionFiles.isEmpty()) {
- return engineContext.emptyHoodieData();
+ /**
+ * Converts the write stats to secondary index records.
+ *
+ * @param allWriteStats list of write stats
+ * @param instantTime instant time
+ * @param indexDefinition secondary index definition
+ * @param metadataConfig metadata config
+ * @param fsView file system view as of instant time
+ * @param dataMetaClient data table meta client
+ * @param engineContext engine context
+ * @param engineType engine type (e.g. SPARK, FLINK or JAVA)
+ * @return {@link HoodieData} of {@link HoodieRecord} to be updated in the
metadata table for the given secondary index partition
+ */
+ public static HoodieData<HoodieRecord>
convertWriteStatsToSecondaryIndexRecords(List<HoodieWriteStat> allWriteStats,
+
String instantTime,
+
HoodieIndexDefinition indexDefinition,
+
HoodieMetadataConfig metadataConfig,
+
HoodieMetadataFileSystemView fsView,
+
HoodieTableMetaClient dataMetaClient,
+
HoodieEngineContext engineContext,
+
EngineType engineType) {
+ // Secondary index cannot support logs having inserts with current
offering. So, lets validate that.
+ if (allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
+ })) {
+ throw new HoodieIOException("Secondary index cannot support logs having
inserts with current offering. Please disable secondary index.");
}
- final int parallelism = Math.min(partitionFiles.size(),
secondaryIndexMaxParallelism);
- final StoragePath basePath = metaClient.getBasePath();
+
Schema tableSchema;
try {
- tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+ tableSchema = tryResolveSchemaForTable(dataMetaClient).get();
} catch (Exception e) {
- throw new HoodieException("Failed to get latest schema for " +
metaClient.getBasePath(), e);
- }
-
- engineContext.setJobStatus(activeModule, "Secondary Index: reading
secondary keys from " + partitionFiles.size() + " partitions");
- return engineContext.parallelize(partitionFiles,
parallelism).flatMap(partitionWithBaseAndLogFiles -> {
- final String partition = partitionWithBaseAndLogFiles.getKey();
- final Pair<String, List<String>> baseAndLogFiles =
partitionWithBaseAndLogFiles.getValue();
- List<String> logFilePaths = new ArrayList<>();
- baseAndLogFiles.getValue().forEach(logFile -> logFilePaths.add(basePath
+ StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile));
- String baseFilePath = baseAndLogFiles.getKey();
- Option<StoragePath> dataFilePath = baseFilePath.isEmpty() ?
Option.empty() : Option.of(FSUtils.constructAbsolutePath(basePath,
baseFilePath));
- Schema readerSchema;
- if (dataFilePath.isPresent()) {
- readerSchema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
-
.getFileFormatUtils(metaClient.getTableConfig().getBaseFileFormat())
- .readAvroSchema(metaClient.getStorage(), dataFilePath.get());
+ throw new HoodieException("Failed to get latest schema for " +
dataMetaClient.getBasePath(), e);
+ }
+ Map<String, List<HoodieWriteStat>> writeStatsByFileId =
allWriteStats.stream().collect(Collectors.groupingBy(HoodieWriteStat::getFileId));
+ int parallelism = Math.max(Math.min(writeStatsByFileId.size(),
metadataConfig.getSecondaryIndexParallelism()), 1);
+
+ return engineContext.parallelize(new
ArrayList<>(writeStatsByFileId.entrySet()),
parallelism).flatMap(writeStatsByFileIdEntry -> {
+ String fileId = writeStatsByFileIdEntry.getKey();
+ List<HoodieWriteStat> writeStats = writeStatsByFileIdEntry.getValue();
+ String partition = writeStats.get(0).getPartitionPath();
+ FileSlice previousFileSliceForFileId =
fsView.getLatestFileSlice(partition, fileId).orElse(null);
+ Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice;
+ if (previousFileSliceForFileId == null) {
+ // new file slice, so empty mapping for previous slice
+ recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
} else {
- readerSchema = tableSchema;
+ StoragePath previousBaseFile =
previousFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
+ List<String> logFiles =
+
previousFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
+ recordKeyToSecondaryKeyForPreviousFileSlice =
+ getRecordKeyToSecondaryKey(dataMetaClient, engineType, logFiles,
tableSchema, partition, Option.ofNullable(previousBaseFile), indexDefinition,
instantTime);
}
- return createSecondaryIndexGenerator(metaClient, engineType,
logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,
-
metaClient.getActiveTimeline().getCommitsTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(""));
+ List<FileSlice> latestIncludingInflightFileSlices =
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(),
partition);
+ FileSlice currentFileSliceForFileId =
latestIncludingInflightFileSlices.stream().filter(fs ->
fs.getFileId().equals(fileId)).findFirst()
+ .orElseThrow(() -> new HoodieException("Could not find any file
slice for fileId " + fileId));
+ StoragePath currentBaseFile =
currentFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
+ List<String> logFilesIncludingInflight =
+
currentFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
+ Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice =
+ getRecordKeyToSecondaryKey(dataMetaClient, engineType,
logFilesIncludingInflight, tableSchema, partition,
Option.ofNullable(currentBaseFile), indexDefinition, instantTime);
+ // Need to find what secondary index record should be deleted, and what
should be inserted.
+ // For each entry in recordKeyToSecondaryKeyForCurrentFileSlice, if it
is not present in recordKeyToSecondaryKeyForPreviousFileSlice, then it should
be inserted.
+ // For each entry in recordKeyToSecondaryKeyForCurrentFileSlice, if it
is present in recordKeyToSecondaryKeyForPreviousFileSlice, then it should be
updated.
+ // For each entry in recordKeyToSecondaryKeyForPreviousFileSlice, if it
is not present in recordKeyToSecondaryKeyForCurrentFileSlice, then it should be
deleted.
+ List<HoodieRecord> records = new ArrayList<>();
+ recordKeyToSecondaryKeyForCurrentFileSlice.forEach((recordKey,
secondaryKey) -> {
+ if
(!recordKeyToSecondaryKeyForPreviousFileSlice.containsKey(recordKey)) {
+ records.add(createSecondaryIndexRecord(recordKey, secondaryKey,
indexDefinition.getIndexName(), false));
+ } else {
+ // delete previous entry if secondaryKey is different
Review Comment:
   minor: reword the comment to "delete prev entry and insert new value".
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2206,44 +2306,158 @@ public static boolean
validateDataTypeForSecondaryIndex(List<String> sourceField
});
}
- public static HoodieData<HoodieRecord>
readSecondaryKeysFromBaseFiles(HoodieEngineContext engineContext,
-
List<Pair<String, Pair<String, List<String>>>> partitionFiles,
- int
secondaryIndexMaxParallelism,
- String
activeModule, HoodieTableMetaClient metaClient, EngineType engineType,
-
HoodieIndexDefinition indexDefinition) {
- if (partitionFiles.isEmpty()) {
- return engineContext.emptyHoodieData();
+ /**
+ * Converts the write stats to secondary index records.
+ *
+ * @param allWriteStats list of write stats
+ * @param instantTime instant time
+ * @param indexDefinition secondary index definition
+ * @param metadataConfig metadata config
+ * @param fsView file system view as of instant time
+ * @param dataMetaClient data table meta client
+ * @param engineContext engine context
+ * @param engineType engine type (e.g. SPARK, FLINK or JAVA)
+ * @return {@link HoodieData} of {@link HoodieRecord} to be updated in the
metadata table for the given secondary index partition
+ */
+ public static HoodieData<HoodieRecord>
convertWriteStatsToSecondaryIndexRecords(List<HoodieWriteStat> allWriteStats,
Review Comment:
   Can we introduce a SecondaryRecordGenerationUtils class?
This file (HoodieTableMetadataUtil) is becoming larger and larger.
I know we are going to fix it soon, but let's try to keep it as concise
as possible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]