vinothchandar commented on code in PR #9581:
URL: https://github.com/apache/hudi/pull/9581#discussion_r1323803839
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -173,18 +173,18 @@ public static <R> HoodieRecord<R> tagRecord(HoodieRecord<R> record, HoodieRecord
*
* @param filePath - File to filter keys from
* @param candidateRecordKeys - Candidate keys to filter
- * @return List of candidate keys that are available in the file
+ * @return List of pairs of candidate keys and positions that are available in the file
 */
-  public static List<String> filterKeysFromFile(Path filePath, List<String> candidateRecordKeys,
-                                                Configuration configuration) throws HoodieIndexException {
+  public static List<Pair<String, Long>> filterKeysFromFile(Path filePath, List<String> candidateRecordKeys,
+                                                            Configuration configuration) throws HoodieIndexException {
ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
- List<String> foundRecordKeys = new ArrayList<>();
+ List<Pair<String, Long>> foundRecordKeys = new ArrayList<>();
    try (HoodieFileReader fileReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
        .getFileReader(configuration, filePath)) {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = HoodieTimer.start();
-      Set<String> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
+      Set<Pair<String, Long>> fileRowKeys = fileReader.filterRowKeys(new TreeSet<>(candidateRecordKeys));
Review Comment:
Use `TreeSet` here as well. Does the ordering inside `fileRowKeys` matter?
If so, it would be better to use a `SortedSet` here, or a list.
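For context on the ordering question, here is a standalone sketch (with a stand-in `Pair` record, since this is outside the Hudi codebase) of how a `SortedSet` gives deterministic iteration over key/position pairs where a plain `HashSet` would not:

```java
import java.util.Comparator;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

public class KeyPositionOrderingSketch {

  // Stand-in for org.apache.hudi.common.util.collection.Pair; not the real Hudi class.
  record Pair<L, R>(L left, R right) {}

  // A HashSet gives no deterministic iteration order; copying into a TreeSet
  // (exposed as SortedSet) yields the pairs in record-key order.
  static SortedSet<Pair<String, Long>> sortedByKey(Set<Pair<String, Long>> pairs) {
    SortedSet<Pair<String, Long>> ordered =
        new TreeSet<>(Comparator.comparing((Pair<String, Long> p) -> p.left()));
    ordered.addAll(pairs);
    return ordered;
  }
}
```

If downstream code only checks membership, a `Set` suffices; if it iterates in key order, the `SortedSet` contract makes that explicit in the return type.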
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java:
##########
@@ -106,6 +106,15 @@ public class HoodieStorageConfig extends HoodieConfig {
          + "to be appended to a log file. This helps to make sure the data appended to the log file is broken up "
          + "into sizable blocks to prevent from OOM errors. This size should be greater than the JVM memory.");
+  public static final ConfigProperty<Boolean> LOGFILE_WRITE_RECORD_POSITIONS = ConfigProperty
Review Comment:
This is not really a storage config; it is about higher-layer functionality
around updates. Move it to where the core write configs are, and call it
`WRITE_RECORD_POSITIONS`?
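Roughly what the up-leveled property could look like, sketched with Hudi's `ConfigProperty` builder pattern; the key string and placement here are suggestions, not the merged API:

```java
// Hypothetical: lives next to the other core write configs (e.g. in HoodieWriteConfig),
// with a key and name that are suggestions only.
public static final ConfigProperty<Boolean> WRITE_RECORD_POSITIONS = ConfigProperty
    .key("hoodie.write.record.positions")
    .defaultValue(false)
    .withDocumentation("Whether to write record positions into the log block header "
        + "for updates and deletes.");
```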
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -89,10 +90,11 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;
protected final String fileId;
+ private final boolean writeRecordPositions;
Review Comment:
I like this even better than LOG_RECORD_POSITIONS as the config name.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -89,10 +90,11 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;
protected final String fileId;
+ private final boolean writeRecordPositions;
// Buffer for holding records in memory before they are flushed to disk
private final List<HoodieRecord> recordList = new ArrayList<>();
  // Buffer for holding records (to be deleted) in memory before they are flushed to disk
-  private final List<DeleteRecord> recordsToDelete = new ArrayList<>();
+  private final List<Pair<DeleteRecord, Long>> recordsToDelete = new ArrayList<>();
Review Comment:
Rename the variable, since it now holds pairs of delete records and positions.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -461,11 +464,13 @@ protected void appendDataAndDeleteBlocks(Map<HeaderMetadataType, String> header,
          ? HoodieRecord.RECORD_KEY_METADATA_FIELD
          : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-      blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, getUpdatedHeader(header, blockSequenceNumber++, taskContextSupplier.getAttemptIdSupplier().get()), keyField));
+      blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, writeRecordPositions,
Review Comment:
Name booleans with either a `should` or `is` prefix, e.g.
`shouldWriteRecordPositions`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java:
##########
@@ -51,26 +51,28 @@ public HoodieKeyLocationFetchHandle(HoodieWriteConfig config, HoodieTable<T, I,
this.keyGeneratorOpt = keyGeneratorOpt;
}
-  private List<HoodieKey> fetchHoodieKeys(HoodieBaseFile baseFile) {
+  private List<Pair<HoodieKey, Long>> fetchHoodieKeysAndPositions(HoodieBaseFile baseFile) {
Review Comment:
While you are at it, rename this to `fetchRecordKeysWithPositions`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -2029,6 +2029,10 @@ public long getLogFileDataBlockMaxSize() {
return getLong(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE);
}
+ public boolean shouldLogFileWriteRecordPositions() {
+ return getBoolean(HoodieStorageConfig.LOGFILE_WRITE_RECORD_POSITIONS);
Review Comment:
This is more of a core write config than a log file config. Can we
uplevel it to "LOG_RECORD_POSITIONS", i.e. should we log record positions
for updates/deletes or not?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java:
##########
@@ -70,14 +75,31 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
*/
public HoodieDataBlock(List<HoodieRecord> records,
+ boolean writeRecordPositions,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer,
String keyFieldName) {
super(header, footer, Option.empty(), Option.empty(), null, false);
+ if (writeRecordPositions) {
+ records.sort((o1, o2) -> {
Review Comment:
+1, valid scenarios to think through. Let's also consider clustering and
other scenarios where the record position is accurate.
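The sort this ctor introduces can be sketched standalone; `Positioned` and the -1 sentinel below are stand-ins for Hudi's `Pair` and invalid-position value, not the actual types:

```java
import java.util.Comparator;
import java.util.List;

public class SortByPositionSketch {

  // Stand-in pair of a record and its base-file position; -1 means "position unknown".
  record Positioned<T>(T record, long position) {}

  // Sort in place by position, so positions land in monotonically increasing
  // order when they are later written into the log block header.
  static <T> void sortByPosition(List<Positioned<T>> records) {
    records.sort(Comparator.comparingLong((Positioned<T> p) -> p.position()));
  }
}
```

Note that with this comparator, any records carrying the -1 sentinel sort to the front, which is one reason the validity check discussed below matters.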
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java:
##########
@@ -65,17 +69,44 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
  private static final Lazy<HoodieDeleteRecord.Builder> HOODIE_DELETE_RECORD_BUILDER_STUB =
      Lazy.lazily(HoodieDeleteRecord::newBuilder);
+  private final boolean writeRecordPositions;
+  // Records to delete, sorted based on the record position if writing record position to the log block header
   private DeleteRecord[] recordsToDelete;
-  public HoodieDeleteBlock(DeleteRecord[] recordsToDelete, Map<HeaderMetadataType, String> header) {
-    this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
-    this.recordsToDelete = recordsToDelete;
+  public HoodieDeleteBlock(List<Pair<DeleteRecord, Long>> recordsToDelete,
+                           boolean writeRecordPositions,
+                           Map<HeaderMetadataType, String> header) {
+    this(Option.empty(), null, false, Option.empty(), header, new HashMap<>(), writeRecordPositions);
+ if (writeRecordPositions) {
+ recordsToDelete.sort((o1, o2) -> {
+ long v1 = o1.getRight();
+ long v2 = o2.getRight();
+ return Long.compare(v1, v2);
+ });
+ if (recordsToDelete.get(0).getRight() > -1L) {
Review Comment:
if you can to check if its not invalid, can we use the constant you define
before and call a helper like HoodieRecordLocation.isValidPosition() or sth?
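A minimal version of the suggested helper; the class name follows the reviewer's `HoodieRecordLocation` suggestion and the constant mirrors the -1L sentinel in the diff, so both are assumptions rather than merged code:

```java
public class RecordPositionHelperSketch {

  // Sentinel meaning "no known position"; assumed to match the -1L checked in the diff.
  public static final long INVALID_POSITION = -1L;

  // Call sites compare through the helper instead of against the raw literal,
  // keeping the sentinel's meaning in one place.
  public static boolean isValidPosition(long position) {
    return position > INVALID_POSITION;
  }
}
```

With this in place, the `recordsToDelete.get(0).getRight() > -1L` check in the ctor could read `isValidPosition(recordsToDelete.get(0).getRight())`.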
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]