nsivabalan commented on code in PR #12214:
URL: https://github.com/apache/hudi/pull/12214#discussion_r1833514714
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -59,7 +74,21 @@
* This results in two I/O passes over the log file.
*/
@NotThreadSafe
-public class HoodieMergedLogRecordScanner extends BaseHoodieMergedLogRecordScanner<String> {
+public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
Review Comment:
Why do we have to make this extend AbstractHoodieLogRecordReader instead of BaseHoodieMergedLogRecordScanner?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +622,78 @@ public static HoodieRecord<HoodieMetadataPayload> createRecordIndexUpdate(String
   * @param secondaryKey Secondary key of the record
   * @param isDeleted true if this record is deleted
   */
-  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndex(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
-
-    HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndexRecord(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    HoodieKey key = new HoodieKey(constructSecondaryIndexKey(secondaryKey, recordKey), partitionPath);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), new HoodieSecondaryIndexInfo(isDeleted));
     return new HoodieAvroRecord<>(key, payload);
   }
-  public String getRecordKeyFromSecondaryIndex() {
-    return secondaryIndexMetadata.getRecordKey();
+  public static String getRecordKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the primary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
+    return unescapeSpecialChars(key.substring(delimiterIndex + 1));
+  }
+
+  public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the secondary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
Review Comment:
Do we need the nonEmpty() check for every key? We can't store empty keys into storage, right? So why keep checking for empty on every lookup?
In fact, here we are checking twice (once when fetching the primary key and once when fetching the secondary key).
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -1055,17 +914,15 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecords
       return Collections.emptyMap();
     }
-    ClosableIterator<HoodieRecord<?>> records = reader.getRecordIterator();
+    ClosableIterator<HoodieRecord<?>> recordIterator = reader.getRecordIterator();
Review Comment:
Can we rename fetchBaseFileAllRecordsByPayload to `fetchBaseFileAllRecordsByPayloadForSecIndex` to call out that this is special handling just for SI?
I do not want someone else to re-use this method for something else down the line.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +622,78 @@ public static HoodieRecord<HoodieMetadataPayload> createRecordIndexUpdate(String
   * @param secondaryKey Secondary key of the record
   * @param isDeleted true if this record is deleted
   */
-  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndex(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
-
-    HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndexRecord(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    HoodieKey key = new HoodieKey(constructSecondaryIndexKey(secondaryKey, recordKey), partitionPath);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), new HoodieSecondaryIndexInfo(isDeleted));
     return new HoodieAvroRecord<>(key, payload);
   }
-  public String getRecordKeyFromSecondaryIndex() {
-    return secondaryIndexMetadata.getRecordKey();
+  public static String getRecordKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the primary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
+    return unescapeSpecialChars(key.substring(delimiterIndex + 1));
+  }
+
+  public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the secondary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
+    return unescapeSpecialChars(key.substring(0, delimiterIndex));
+  }
+
+  static String constructSecondaryIndexKey(String secondaryKey, String recordKey) {
+    return escapeSpecialChars(secondaryKey) + SECONDARY_INDEX_RECORD_KEY_SEPARATOR + escapeSpecialChars(recordKey);
+  }
+
+  private static String escapeSpecialChars(String str) {
+    StringBuilder escaped = new StringBuilder();
+    for (char c : str.toCharArray()) {
+      if (c == '\\' || c == '$') {
+        escaped.append('\\'); // Add escape character
+      }
+      escaped.append(c); // Add the actual character
+    }
+    return escaped.toString();
+  }
+
+  private static int getSecondaryIndexKeySeparatorPosition(String key) {
+    int delimiterIndex = -1;
+    boolean isEscape = false;
+
+    // Find the delimiter index while skipping escaped $
+    for (int i = 0; i < key.length(); i++) {
+      char c = key.charAt(i);
+      if (c == '\\' && !isEscape) {
+        isEscape = true;
+      } else if (c == '$' && !isEscape) {
+        delimiterIndex = i;
+        break;
+      } else {
+        isEscape = false;
+      }
+    }
+    checkState(delimiterIndex != -1, "Invalid encoded key format");
+    return delimiterIndex;
+  }
+
+  private static String unescapeSpecialChars(String str) {
Review Comment:
Can we move all of the secondary index key utils to a SecondaryIndexKeyUtils class?
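For reference, the escape/split/unescape round-trip the hunk above implements can be sketched as a standalone utility, roughly what a SecondaryIndexKeyUtils class might hold. This is a hypothetical sketch mirroring the diff (class and method names are illustrative, not the PR's actual API):

```java
// Hypothetical sketch of the secondary-index key encoding from the diff above:
// '$' separates secondaryKey from recordKey; literal '$' and '\' are escaped with '\'.
class SecondaryIndexKeySketch {
  private static final char SEPARATOR = '$';

  static String escape(String s) {
    StringBuilder out = new StringBuilder();
    for (char c : s.toCharArray()) {
      if (c == '\\' || c == SEPARATOR) {
        out.append('\\'); // prefix special characters with the escape char
      }
      out.append(c);
    }
    return out.toString();
  }

  static String unescape(String s) {
    StringBuilder out = new StringBuilder();
    boolean escaping = false;
    for (char c : s.toCharArray()) {
      if (c == '\\' && !escaping) {
        escaping = true;   // drop the escape char, keep the next char verbatim
      } else {
        out.append(c);
        escaping = false;
      }
    }
    return out.toString();
  }

  static String construct(String secondaryKey, String recordKey) {
    return escape(secondaryKey) + SEPARATOR + escape(recordKey);
  }

  // Finds the first unescaped '$', mirroring getSecondaryIndexKeySeparatorPosition.
  static int separatorPosition(String key) {
    boolean escaping = false;
    for (int i = 0; i < key.length(); i++) {
      char c = key.charAt(i);
      if (c == '\\' && !escaping) {
        escaping = true;
      } else if (c == SEPARATOR && !escaping) {
        return i;
      } else {
        escaping = false;
      }
    }
    throw new IllegalStateException("Invalid encoded key format");
  }

  static String secondaryKeyOf(String key) {
    return unescape(key.substring(0, separatorPosition(key)));
  }

  static String recordKeyOf(String key) {
    return unescape(key.substring(separatorPosition(key) + 1));
  }
}
```

The round-trip property (construct, then extract both parts) holds even when the secondary key itself contains `$` or `\`, which is the point of the escaping.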
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -59,7 +74,21 @@
* This results in two I/O passes over the log file.
*/
@NotThreadSafe
-public class HoodieMergedLogRecordScanner extends BaseHoodieMergedLogRecordScanner<String> {
+public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
Review Comment:
Oh, I see it now. You are removing BaseHoodieMergedLogRecordScanner.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2042,30 +2043,66 @@ private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(Hood
         Option.empty());
     ClosableIterator<HoodieRecord> fileSliceIterator = ClosableIterator.wrap(fileSliceReader);
     return new ClosableIterator<HoodieRecord>() {
+      private HoodieRecord nextValidRecord;
+
       @Override
       public void close() {
         fileSliceIterator.close();
       }
       @Override
       public boolean hasNext() {
-        return fileSliceIterator.hasNext();
+        // As part of hasNext() we try to find the valid non-delete record that has a secondary key.
+        if (nextValidRecord != null) {
+          return true;
+        }
+
+        // Secondary key is null when there is a delete record, and we only have the record key.
+        // This can happen when the record is deleted in the log file.
+        // In this case, we need not index the record because for the given record key,
+        // we have already prepared the delete record before reaching this point.
+        // NOTE: Delete record should not happen when initializing the secondary index, i.e. when called from readSecondaryKeysFromFileSlices,
+        // because from that call, we get the merged records as of some committed instant. So, delete records must have been filtered out.
+        // Loop to find the next valid record or exhaust the iterator.
+        while (fileSliceIterator.hasNext()) {
+          HoodieRecord record = fileSliceIterator.next();
+          String secondaryKey = getSecondaryKey(record);
+          if (secondaryKey != null) {
+            nextValidRecord = HoodieMetadataPayload.createSecondaryIndexRecord(
+                record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD),
+                secondaryKey,
+                indexDefinition.getIndexName(),
+                false);
+            return true;
+          }
+        }
+
+        // If no valid records are found
+        return false;
       }
       @Override
       public HoodieRecord next() {
-        HoodieRecord record = fileSliceIterator.next();
-        String recordKey = record.getRecordKey(tableSchema, HoodieRecord.RECORD_KEY_METADATA_FIELD);
-        String secondaryKeyFields = String.join(".", indexDefinition.getSourceFields());
-        String secondaryKey;
+        if (!hasNext()) {
+          throw new NoSuchElementException("No more valid records available.");
+        }
+        HoodieRecord result = nextValidRecord;
+        nextValidRecord = null; // Reset for the next call
+        return result;
+      }
+
+      private String getSecondaryKey(HoodieRecord record) {
         try {
-          GenericRecord genericRecord = (GenericRecord) (record.toIndexedRecord(tableSchema, CollectionUtils.emptyProps()).get()).getData();
-          secondaryKey = HoodieAvroUtils.getNestedFieldValAsString(genericRecord, secondaryKeyFields, true, false);
+          if (record.toIndexedRecord(tableSchema, CollectionUtils.emptyProps()).isPresent()) {
Review Comment:
Don't we have an easier way to figure out whether a record is deleted or not?
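The hasNext()/next() structure in the hunk above is a standard look-ahead filtering iterator: hasNext() pulls from the source until a mapping produces a value (here, skipping delete records whose secondary key resolves to null) and caches it; next() hands out the cached element. A minimal generic sketch of the same pattern, with illustrative names:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Function;

// Generic look-ahead iterator: hasNext() advances the source until the
// mapper yields a present Optional (analogous to skipping deletes with a
// null secondary key), and next() returns the cached element.
class FilteringIterator<S, T> implements Iterator<T> {
  private final Iterator<S> source;
  private final Function<S, Optional<T>> mapper;
  private T nextValid; // cached element found by hasNext()

  FilteringIterator(Iterator<S> source, Function<S, Optional<T>> mapper) {
    this.source = source;
    this.mapper = mapper;
  }

  @Override
  public boolean hasNext() {
    if (nextValid != null) {
      return true; // already found one on a previous call
    }
    while (source.hasNext()) {
      Optional<T> mapped = mapper.apply(source.next());
      if (mapped.isPresent()) {
        nextValid = mapped.get();
        return true;
      }
    }
    return false; // source exhausted without another valid element
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException("No more valid records available.");
    }
    T result = nextValid;
    nextValid = null; // reset for the next call
    return result;
  }
}
```

One design note: with this pattern, next() must delegate to hasNext() (as the PR's code does), otherwise calling next() without a preceding hasNext() would skip the look-ahead.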
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2042,30 +2043,66 @@ private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(Hood
         Option.empty());
     ClosableIterator<HoodieRecord> fileSliceIterator = ClosableIterator.wrap(fileSliceReader);
     return new ClosableIterator<HoodieRecord>() {
+      private HoodieRecord nextValidRecord;
+
       @Override
       public void close() {
         fileSliceIterator.close();
       }
       @Override
       public boolean hasNext() {
-        return fileSliceIterator.hasNext();
+        // As part of hasNext() we try to find the valid non-delete record that has a secondary key.
+        if (nextValidRecord != null) {
+          return true;
+        }
+
+        // Secondary key is null when there is a delete record, and we only have the record key.
+        // This can happen when the record is deleted in the log file.
+        // In this case, we need not index the record because for the given record key,
+        // we have already prepared the delete record before reaching this point.
+        // NOTE: Delete record should not happen when initializing the secondary index, i.e. when called from readSecondaryKeysFromFileSlices,
+        // because from that call, we get the merged records as of some committed instant. So, delete records must have been filtered out.
+        // Loop to find the next valid record or exhaust the iterator.
+        while (fileSliceIterator.hasNext()) {
Review Comment:
Are you sure we need this logic?
So, in general, a fileSliceReader can return deleted records too, is it? What's the purpose?
We generally don't return deleted records from, say, LogRecordReader, right? So why might a fileSliceReader return a deleted record?
Can you help me understand?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -620,15 +622,78 @@ public static HoodieRecord<HoodieMetadataPayload> createRecordIndexUpdate(String
   * @param secondaryKey Secondary key of the record
   * @param isDeleted true if this record is deleted
   */
-  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndex(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
-
-    HoodieKey key = new HoodieKey(secondaryKey, partitionPath);
-    HoodieMetadataPayload payload = new HoodieMetadataPayload(secondaryKey, new HoodieSecondaryIndexInfo(recordKey, isDeleted));
+  public static HoodieRecord<HoodieMetadataPayload> createSecondaryIndexRecord(String recordKey, String secondaryKey, String partitionPath, Boolean isDeleted) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    HoodieKey key = new HoodieKey(constructSecondaryIndexKey(secondaryKey, recordKey), partitionPath);
+    HoodieMetadataPayload payload = new HoodieMetadataPayload(key.getRecordKey(), new HoodieSecondaryIndexInfo(isDeleted));
     return new HoodieAvroRecord<>(key, payload);
   }
-  public String getRecordKeyFromSecondaryIndex() {
-    return secondaryIndexMetadata.getRecordKey();
+  public static String getRecordKeyFromSecondaryIndexKey(String key) {
+    // the payload key is in the format of "secondaryKey$primaryKey"
+    // we need to extract the primary key from the payload key
+    checkState(nonEmpty(key) && key.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR), "Invalid key format for secondary index payload: " + key);
+    int delimiterIndex = getSecondaryIndexKeySeparatorPosition(key);
+    return unescapeSpecialChars(key.substring(delimiterIndex + 1));
+  }
+
+  public static String getSecondaryKeyFromSecondaryIndexKey(String key) {
Review Comment:
Do these need to be public?
Or feel free to move them to a Utils class, and we can write UTs against it directly.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -819,8 +821,8 @@ protected Map<String, String> getSecondaryKeysForRecordKeys(List<String> recordK
     // Parallel lookup keys from each file slice
     Map<String, String> reverseSecondaryKeyMap = new HashMap<>();
-    partitionFileSlices.parallelStream().forEach(partition -> {
-      Map<String, String> partialResult = reverseLookupSecondaryKeys(partitionName, recordKeys, partition);
+    partitionFileSlices.parallelStream().forEach(fileSlice -> {
Review Comment:
The number of file slices in the secondary index is fixed, right? So why can't we hash the key and route deterministically to one file slice?
Then during lookup, for one secondary index key value, we could look into just one file slice rather than all file slices in the secondary index.
Maybe you discussed this point during design. Curious to know more details.
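The routing the comment proposes can be sketched in a few lines. This is a hypothetical illustration of the idea only, not Hudi's actual file-group mapping (class and method names are made up): with a fixed file-group count, hash the lookup key to a single slice index instead of probing every slice.

```java
// Hypothetical sketch: deterministically route a secondary-index lookup key
// to one of a fixed number of file groups, so a point lookup touches one
// file slice instead of all of them.
class SecondaryIndexRouting {
  static int fileGroupFor(String secondaryKey, int fileGroupCount) {
    // Mask the sign bit so the index is always non-negative,
    // then take the remainder to land in [0, fileGroupCount).
    int h = secondaryKey.hashCode() & 0x7fffffff;
    return h % fileGroupCount;
  }
}
```

One caveat such a design would need to handle: since the stored key is `secondaryKey$primaryKey`, the writer and the reader must hash the same portion of the key (e.g. only the escaped secondary-key prefix) for a lookup by secondary key to land on the slice where the entry was written.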
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]