nsivabalan commented on code in PR #12214:
URL: https://github.com/apache/hudi/pull/12214#discussion_r1832064458
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +621,25 @@ public static HoodieRecord<HoodieMetadataPayload>
createRecordIndexUpdate(String
* @param secondaryKey Secondary key of the record
* @param isDeleted true if this record is deleted
*/
- public static HoodieRecord<HoodieMetadataPayload>
createSecondaryIndex(String recordKey, String secondaryKey, String
partitionPath, Boolean isDeleted) {
-
- HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
- HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey,
new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+ public static HoodieRecord<HoodieMetadataPayload>
createSecondaryIndexRecord(String recordKey, String secondaryKey, String
partitionPath, Boolean isDeleted) {
+ // the payload key is in the format of "secondaryKey_primaryKey"
+ HoodieKey key = new HoodieKey(String.format("%s%s%s",secondaryKey,
SECONDARY_INDEX_RECORD_KEY_SEPARATOR, recordKey), partitionPath);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), new
HoodieSecondaryIndexInfo(isDeleted));
return new HoodieAvroRecord<>(key, payload);
}
- public String getRecordKeyFromSecondaryIndex() {
- return secondaryIndexMetadata.getRecordKey();
+ public static String getRecordKeyFromSecondaryIndex(String key) {
+ // the payload key is in the format of "secondaryKey$primaryKey"
+ // we need to extract the primary key from the payload key
+ checkState(nonEmpty(key) &&
key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for
secondary index payload: " + key);
+ return key.substring(key.lastIndexOf(SECONDARY_INDEX_RECORD_KEY_SEPARATOR)
+ 1);
+ }
+
+ public static String getSecondaryKeyFromSecondaryIndex(String key) {
Review Comment:
Let's rename this to getSecondaryKeyFromSecondaryIndexKey.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -841,31 +843,41 @@ private Map<String, String>
reverseLookupSecondaryKeys(String partitionName, Lis
Set<String> keySet = new TreeSet<>(recordKeys);
Set<String> deletedRecordsFromLogs = new HashSet<>();
+ // Map of recordKey (primaryKey) -> log record that is not deleted for
all input recordKeys
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
+ String recordKey =
getRecordKeyFromSecondaryIndex(record.getRecordKey());
HoodieMetadataPayload payload = record.getData();
- if (!payload.isDeleted()) { // process only valid records.
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- if (keySet.contains(recordKey)) {
- logRecordsMap.put(recordKey, record);
- }
+ if (!payload.isDeleted() && keySet.contains(recordKey)) { // process
only valid records.
+ logRecordsMap.put(recordKey, record);
} else {
- deletedRecordsFromLogs.add(record.getRecordKey());
+ deletedRecordsFromLogs.add(recordKey);
}
});
// Map of (record-key, secondary-index-record)
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords =
fetchBaseFileAllRecordsByPayload(baseFileReader, keySet, partitionName);
- // Iterate over all provided log-records, merging them into existing
records
- logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(key1,
value1, (oldRecord, newRecord) -> {
- Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord =
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
- return mergedRecord.orElseGet(null);
- }));
- baseFileRecords.forEach((key, value) -> {
- if (!deletedRecordsFromLogs.contains(key)) {
- recordKeyMap.put(key, value.getRecordKey());
- }
- });
+ if (baseFileRecords.isEmpty()) {
Review Comment:
Are there cases where baseFileRecords itself could be null? If so, should we also
check for null here — e.g., when the file slice contains no base file?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -841,31 +843,41 @@ private Map<String, String>
reverseLookupSecondaryKeys(String partitionName, Lis
Set<String> keySet = new TreeSet<>(recordKeys);
Set<String> deletedRecordsFromLogs = new HashSet<>();
+ // Map of recordKey (primaryKey) -> log record that is not deleted for
all input recordKeys
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
+ String recordKey =
getRecordKeyFromSecondaryIndex(record.getRecordKey());
HoodieMetadataPayload payload = record.getData();
- if (!payload.isDeleted()) { // process only valid records.
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- if (keySet.contains(recordKey)) {
- logRecordsMap.put(recordKey, record);
- }
+ if (!payload.isDeleted() && keySet.contains(recordKey)) { // process
only valid records.
+ logRecordsMap.put(recordKey, record);
} else {
- deletedRecordsFromLogs.add(record.getRecordKey());
+ deletedRecordsFromLogs.add(recordKey);
}
});
// Map of (record-key, secondary-index-record)
Map<String, HoodieRecord<HoodieMetadataPayload>> baseFileRecords =
fetchBaseFileAllRecordsByPayload(baseFileReader, keySet, partitionName);
- // Iterate over all provided log-records, merging them into existing
records
- logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(key1,
value1, (oldRecord, newRecord) -> {
- Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord =
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
- return mergedRecord.orElseGet(null);
- }));
- baseFileRecords.forEach((key, value) -> {
- if (!deletedRecordsFromLogs.contains(key)) {
- recordKeyMap.put(key, value.getRecordKey());
- }
- });
+ if (baseFileRecords.isEmpty()) {
+ logRecordsMap.forEach((key1, value1) -> {
+ if (!value1.getData().isDeleted()) {
+ recordKeyMap.put(key1,
getSecondaryKeyFromSecondaryIndex(value1.getRecordKey()));
+ }
+ });
+ } else {
+ // Iterate over all provided log-records, merging them into existing
records
+ logRecordsMap.forEach((key1, value1) -> baseFileRecords.merge(key1,
value1, (oldRecord, newRecord) -> {
+ Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord =
HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
+ if (!mergedRecord.isPresent() ||
mergedRecord.get().getData().isDeleted()) {
+ System.out.println(">>> mergedRecord is not present or deleted " +
mergedRecord);
Review Comment:
Can you remove the System.out.println (SOP) debug statements?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMetadataMergedLogRecordScanner.java:
##########
@@ -78,24 +78,27 @@ public Map<String, HoodieRecord> getRecords() {
@Override
public <T> void processNextRecord(HoodieRecord<T> newRecord) throws
IOException {
- // Merge the new record with the existing record in the map
- HoodieRecord<T> oldRecord = (HoodieRecord<T>)
records.get(newRecord.getRecordKey());
- if (oldRecord != null) {
- LOG.debug("Merging new record with existing record in the map. Key: {}",
newRecord.getRecordKey());
- recordMerger.fullOuterMerge(oldRecord, readerSchema, newRecord,
readerSchema, this.getPayloadProps()).forEach(
- mergedRecord -> {
- HoodieRecord<T> combinedRecord = mergedRecord.getLeft();
- if (combinedRecord.getData() != oldRecord.getData()) {
- HoodieRecord latestHoodieRecord =
getLatestHoodieRecord(newRecord, combinedRecord, newRecord.getRecordKey());
- records.put(newRecord.getRecordKey(), latestHoodieRecord.copy());
- }
- });
+ String key = newRecord.getRecordKey();
Review Comment:
Why defer this to a separate patch? Can we clean it up in this patch itself?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -841,31 +843,41 @@ private Map<String, String>
reverseLookupSecondaryKeys(String partitionName, Lis
Set<String> keySet = new TreeSet<>(recordKeys);
Set<String> deletedRecordsFromLogs = new HashSet<>();
+ // Map of recordKey (primaryKey) -> log record that is not deleted for
all input recordKeys
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
+ String recordKey =
getRecordKeyFromSecondaryIndex(record.getRecordKey());
HoodieMetadataPayload payload = record.getData();
- if (!payload.isDeleted()) { // process only valid records.
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- if (keySet.contains(recordKey)) {
- logRecordsMap.put(recordKey, record);
- }
+ if (!payload.isDeleted() && keySet.contains(recordKey)) { // process
only valid records.
+ logRecordsMap.put(recordKey, record);
} else {
- deletedRecordsFromLogs.add(record.getRecordKey());
+ deletedRecordsFromLogs.add(recordKey);
Review Comment:
We could enter this else block under two different conditions —
it is not always a delete record.
Can you fix that?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +621,25 @@ public static HoodieRecord<HoodieMetadataPayload>
createRecordIndexUpdate(String
* @param secondaryKey Secondary key of the record
* @param isDeleted true if this record is deleted
*/
- public static HoodieRecord<HoodieMetadataPayload>
createSecondaryIndex(String recordKey, String secondaryKey, String
partitionPath, Boolean isDeleted) {
-
- HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
- HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey,
new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+ public static HoodieRecord<HoodieMetadataPayload>
createSecondaryIndexRecord(String recordKey, String secondaryKey, String
partitionPath, Boolean isDeleted) {
+ // the payload key is in the format of "secondaryKey_primaryKey"
+ HoodieKey key = new HoodieKey(String.format("%s%s%s",secondaryKey,
SECONDARY_INDEX_RECORD_KEY_SEPARATOR, recordKey), partitionPath);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), new
HoodieSecondaryIndexInfo(isDeleted));
return new HoodieAvroRecord<>(key, payload);
}
- public String getRecordKeyFromSecondaryIndex() {
- return secondaryIndexMetadata.getRecordKey();
+ public static String getRecordKeyFromSecondaryIndex(String key) {
Review Comment:
Let's rename this to getRecordKeyFromSecondaryIndexKey.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]