vinothchandar commented on code in PR #10625:
URL: https://github.com/apache/hudi/pull/10625#discussion_r1504979741
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -223,6 +228,11 @@ private void enablePartitions() {
}
if (dataWriteConfig.isRecordIndexEnabled() ||
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(RECORD_INDEX)) {
this.enabledPartitionTypes.add(RECORD_INDEX);
+
+ // Enable secondary index only if record index is enabled
+ if (dataWriteConfig.isSecondaryIndexEnabled() ||
dataMetaClient.getTableConfig().isMetadataPartitionAvailable(SECONDARY_INDEX)) {
+ this.enabledPartitionTypes.add(SECONDARY_INDEX);
Review Comment:
nts: rename `SECONDARY_INDEX`? technically the record index itself is a
unique secondary index. This is just a non-unique secondary index.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition
getFunctionalIndexDefinition(String inde
}
}
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeSecondaryIndexPartition() throws IOException {
+ HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(dataMetaClient,
dataMetaClient.getActiveTimeline(), metadata);
+ // Collect the list of latest file slices present in each partition
+ List<String> partitions = metadata.getAllPartitionPaths();
+ fsView.loadAllPartitions();
+
+ List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
+ partitions.forEach(partition -> {
Review Comment:
wouldn't this take a long time, if done on the driver?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -131,6 +131,12 @@ public class HoodieTableConfig extends HoodieConfig {
.withDocumentation("Columns used to uniquely identify the table.
Concatenated values of these fields are used as "
+ " the record key component of HoodieKey.");
+ public static final ConfigProperty<String> SECONDARYKEY_FIELDS =
ConfigProperty
Review Comment:
I'd like for this to be absorbed into how we are tracking functional index
flow and not introduce any specific table configs here.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -111,6 +123,16 @@ private HoodieMergedLogRecordScanner(FileSystem fs, String
basePath, List<String
// Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType,
isBitCaskDiskMapCompressionEnabled);
+ this.nonUniqueKeyRecords = new
ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new
DefaultSizeEstimator(),
+ new HoodieRecordSizeEstimator(readerSchema), diskMapType,
isBitCaskDiskMapCompressionEnabled);
+
+ if (logFilePaths.size() > 0 &&
HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath,
partitionName)) {
Review Comment:
this layer cannot be aware of the metadata table.
##########
hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java:
##########
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
+ "Actual value will be obtained by invoking .toString() on the
field value. Nested fields can be specified using\n"
+ "the dot notation eg: `a.b.c`");
+ public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME =
ConfigProperty
Review Comment:
+1
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -2493,6 +2493,10 @@ public boolean isLogCompactionEnabledOnMetadata() {
return
getBoolean(HoodieMetadataConfig.ENABLE_LOG_COMPACTION_ON_METADATA_TABLE);
}
+ public boolean isSecondaryIndexEnabled() {
Review Comment:
nts: revisit if this is needed or we need to consolidate using the record
level index flag
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition
getFunctionalIndexDefinition(String inde
}
}
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeSecondaryIndexPartition() throws IOException {
+ HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(dataMetaClient,
dataMetaClient.getActiveTimeline(), metadata);
+ // Collect the list of latest file slices present in each partition
+ List<String> partitions = metadata.getAllPartitionPaths();
+ fsView.loadAllPartitions();
+
+ List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
+ partitions.forEach(partition -> {
+ fsView.getLatestFileSlices(partition).forEach(fs -> {
+ partitionFileSlicePairs.add(Pair.of(partition, fs));
+ });
+ });
+
+ // Reuse record index parallelism config to build secondary index
+ int parallelism = Math.min(partitionFileSlicePairs.size(),
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+
+ HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+ engineContext,
+ partitionFileSlicePairs,
+ parallelism,
+ this.getClass().getSimpleName(),
+ dataMetaClient,
+ EngineType.SPARK);
+
+ // Initialize the file groups - using the same estimation logic as that of
record index
Review Comment:
scope for code reuse?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##########
@@ -63,15 +61,21 @@ public HoodieFileSliceReader(Option<HoodieFileReader>
baseFileReader,
}
this.props = props;
this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
- this.records = scanner.getRecords();
}
private boolean hasNextInternal() {
while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
try {
HoodieRecord currentRecord =
baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema,
props,
simpleKeyGenFieldsOpt, scanner.isWithOperationField(),
scanner.getPartitionNameOverride(), false, Option.empty());
- Option<HoodieRecord> logRecord =
removeLogRecord(currentRecord.getRecordKey());
+
+ if (!scanner.hasKey(currentRecord.getRecordKey())) {
Review Comment:
what is this change achieving?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java:
##########
@@ -216,4 +217,19 @@ protected HoodieData<HoodieRecord>
getFunctionalIndexRecords(List<Pair<String, F
public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, ?>
initializeWriteClient() {
return new SparkRDDWriteClient(engineContext, metadataWriteConfig, true);
}
+
+ @Override
+ public HoodieData<HoodieRecord>
getDeletedSecondaryRecordMapping(HoodieEngineContext engineContext,
Review Comment:
nts: revisit if this is a necessary extension
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -546,6 +562,39 @@ private HoodieFunctionalIndexDefinition
getFunctionalIndexDefinition(String inde
}
}
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeSecondaryIndexPartition() throws IOException {
+ HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(dataMetaClient,
dataMetaClient.getActiveTimeline(), metadata);
+ // Collect the list of latest file slices present in each partition
+ List<String> partitions = metadata.getAllPartitionPaths();
+ fsView.loadAllPartitions();
+
+ List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
+ partitions.forEach(partition -> {
+ fsView.getLatestFileSlices(partition).forEach(fs -> {
+ partitionFileSlicePairs.add(Pair.of(partition, fs));
+ });
+ });
+
+ // Reuse record index parallelism config to build secondary index
+ int parallelism = Math.min(partitionFileSlicePairs.size(),
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+
+ HoodieData<HoodieRecord> records = readSecondaryKeysFromFileSlices(
+ engineContext,
+ partitionFileSlicePairs,
+ parallelism,
+ this.getClass().getSimpleName(),
+ dataMetaClient,
+ EngineType.SPARK);
+
+ // Initialize the file groups - using the same estimation logic as that of
record index
+ final int fileGroupCount =
HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(),
Review Comment:
this is going to compute `records` twice? no?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/LogFileIterator.java:
##########
@@ -24,25 +24,29 @@
import org.apache.hudi.common.util.collection.CachingIterator;
import java.util.Iterator;
-import java.util.Map;
public class LogFileIterator<T> extends CachingIterator<HoodieRecord<T>> {
HoodieMergedLogRecordScanner scanner;
- Map<String, HoodieRecord> records;
+ // Map<String, HoodieRecord> records;
Iterator<HoodieRecord> iterator;
- protected Option<HoodieRecord> removeLogRecord(String key) {
- return Option.ofNullable(records.remove(key));
+ // protected Option<HoodieRecord> removeLogRecord(String key) {
+ // return Option.ofNullable(records.remove(key));
+ // }
+
+ protected Option<HoodieRecord> removeLogRecord(HoodieRecord record) {
+ return scanner.remove(record);
}
public LogFileIterator(HoodieMergedLogRecordScanner scanner) {
this.scanner = scanner;
- this.records = scanner.getRecords();
+ // this.records = scanner.getRecords();
}
private boolean hasNextInternal() {
if (iterator == null) {
- iterator = records.values().iterator();
+ // iterator = records.values().iterator();
Review Comment:
remove?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -82,6 +88,9 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
public final HoodieTimer timer = HoodieTimer.create();
// Map of compacted/merged records
private final ExternalSpillableMap<String, HoodieRecord> records;
+
+ private final ExternalSpillableMap<String, HashMap<String, HoodieRecord>>
nonUniqueKeyRecords;
Review Comment:
serializing the hashmap to spillable disk - may not be efficient? or is that
okay? I wonder if we need another holder object.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +350,68 @@ public <T> void processNextRecord(HoodieRecord<T>
newRecord) throws IOException
}
}
+ private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord)
throws IOException {
Review Comment:
I think we should probably subclass /find some way to avoid having this code
right inside this class?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +289,68 @@ public <T> void processNextRecord(HoodieRecord<T>
newRecord) throws IOException
}
}
+ private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord)
throws IOException {
+ String key = newRecord.getRecordKey();
+ HoodieMetadataPayload newPayload = (HoodieMetadataPayload)
newRecord.getData();
+
+ // The rules for merging the prevRecord and the latestRecord is noted
below. Note that this only applies for SecondaryIndex
Review Comment:
hmmm. we should be using the combineAndGet... method of the MetadataPayload?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, hadoopConf);
}
+ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+ dataMetaClient.getTableConfig().getMetadataPartitions()
+ .stream()
+ .filter(partition ->
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+ .forEach(partition -> {
+ HoodieData<HoodieRecord> secondaryIndexRecords;
+ try {
+ secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
writeStatus);
+ } catch (Exception e) {
+ throw new HoodieMetadataException("Failed to get secondary index
updates for partition " + partition, e);
+ }
+ partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+ });
+ }
+
+ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata,
HoodieData<WriteStatus> writeStatus) throws Exception {
+ // Build a list of basefiles+delta-log-files for every partition that this
commit touches
+ // {
+ // {
+ // "partition1", { {"baseFile11", {"logFile11", "logFile12"}},
{"baseFile12", {"logFile11"} } },
+ // },
+ // {
+ // "partition2", { {"baseFile21", {"logFile21", "logFile22"}},
{"baseFile22", {"logFile21"} } }
+ // }
+ // }
+ List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new
ArrayList<>();
+ commitMetadata.getPartitionToWriteStats().forEach((dataPartition,
writeStats) -> {
+ writeStats.forEach(writeStat -> {
+ if (writeStat instanceof HoodieDeltaWriteStat) {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(),
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+ } else {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(writeStat.getPath(), new ArrayList<>())));
+ }
+ });
+ });
+
+ // Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
+ // the secondary index partition for each of these keys. For a commit
which is deleting/updating a lot of records, this
+ // operation is going to be expensive (in CPU, memory and IO)
+ List<String> keysToRemove = new ArrayList<>();
+ writeStatus.collectAsList().forEach(status -> {
Review Comment:
this collect can OOM right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]