This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new ec466e187ea [HUDI-8654] Add base file instant time of record positions to the log block header (#12594)
ec466e187ea is described below
commit ec466e187ea7006b5bc565a20c70c91ccfacd37d
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 29 12:00:22 2025 -0800
    [HUDI-8654] Add base file instant time of record positions to the log block header (#12594)
---
.../cli/commands/TestHoodieLogFileCommand.java | 4 +-
.../apache/hudi/client/BaseHoodieWriteClient.java | 20 ------
.../timeline/versioning/v1/TimelineArchiverV1.java | 2 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 51 +++++++++-----
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +-
.../hudi/metadata/HoodieMetadataWriteUtils.java | 3 +-
.../utils/TestLegacyArchivedMetaEntryReader.java | 2 +-
.../hudi/testutils/HoodieWriteableTestTable.java | 2 +-
.../testutils/HoodieFlinkWriteableTestTable.java | 2 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 4 --
.../hudi/common/config/HoodieReaderConfig.java | 2 +-
.../table/log/block/HoodieAvroDataBlock.java | 5 +-
.../common/table/log/block/HoodieCDCDataBlock.java | 2 +-
.../common/table/log/block/HoodieDataBlock.java | 22 +-----
.../common/table/log/block/HoodieDeleteBlock.java | 18 +----
.../table/log/block/HoodieHFileDataBlock.java | 2 +-
.../common/table/log/block/HoodieLogBlock.java | 60 +++++++++++++++-
.../table/log/block/HoodieParquetDataBlock.java | 3 +-
.../common/table/read/HoodieFileGroupReader.java | 10 +--
.../HoodiePositionBasedFileGroupRecordBuffer.java | 29 ++++++--
.../testutils/reader/HoodieFileSliceTestUtils.java | 31 +++++----
.../hudi/common/util/TestSerializationUtils.java | 2 +-
.../common/functional/TestHoodieLogFormat.java | 79 ++++++++++++++++------
.../TestHoodieLogFormatAppendFailure.java | 2 +-
.../table/log/block/TestHoodieDeleteBlock.java | 2 +-
.../common/testutils/HoodieCommonTestHarness.java | 4 +-
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 6 +-
...stHoodiePositionBasedFileGroupRecordBuffer.java | 48 +++++++++----
...dieSparkMergeOnReadTableInsertUpdateDelete.java | 2 +-
.../hudi/functional/TestMORDataSourceStorage.scala | 61 +++++++++++++----
.../apache/hudi/hive/testutils/HiveTestUtil.java | 4 +-
.../TestHoodieMetadataTableValidator.java | 2 +-
32 files changed, 304 insertions(+), 184 deletions(-)
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 9fab259df55..f108c46c61b 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -120,7 +120,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
-      dataBlock = new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       writer.appendBlock(dataBlock);

       Map<HoodieLogBlock.HeaderMetadataType, String> rollbackHeader = new HashMap<>();
@@ -218,7 +218,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
-      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       writer.appendBlock(dataBlock);
     } finally {
       if (writer != null) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 82d34c5515d..3e28767b3e6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -42,7 +42,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.TableServiceType;
-import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -1661,23 +1660,4 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
       }
     });
   }
-
-  protected final void maybeDisableWriteRecordPositions(HoodieTableMetaClient metaClient) {
-    // Disabling {@link WRITE_RECORD_POSITIONS} in the following two cases for correctness
-    // even if the record positions are enabled in MOR:
-    // (1) When there is a pending compaction, the new base files to be generated by compaction
-    // is not available during this transaction. Given the log files in MOR from a new transaction
-    // after a compaction is scheduled can be attached to the base file generated by the compaction
-    // in the latest file slice, the accurate record positions may not be derived.
-    // (2) When NBCC is enabled, the compaction can be scheduled while there are inflight
-    // deltacommits, and unlike OCC, such an inflight deltacommit updating the same file group
-    // under compaction can still be successfully committed. This can also introduce the
-    // correctness problem as (1) that the positions in the log file can be inaccurate.
-    if (config.shouldWriteRecordPositions()
-        && config.getTableType() == HoodieTableType.MERGE_ON_READ
-        && (config.getWriteConcurrencyMode() == WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL
-        || !metaClient.getActiveTimeline().filterPendingCompactionTimeline().empty())) {
-      config.setValue(HoodieWriteConfig.WRITE_RECORD_POSITIONS, String.valueOf(false));
-    }
-  }
 }
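
Note: the write-time guard removed above is superseded by a read-time check. Since each log block now records which base file its positions refer to (see the HoodieLogBlock changes below), a reader can verify that instant time against the base file it actually merges with. A minimal sketch of that idea, with hypothetical helper and parameter names (the real check lives in HoodiePositionBasedFileGroupRecordBuffer.extractRecordPositions):

    // Sketch only, not code from this commit.
    static boolean positionsUsable(String headerBaseInstantTime,    // from the log block header, may be null
                                   String mergingBaseInstantTime) { // base file of the file slice being read
      // Positions computed against a different base file (e.g., one replaced
      // by compaction) cannot index rows of the file being merged.
      return headerBaseInstantTime != null
          && headerBaseInstantTime.equals(mergingBaseInstantTime);
    }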
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index c34b7ccb9de..594825ad9c1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -440,7 +440,7 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
         header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
         final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
         List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
-        HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, false, header, keyField);
+        HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
         writer.appendBlock(block);
         records.clear();
       }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index e1103a8bd4f..107e0b62ef6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -92,11 +93,12 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   private static final AtomicLong RECORD_COUNTER = new AtomicLong(1);
   private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;

-  private final boolean shouldWriteRecordPositions;
   // Buffer for holding records in memory before they are flushed to disk
   private final List<HoodieRecord> recordList = new ArrayList<>();
   // Buffer for holding records (to be deleted), along with their position in log block, in memory before they are flushed to disk
   private final List<Pair<DeleteRecord, Long>> recordsToDeleteWithPositions = new ArrayList<>();
+  // Base file instant time of the record positions
+  private final Option<String> baseFileInstantTimeOfPositions;
   // Incoming records to be written to logs.
   protected Iterator<HoodieRecord<T>> recordItr;
   // Writer to log into the file group's latest slice.
@@ -158,9 +160,12 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     this.sizeEstimator = new DefaultSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
-    this.shouldWriteRecordPositions = config.shouldWriteRecordPositions()
+    boolean shouldWriteRecordPositions = config.shouldWriteRecordPositions()
         // record positions supported only from table version 8
         && config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT);
+    this.baseFileInstantTimeOfPositions = shouldWriteRecordPositions
+        ? getBaseFileInstantTimeOfPositions()
+        : Option.empty();
   }

   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
@@ -168,6 +173,11 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
     this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
   }

+  private Option<String> getBaseFileInstantTimeOfPositions() {
+    return hoodieTable.getHoodieView().getLatestBaseFile(partitionPath, fileId)
+        .map(HoodieBaseFile::getCommitTime);
+  }
+
   private void populateWriteStat(HoodieRecord record, HoodieDeltaWriteStat deltaWriteStat) {
     HoodieTableVersion tableVersion = hoodieTable.version();
     String prevCommit;
@@ -483,13 +493,16 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
             ? HoodieRecord.RECORD_KEY_METADATA_FIELD
             : hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-        blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList, shouldWriteRecordPositions,
-            getUpdatedHeader(header, config), keyField));
+        blocks.add(getDataBlock(config, pickLogDataBlockFormat(), recordList,
+            getUpdatedHeader(header, config, baseFileInstantTimeOfPositions),
+            keyField));
       }

       if (appendDeleteBlocks && !recordsToDeleteWithPositions.isEmpty()) {
-        blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions, shouldWriteRecordPositions,
-            getUpdatedHeader(header, config)));
+        blocks.add(new HoodieDeleteBlock(
+            recordsToDeleteWithPositions,
+            getUpdatedHeader(
+                header, config, baseFileInstantTimeOfPositions)));
       }

       if (!blocks.isEmpty()) {
@@ -618,7 +631,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
           LOG.error("Error writing record " + indexedRecord.get(), e);
         }
       } else {
-        long position = shouldWriteRecordPositions ? record.getCurrentPosition() : -1L;
+        long position = baseFileInstantTimeOfPositions.isPresent() ? record.getCurrentPosition() : -1L;
         recordsToDeleteWithPositions.add(Pair.of(DeleteRecord.create(record.getKey(), orderingVal), position));
       }
       numberOfRecords++;
@@ -665,7 +678,8 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
   }

   private static Map<HeaderMetadataType, String> getUpdatedHeader(Map<HeaderMetadataType, String> header,
-                                                                  HoodieWriteConfig config) {
+                                                                  HoodieWriteConfig config,
+                                                                  Option<String> baseInstantTimeForPositions) {
     Map<HeaderMetadataType, String> updatedHeader = new HashMap<>(header);
     if (config.shouldWritePartialUpdates()) {
       // When enabling writing partial updates to the data blocks, the "IS_PARTIAL" flag is also
@@ -674,26 +688,31 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
       updatedHeader.put(
           HeaderMetadataType.IS_PARTIAL, Boolean.toString(true));
     }
+    if (baseInstantTimeForPositions.isPresent()) {
+      updatedHeader.put(
+          HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS,
+          baseInstantTimeForPositions.get());
+    }
     return updatedHeader;
   }

-  private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
-                                         HoodieLogBlock.HoodieLogBlockType logDataBlockFormat,
-                                         List<HoodieRecord> records,
-                                         boolean shouldWriteRecordPositions,
-                                         Map<HeaderMetadataType, String> header,
-                                         String keyField) {
+  private static HoodieLogBlock getDataBlock(HoodieWriteConfig writeConfig,
+                                             HoodieLogBlock.HoodieLogBlockType logDataBlockFormat,
+                                             List<HoodieRecord> records,
+                                             Map<HeaderMetadataType, String> header,
+                                             String keyField) {
     switch (logDataBlockFormat) {
       case AVRO_DATA_BLOCK:
-        return new HoodieAvroDataBlock(records, shouldWriteRecordPositions, header, keyField);
+        return new HoodieAvroDataBlock(records, header, keyField);
       case HFILE_DATA_BLOCK:
+        // Not supporting positions in HFile data blocks
+        header.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
         return new HoodieHFileDataBlock(
             records, header, writeConfig.getHFileCompressionAlgorithm(), new StoragePath(writeConfig.getBasePath()),
             writeConfig.getBooleanOrDefault(HoodieReaderConfig.USE_NATIVE_HFILE_READER));
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
-            shouldWriteRecordPositions,
             header,
             keyField,
             writeConfig.getParquetCompressionCodec(),
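
In sketch form, the write path above resolves the base file instant time once per handle and stamps it on every data/delete block header it emits. The snippet below is illustrative only, with plain JDK types (Optional, String keys) standing in for Hudi's Option and HeaderMetadataType:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    // Illustrative sketch mirroring HoodieAppendHandle's flow; not Hudi code.
    class AppendHandleSketch {
      private final Optional<String> baseFileInstantTimeOfPositions;

      AppendHandleSketch(boolean shouldWriteRecordPositions,
                         Optional<String> latestBaseFileCommitTime) {
        // Resolved once: the commit time of the latest base file in the file
        // slice, i.e., the file the record positions are relative to.
        this.baseFileInstantTimeOfPositions =
            shouldWriteRecordPositions ? latestBaseFileCommitTime : Optional.empty();
      }

      Map<String, String> updatedHeader(Map<String, String> header) {
        Map<String, String> updated = new HashMap<>(header);
        // Present only when positions are written; readers later match it
        // against the base file they actually merge with.
        baseFileInstantTimeOfPositions.ifPresent(
            t -> updated.put("BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS", t));
        return updated;
      }
    }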
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 9dafe3bad09..4b7332b5a64 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -948,7 +948,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     try {
       final Map<HeaderMetadataType, String> blockHeader = Collections.singletonMap(HeaderMetadataType.INSTANT_TIME, instantTime);

-      final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), false, blockHeader);
+      final HoodieDeleteBlock block = new HoodieDeleteBlock(Collections.emptyList(), blockHeader);

       try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
           .onParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(), partitionName))
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 414bbc3236e..3abdc44e042 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -165,7 +165,8 @@ public class HoodieMetadataWriteUtils {
             .withRecordMergeStrategyId(HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID)
             .withPayloadConfig(HoodiePayloadConfig.newBuilder()
                 .withPayloadClass(HoodieMetadataPayload.class.getCanonicalName()).build())
-            .withRecordMergeImplClasses(HoodieAvroRecordMerger.class.getCanonicalName());
+            .withRecordMergeImplClasses(HoodieAvroRecordMerger.class.getCanonicalName())
+            .withWriteRecordPositionsEnabled(false);

     // RecordKey properties are needed for the metadata table records
     final Properties properties = new Properties();
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
index eb7aa2d35d9..929524d84fe 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
@@ -136,7 +136,7 @@ public class TestLegacyArchivedMetaEntryReader {
         header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
         final String keyField = metaClient.getTableConfig().getRecordKeyFieldProp();
         List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
-        HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, false, header, keyField);
+        HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
         writer.appendBlock(block);
         records.clear();
       }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index c52617e6a6e..c4633df6a9f 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -191,7 +191,7 @@ public class HoodieWriteableTestTable extends HoodieMetadataTestTable {
           return null;
         }
       }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
-          false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+          header, HoodieRecord.RECORD_KEY_METADATA_FIELD));

       return Pair.of(partitionPath, logWriter.getLogFile());
     }
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index 9b7f1ef2fb2..07af039be66 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -153,7 +153,7 @@ public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable {
           LOG.warn("Failed to convert record " + r.toString(), e);
           return null;
         }
-      }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+      }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), header, HoodieRecord.RECORD_KEY_METADATA_FIELD));

       return Pair.of(partitionPath, logWriter.getLogFile());
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index c0b76892603..38c71f86383 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -127,7 +127,6 @@ public class SparkRDDWriteClient<T> extends
   public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
-    maybeDisableWriteRecordPositions(table.getMetaClient());
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records));
@@ -142,7 +141,6 @@ public class SparkRDDWriteClient<T> extends
   public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.UPSERT_PREPPED, Option.ofNullable(instantTime));
-    maybeDisableWriteRecordPositions(table.getMetaClient());
     table.validateUpsertSchema();
     preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords));
@@ -234,7 +232,6 @@ public class SparkRDDWriteClient<T> extends
   @Override
   public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
-    maybeDisableWriteRecordPositions(table.getMetaClient());
     preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.delete(context, instantTime, HoodieJavaRDD.of(keys));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
@@ -244,7 +241,6 @@ public class SparkRDDWriteClient<T> extends
   @Override
   public JavaRDD<WriteStatus> deletePrepped(JavaRDD<HoodieRecord<T>> preppedRecord, String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PREPPED, Option.ofNullable(instantTime));
-    maybeDisableWriteRecordPositions(table.getMetaClient());
     preWrite(instantTime, WriteOperationType.DELETE_PREPPED, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.deletePrepped(context,instantTime, HoodieJavaRDD.of(preppedRecord));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index 5a8eb69ad79..56935fc5d81 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -68,7 +68,7 @@ public class HoodieReaderConfig extends HoodieConfig {
   public static final ConfigProperty<Boolean> MERGE_USE_RECORD_POSITIONS = ConfigProperty
       .key("hoodie.merge.use.record.positions")
-      .defaultValue(false)
+      .defaultValue(true)
       .markAdvanced()
      .sinceVersion("1.0.0")
      .withDocumentation("Whether to use positions in the block header for data blocks containing updates and delete blocks for merging.");
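
With the default flipped to true, position-based merging is now on out of the box for readers; a job that wants the previous behavior has to opt out explicitly. For example (a sketch using standard TypedProperties/Properties usage):

    TypedProperties props = new TypedProperties();
    // Restore the old default: merge by record key instead of by position.
    props.setProperty("hoodie.merge.use.record.positions", "false");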
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 478e12eb014..488ffb1ce43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -88,11 +88,10 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
   }

   public HoodieAvroDataBlock(@Nonnull List<HoodieRecord> records,
-                             boolean shouldWriteRecordPositions,
                              @Nonnull Map<HeaderMetadataType, String> header,
                              @Nonnull String keyField
   ) {
-    super(records, shouldWriteRecordPositions, header, new HashMap<>(), keyField);
+    super(records, header, new HashMap<>(), keyField);
   }

   @Override
@@ -235,7 +234,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
    */
   @Deprecated
   public HoodieAvroDataBlock(List<HoodieRecord> records, Schema schema) {
-    super(records, false, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }

   public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index f32eeb0dfb7..44140b5b6af 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -49,7 +49,7 @@ public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
   public HoodieCDCDataBlock(List<HoodieRecord> records,
                             Map<HeaderMetadataType, String> header,
                             String keyField) {
-    super(records, false, header, keyField);
+    super(records, header, keyField);
   }

   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index b413d82fd2f..6f48cb27a74 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -42,7 +42,6 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;

-import static org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;

@@ -70,7 +69,6 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {

   private final boolean enablePointLookups;
   protected Schema readerSchema;
-  protected final boolean shouldWriteRecordPositions;

   // Map of string schema to parsed schema.
   private static ConcurrentHashMap<String, Schema> schemaMap = new ConcurrentHashMap<>();
@@ -79,31 +77,15 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
    * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
    */
   public HoodieDataBlock(List<HoodieRecord> records,
-                         boolean shouldWriteRecordPositions,
                          Map<HeaderMetadataType, String> header,
                          Map<FooterMetadataType, String> footer,
                          String keyFieldName) {
     super(header, footer, Option.empty(), Option.empty(), null, false);
-    if (shouldWriteRecordPositions) {
-      records.sort((o1, o2) -> {
-        long v1 = o1.getCurrentPosition();
-        long v2 = o2.getCurrentPosition();
-        return Long.compare(v1, v2);
-      });
-      if (isPositionValid(records.get(0).getCurrentPosition())) {
-        addRecordPositionsToHeader(
-            records.stream().map(HoodieRecord::getCurrentPosition).collect(Collectors.toSet()),
-            records.size());
-      } else {
-        LOG.warn("There are records without valid positions. "
-            + "Skip writing record positions to the data block header.");
-      }
-    }
+    addRecordPositionsIfRequired(records, HoodieRecord::getCurrentPosition);
     this.records = Option.of(records);
     this.keyFieldName = keyFieldName;
     // If no reader-schema has been provided assume writer-schema as one
     this.readerSchema = getWriterSchema(super.getLogBlockHeader());
-    this.shouldWriteRecordPositions = shouldWriteRecordPositions;
     this.enablePointLookups = false;
   }

@@ -120,8 +102,6 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
                          String keyFieldName,
                          boolean enablePointLookups) {
     super(headers, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily);
-    // Setting `shouldWriteRecordPositions` to false as this constructor is only used by the reader
-    this.shouldWriteRecordPositions = false;
     this.records = Option.empty();
     this.keyFieldName = keyFieldName;
     this.readerSchema = containsPartialUpdates()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index 3f5bfe51c78..c71db423856 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -56,7 +56,6 @@ import java.util.stream.Collectors;

 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;

 /**
  * Delete block contains a list of keys to be deleted from scanning the blocks so far.
@@ -76,24 +75,9 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
   private DeleteRecord[] recordsToDelete;

   public HoodieDeleteBlock(List<Pair<DeleteRecord, Long>> recordsToDelete,
-                           boolean shouldWriteRecordPositions,
                            Map<HeaderMetadataType, String> header) {
     this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
-    if (shouldWriteRecordPositions && !recordsToDelete.isEmpty()) {
-      recordsToDelete.sort((o1, o2) -> {
-        long v1 = o1.getRight();
-        long v2 = o2.getRight();
-        return Long.compare(v1, v2);
-      });
-      if (isPositionValid(recordsToDelete.get(0).getRight())) {
-        addRecordPositionsToHeader(
-            recordsToDelete.stream().map(Pair::getRight).collect(Collectors.toSet()),
-            recordsToDelete.size());
-      } else {
-        LOG.warn("There are delete records without valid positions. "
-            + "Skip writing record positions to the delete block header.");
-      }
-    }
+    addRecordPositionsIfRequired(recordsToDelete, Pair::getRight);
     this.recordsToDelete = recordsToDelete.stream().map(Pair::getLeft).toArray(DeleteRecord[]::new);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 406bc0c6a03..4d1267701ba 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -89,7 +89,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
                               String compressionCodec,
                               StoragePath pathForReader,
                               boolean useNativeHFileReader) {
-    super(records, false, header, new HashMap<>(), HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
+    super(records, header, new HashMap<>(), HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
     this.compressionCodec = Option.of(compressionCodec);
     this.pathForReader = pathForReader;
     this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 7d7da384d5d..04c7827b4e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -42,11 +42,14 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;

+import static org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;

@@ -145,7 +148,16 @@ public abstract class HoodieLogBlock {
     return LogReaderUtils.decodeRecordPositionsHeader(logBlockHeader.get(HeaderMetadataType.RECORD_POSITIONS));
   }

-  protected void addRecordPositionsToHeader(Set<Long> positionSet, int numRecords) {
+  /**
+   * @return base file instant time of the record positions if the record positions are enabled
+   * in the log block; {@code null} otherwise.
+   */
+  public String getBaseFileInstantTimeOfPositions() {
+    return logBlockHeader.get(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
+  }
+
+  protected void addRecordPositionsToHeader(Set<Long> positionSet,
+                                            int numRecords) {
     if (positionSet.size() == numRecords) {
       try {
         logBlockHeader.put(HeaderMetadataType.RECORD_POSITIONS, LogReaderUtils.encodePositions(positionSet));
@@ -159,6 +171,17 @@ public abstract class HoodieLogBlock {
     }
   }

+  protected boolean containsBaseFileInstantTimeOfPositions() {
+    return logBlockHeader.containsKey(
+        HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
+  }
+
+  protected void removeBaseFileInstantTimeOfPositions() {
+    LOG.warn("There are records without valid positions. "
+        + "Skip writing record positions to the block header.");
+    logBlockHeader.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
+  }
+
   /**
    * Type of the log block WARNING: This enum is serialized as the ordinal. Only add new enums at the end.
    */
@@ -208,7 +231,8 @@ public abstract class HoodieLogBlock {
     COMPACTED_BLOCK_TIMES(HoodieTableVersion.FIVE),
     RECORD_POSITIONS(HoodieTableVersion.SIX),
     BLOCK_IDENTIFIER(HoodieTableVersion.SIX),
-    IS_PARTIAL(HoodieTableVersion.EIGHT);
+    IS_PARTIAL(HoodieTableVersion.EIGHT),
+    BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS(HoodieTableVersion.EIGHT);

     @SuppressWarnings("unused")
     private final HoodieTableVersion earliestTableVersion;
@@ -321,6 +345,38 @@ public abstract class HoodieLogBlock {
     return Option.of(content);
   }

+  /**
+   * Adds the record positions if the base file instant time of the positions exists
+   * in the log header and the record positions are all valid.
+   *
+   * @param records         records with valid or invalid positions
+   * @param getPositionFunc function to get the position from the record
+   * @param <T>             type of record
+   */
+  protected <T> void addRecordPositionsIfRequired(List<T> records,
+                                                  Function<T, Long> getPositionFunc) {
+    if (containsBaseFileInstantTimeOfPositions()) {
+      if (!isPositionValid(getPositionFunc.apply(records.get(0)))) {
+        // Short circuit in case all records do not have valid positions,
+        // e.g., BUCKET index cannot identify the record position with low overhead
+        removeBaseFileInstantTimeOfPositions();
+        return;
+      }
+      records.sort((o1, o2) -> {
+        long v1 = getPositionFunc.apply(o1);
+        long v2 = getPositionFunc.apply(o2);
+        return Long.compare(v1, v2);
+      });
+      if (isPositionValid(getPositionFunc.apply(records.get(0)))) {
+        addRecordPositionsToHeader(
+            records.stream().map(getPositionFunc).collect(Collectors.toSet()),
+            records.size());
+      } else {
+        removeBaseFileInstantTimeOfPositions();
+      }
+    }
+  }
+
   /**
    * When lazyReading of blocks is turned on, inflate the content of a log block from disk.
   */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 45ba608b3e5..8d9b1221c54 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -69,14 +69,13 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
   }

   public HoodieParquetDataBlock(List<HoodieRecord> records,
-                                boolean shouldWriteRecordPositions,
                                 Map<HeaderMetadataType, String> header,
                                 String keyField,
                                 String compressionCodecName,
                                 double expectedCompressionRatio,
                                 boolean useDictionaryEncoding
   ) {
-    super(records, shouldWriteRecordPositions, header, new HashMap<>(), keyField);
+    super(records, header, new HashMap<>(), keyField);

     this.compressionCodecName = Option.of(compressionCodecName);
     this.expectedCompressionRatio = Option.of(expectedCompressionRatio);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index e8cc9133f4d..bf9653efbf3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -122,8 +122,8 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
     this.readStats = new HoodieReadStats();
     this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
-        tableConfig.getRecordMergeMode(), props, this.logFiles.isEmpty(), isSkipMerge,
-        shouldUseRecordPosition, readStats);
+        tableConfig.getRecordMergeMode(), props, hoodieBaseFileOption, this.logFiles.isEmpty(),
+        isSkipMerge, shouldUseRecordPosition, readStats);
   }

   /**
@@ -133,6 +133,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
                                                        HoodieTableMetaClient hoodieTableMetaClient,
                                                        RecordMergeMode recordMergeMode,
                                                        TypedProperties props,
+                                                       Option<HoodieBaseFile> baseFileOption,
                                                        boolean hasNoLogFiles,
                                                        boolean isSkipMerge,
                                                        boolean shouldUseRecordPosition,
@@ -142,9 +143,10 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     } else if (isSkipMerge) {
       return new HoodieUnmergedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
-    } else if (shouldUseRecordPosition) {
+    } else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
       return new HoodiePositionBasedFileGroupRecordBuffer<>(
-          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+          readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(),
+          Option.empty(), baseFileOption.get().getCommitTime(), props, readStats);
     } else {
       return new HoodieKeyBasedFileGroupRecordBuffer<>(
           readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
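
The dispatch above in sketch form: the position-based buffer is only chosen when a base file exists, because its commit time both seeds the buffer and is later matched against each block header. The names below are simplified stand-ins for the record buffer classes:

    // Illustrative only; the real method returns record buffer instances.
    static String chooseBuffer(boolean hasNoLogFiles, boolean isSkipMerge,
                               boolean shouldUseRecordPosition, boolean hasBaseFile) {
      if (hasNoLogFiles || isSkipMerge) {
        return "unmerged buffer";
      } else if (shouldUseRecordPosition && hasBaseFile) {
        // Positions are row offsets into a concrete base file; without one
        // there is nothing to validate the positions against.
        return "position-based buffer";
      } else {
        return "key-based buffer";
      }
    }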
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index a96982229da..9adc02f94c6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieKeyException;
@@ -64,6 +65,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
   private static final String ROW_INDEX_COLUMN_NAME = "row_index";
   public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
+  protected final String baseFileInstantTime;
   private long nextRecordPosition = 0L;

   private boolean needToDoHybridStrategy = false;
@@ -72,9 +74,11 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
                                                   RecordMergeMode recordMergeMode,
                                                   Option<String> partitionNameOverrideOpt,
                                                   Option<String[]> partitionPathFieldOpt,
+                                                  String baseFileInstantTime,
                                                   TypedProperties props,
                                                   HoodieReadStats readStats) {
     super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+    this.baseFileInstantTime = baseFileInstantTime;
   }

   @Override
@@ -89,7 +93,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
       return;
     }
     // Extract positions from data block.
-    List<Long> recordPositions = extractRecordPositions(dataBlock);
+    List<Long> recordPositions = extractRecordPositions(dataBlock, baseFileInstantTime);
     if (recordPositions == null) {
       LOG.warn("Falling back to key based merge for Read");
       fallbackToKeyBasedBuffer();
@@ -166,7 +170,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
       return;
     }

-    List<Long> recordPositions = extractRecordPositions(deleteBlock);
+    List<Long> recordPositions = extractRecordPositions(deleteBlock, baseFileInstantTime);
     if (recordPositions == null) {
       LOG.warn("Falling back to key based merge for Read");
       fallbackToKeyBasedBuffer();
@@ -281,15 +285,26 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
   }

   /**
-   * Extract the record positions from a log block header.
+   * Extracts valid record positions from a log block header.
    *
-   * @param logBlock
-   * @return
-   * @throws IOException
+   * @param logBlock            {@link HoodieLogBlock} instance of the log block
+   * @param baseFileInstantTime base file instant time for the file group to read
+   *
+   * @return valid record positions
+   * @throws IOException upon I/O errors
    */
-  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) throws IOException {
+  protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock,
+                                                     String baseFileInstantTime) throws IOException {
     List<Long> blockPositions = new ArrayList<>();
+    String blockBaseFileInstantTime = logBlock.getBaseFileInstantTimeOfPositions();
+    if (StringUtils.isNullOrEmpty(blockBaseFileInstantTime) || !baseFileInstantTime.equals(blockBaseFileInstantTime)) {
+      LOG.debug("The record positions cannot be used because the base file instant time "
+              + "is either missing or different from the base file to merge. "
+              + "Instant time in the header: {}, base file instant time of the file group: {}.",
+          blockBaseFileInstantTime, baseFileInstantTime);
+      return null;
+    }

     Roaring64NavigableMap positions = logBlock.getRecordPositions();
     if (positions == null || positions.isEmpty()) {
       LOG.warn("No record position info is found when attempt to do position based merge.");
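
Usage-wise, the guard added above means a position-based read degrades gracefully: when a block's positions were computed against a different base file (say, one since replaced by compaction), extractRecordPositions returns null and the caller falls back to key-based merging. A condensed sketch of just the guard, with simplified types:

    import java.util.List;

    class PositionGuardSketch {
      // Returns the block's positions only when they were computed against
      // the same base file this reader merges with; null otherwise.
      static List<Long> positionsIfApplicable(String blockBaseInstantTime, // header value, may be null
                                              String mergingBaseInstantTime,
                                              List<Long> blockPositions) {
        if (blockBaseInstantTime == null
            || !blockBaseInstantTime.equals(mergingBaseInstantTime)) {
          return null; // caller falls back to key-based merging
        }
        return blockPositions;
      }
    }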
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 6995a2f0c25..594cbc1b8ca 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -70,6 +70,7 @@ import java.util.stream.IntStream;

 import static org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
 import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.DELETE_BLOCK;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK;
 import static org.apache.hudi.common.testutils.FileCreateUtilsLegacy.baseFileName;
@@ -168,7 +169,6 @@ public class HoodieFileSliceTestUtils {
       List<IndexedRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
       StoragePath logFilePath,
-      boolean writePositions,
       Map<String, Long> keyToPositionMap
   ) {
     return createDataBlock(
@@ -176,19 +176,17 @@ public class HoodieFileSliceTestUtils {
         records.stream().map(r -> new HoodieAvroIndexedRecord(r, new HoodieRecordLocation("", "", keyToPositionMap.get(r.get(RECORD_KEY_INDEX)))))
             .collect(Collectors.toList()),
         header,
-        logFilePath,
-        writePositions);
+        logFilePath);
   }

   private static HoodieDataBlock createDataBlock(
       HoodieLogBlock.HoodieLogBlockType dataBlockType,
       List<HoodieRecord> records,
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
-      StoragePath pathForReader,
-      boolean writePositions
-  ) {
+      StoragePath pathForReader) {
     switch (dataBlockType) {
       case CDC_DATA_BLOCK:
+        header.remove(BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
         return new HoodieCDCDataBlock(
             records,
             header,
@@ -196,10 +194,10 @@ public class HoodieFileSliceTestUtils {
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(
             records,
-            writePositions,
             header,
             HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case HFILE_DATA_BLOCK:
+        header.remove(BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
         return new HoodieHFileDataBlock(
             records,
             header,
@@ -209,7 +207,6 @@ public class HoodieFileSliceTestUtils {
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
-            writePositions,
             header,
             HoodieRecord.RECORD_KEY_METADATA_FIELD,
             PARQUET_COMPRESSION_CODEC_NAME.defaultValue(),
@@ -226,7 +223,6 @@
       Map<HoodieLogBlock.HeaderMetadataType, String> header,
       Schema schema,
       Properties props,
-      boolean writePositions,
       Map<String, Long> keyToPositionMap
   ) {
     List<HoodieRecord> hoodieRecords = records.stream()
@@ -241,9 +237,7 @@
                 r -> Pair.of(DeleteRecord.create(
                     r.getKey(), r.getOrderingValue(schema, props)), r.getCurrentLocation().getPosition()))
             .collect(Collectors.toList()),
-        writePositions,
-        header
-    );
+        header);
   }

   public static HoodieBaseFile createBaseFile(
@@ -291,6 +285,7 @@ public class HoodieFileSliceTestUtils {
       List<IndexedRecord> records,
       Schema schema,
       String fileId,
+      String baseFileInstantTime,
       String logInstantTime,
       int version,
       HoodieLogBlock.HoodieLogBlockType blockType,
@@ -308,14 +303,19 @@ public class HoodieFileSliceTestUtils {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, logInstantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+      if (writePositions) {
+        header.put(
+            BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS,
+            baseFileInstantTime);
+      }
       if (blockType != DELETE_BLOCK) {
         HoodieDataBlock dataBlock = getDataBlock(
-            blockType, records, header, new StoragePath(logFilePath), writePositions, keyToPositionMap);
+            blockType, records, header, new StoragePath(logFilePath), keyToPositionMap);
         writer.appendBlock(dataBlock);
       } else {
-        HoodieDeleteBlock deleteBlock = getDeleteBlock(
-            records, header, schema, PROPERTIES, writePositions, keyToPositionMap);
+        HoodieDeleteBlock deleteBlock =
+            getDeleteBlock(records, header, schema, PROPERTIES, keyToPositionMap);
         writer.appendBlock(deleteBlock);
       }
     }
@@ -373,6 +373,7 @@ public class HoodieFileSliceTestUtils {
             records,
             schema,
             fileId,
+            baseFilePlan.getInstantTime(),
             logFilePlan.getInstantTime(),
             i,
             blockType,
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
index a4d7cb29023..4e45bc184aa 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
@@ -74,7 +74,7 @@ public class TestSerializationUtils {
     DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", "partition"));
     List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
     deleteRecordList.add(Pair.of(deleteRecord, -1L));
-    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, Collections.emptyMap());
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, Collections.emptyMap());
     byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
     byte[] secondBytes = SerializationUtils.serialize(deleteBlock);
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 5ac7ee5ffe3..098911b7abc 100755
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -26,8 +26,10 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -42,7 +44,6 @@ import
org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
-import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.common.table.log.TestLogReaderUtils;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -87,6 +88,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -109,8 +111,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.model.HoodieRecordLocation.INVALID_POSITION;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.NULL_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getJavaVersion;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.shouldUseExternalHdfs;
import static org.apache.hudi.common.testutils.HoodieTestUtils.useExternalHdfs;
@@ -1358,7 +1363,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (DeleteRecord dr : deletedRecords) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
header);
writer.appendBlock(deleteBlock);
List<String> allLogFiles =
@@ -1499,7 +1504,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
deleteRecordList.add(Pair.of(
DeleteRecord.create(deletedKey.getRecordKey(),
deletedKey.getPartitionPath()), -1L));
}
- HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
header);
writer.appendBlock(deleteBlock);
List<String> allLogFiles =
@@ -1522,7 +1527,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
deleteRecordList.add(Pair.of(
DeleteRecord.create(deletedKey.getRecordKey(),
deletedKey.getPartitionPath()), -1L));
}
- deleteBlock = new HoodieDeleteBlock(deleteRecordList, false,
deleteBlockHeader);
+ deleteBlock = new HoodieDeleteBlock(deleteRecordList, deleteBlockHeader);
writer.appendBlock(deleteBlock);
FileCreateUtilsLegacy.createDeltaCommit(basePath, "102", storage);
@@ -1614,7 +1619,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (DeleteRecord dr : deleteRecords1) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecordList,
false, header);
+ HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecordList,
header);
writer.appendBlock(deleteBlock1);
// Delete another 10 keys with -1 as orderingVal.
@@ -1626,7 +1631,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1), -1L))
.collect(Collectors.toList());
- HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(deleteRecordList,
false, header);
+ HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(deleteRecordList,
header);
writer.appendBlock(deleteBlock2);
// Delete another 10 keys with +1 as orderingVal.
@@ -1641,7 +1646,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (DeleteRecord dr : deletedRecords3) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deleteRecordList,
false, header);
+ HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deleteRecordList,
header);
writer.appendBlock(deleteBlock3);
List<String> allLogFiles =
@@ -1740,7 +1745,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (DeleteRecord dr : deleteRecords) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
header);
writer.appendBlock(deleteBlock);
FileCreateUtilsLegacy.createDeltaCommit(basePath, "100", storage);
@@ -1802,7 +1807,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (DeleteRecord dr : deleteRecords) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
header);
writer.appendBlock(deleteBlock);
FileCreateUtilsLegacy.createDeltaCommit(basePath, "100", storage);
@@ -1895,7 +1900,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (DeleteRecord dr : deleteRecords) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
header);
writer.appendBlock(deleteBlock);
FileCreateUtilsLegacy.createDeltaCommit(basePath, "100", storage);
@@ -1980,7 +1985,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
for (DeleteRecord dr : deletedRecords) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- writer.appendBlock(new HoodieDeleteBlock(deleteRecordList, false, header));
+ writer.appendBlock(new HoodieDeleteBlock(deleteRecordList, header));
copyOfRecords2.addAll(copyOfRecords1);
List<String> originalKeys =
@@ -2790,19 +2795,51 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
}
@ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void testGetRecordPositions(boolean addRecordPositionsHeader) throws
IOException {
+ @CsvSource(value = {"false,false,false", "false,false,true",
+ "true,false,false", "true,false,true", "true,true,false",
"true,true,true"})
+ public void testGetRecordPositions(boolean recordWithPositions,
+ boolean allValidPositions,
+ boolean
addBaseFileInstantTimeOfPositions) throws IOException {
Map<HeaderMetadataType, String> header = new HashMap<>();
- Set<Long> positions = new HashSet<>();
- if (addRecordPositionsHeader) {
- positions = TestLogReaderUtils.generatePositions();
- String content = LogReaderUtils.encodePositions(positions);
- header.put(HeaderMetadataType.RECORD_POSITIONS, content);
+ List<Long> positions = new ArrayList<>();
+ if (recordWithPositions) {
+ positions.addAll(TestLogReaderUtils.generatePositions());
+ if (!allValidPositions) {
+ positions.add(INVALID_POSITION);
+ positions.set(positions.size() / 2, INVALID_POSITION);
+ }
}
- HoodieLogBlock logBlock = new HoodieDeleteBlock(Collections.emptyList(),
addRecordPositionsHeader, header);
- if (addRecordPositionsHeader) {
- TestLogReaderUtils.assertPositionEquals(positions,
logBlock.getRecordPositions());
+
+ List<Pair<DeleteRecord, Long>> deleteRecordList = positions.isEmpty()
+ ? IntStream.range(0, 10).boxed()
+ .map(i -> Pair.of(DeleteRecord.create("key" + i, "partition"),
INVALID_POSITION))
+ .collect(Collectors.toList())
+ : IntStream.range(0, positions.size()).boxed()
+ .map(i -> Pair.of(DeleteRecord.create("key" + i, "partition"),
positions.get(i)))
+ .collect(Collectors.toList());
+ List<HoodieRecord> recordList = positions.isEmpty()
+ ? IntStream.range(0, 10).boxed()
+ .map(i -> new HoodieAvroRecord(new HoodieKey("key" + i, "partition"), null, HoodieOperation.INSERT,
+ new HoodieRecordLocation("001", "file1", INVALID_POSITION), null))
+ .collect(Collectors.toList())
+ : IntStream.range(0, positions.size()).boxed()
+ .map(i -> new HoodieAvroRecord(new HoodieKey("key" + i, "partition"), null, HoodieOperation.INSERT,
+ new HoodieRecordLocation("001", "file1", positions.get(i)), null))
+ .collect(Collectors.toList());
+
+ header.put(HeaderMetadataType.SCHEMA, NULL_SCHEMA);
+ if (addBaseFileInstantTimeOfPositions) {
+ header.put(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS, "001");
}
+ HoodieLogBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, header);
+ HoodieLogBlock dataBlock = new HoodieAvroDataBlock(recordList, header, "key");
+ boolean hasPositions = (recordWithPositions && allValidPositions && addBaseFileInstantTimeOfPositions);
+ String expectedInstantTime = hasPositions ? "001" : null;
+ Set<Long> expectedPositions = hasPositions ? new HashSet<>(positions) : Collections.emptySet();
+ assertEquals(expectedInstantTime, deleteBlock.getBaseFileInstantTimeOfPositions());
+ TestLogReaderUtils.assertPositionEquals(expectedPositions, deleteBlock.getRecordPositions());
+ assertEquals(expectedInstantTime, dataBlock.getBaseFileInstantTimeOfPositions());
+ TestLogReaderUtils.assertPositionEquals(expectedPositions, dataBlock.getRecordPositions());
}
private static Stream<Arguments> testArguments() {
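Note on the API shape exercised throughout this file: the old addRecordPositionsHeader boolean is gone from the block constructors; positions ride along with each record, and a block only advertises them once the new BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS header names the base file they were computed against. A minimal writer/reader sketch, assuming only the signatures visible in this patch (the key, partition, position, and instant values are illustrative):

    import org.apache.hudi.common.model.DeleteRecord;
    import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
    import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
    import org.apache.hudi.common.util.collection.Pair;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    class RecordPositionsSketch {
      static HoodieDeleteBlock writeSide() {
        Map<HeaderMetadataType, String> header = new HashMap<>();
        // Tie the positions below to the base file committed at instant "001".
        header.put(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS, "001");
        List<Pair<DeleteRecord, Long>> deletes = new ArrayList<>();
        deletes.add(Pair.of(DeleteRecord.create("key0", "partition"), 42L)); // record at position 42
        return new HoodieDeleteBlock(deletes, header); // no boolean flag anymore
      }

      static void readSide(HoodieDeleteBlock block) throws IOException {
        String baseInstant = block.getBaseFileInstantTimeOfPositions(); // "001", or null if header absent
        Set<Long> positions = block.getRecordPositions(); // empty unless every position is valid
      }
    }

As the test asserts, the two getters go together: a block with any invalid position, or without the header, reports an empty position set and a null base file instant time.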
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index 2175867f8ba..2fd6f4dbf5c 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -114,7 +114,7 @@ public class TestHoodieLogFormatAppendFailure {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
HoodieAvroDataBlock dataBlock =
- new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
index d55139fe06e..ba513e391cd 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
@@ -121,7 +121,7 @@ public class TestHoodieDeleteBlock {
for (DeleteRecord dr : deleteRecords) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, false, new HashMap<>());
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, new HashMap<>());
byte[] contentBytes = deleteBlock.getContentBytes(HoodieTestUtils.getDefaultStorage());
HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock(
Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(), new HashMap<>());
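Spelled out, the round trip this test drives is the sketch below, using exactly the calls visible in the hunk; the trailing null, Option.empty(), and empty-map arguments are the reader-side context this test leaves empty:

    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList, new HashMap<>());
    byte[] contentBytes = deleteBlock.getContentBytes(HoodieTestUtils.getDefaultStorage());
    // Rebuild the block purely from its serialized content; the delete records
    // and their positions must survive the round trip.
    HoodieDeleteBlock reread = new HoodieDeleteBlock(
        Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(), new HashMap<>());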
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index eb7a04c83e2..590a7818c91 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -372,11 +372,11 @@ public class HoodieCommonTestHarness {
case CDC_DATA_BLOCK:
return new HoodieCDCDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
case AVRO_DATA_BLOCK:
- return new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
case PARQUET_DATA_BLOCK:
- return new HoodieParquetDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
+ return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
default:
throw new RuntimeException("Unknown data block type " + dataBlockType);
}
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 972a57c6a4f..c766076d66a 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -423,11 +423,11 @@ public class InputFormatTestUtil {
dataBlock = new HoodieHFileDataBlock(hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
} else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
- dataBlock = new HoodieParquetDataBlock(hoodieRecords, false, header,
+ dataBlock = new HoodieParquetDataBlock(hoodieRecords, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD,
PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
} else {
- dataBlock = new HoodieAvroDataBlock(hoodieRecords, false, header,
- HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ dataBlock = new HoodieAvroDataBlock(
+ hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
writer.appendBlock(dataBlock);
return writer;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index e27c96d08b3..3c80ef83919 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -51,6 +51,8 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.sql.catalyst.InternalRow;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.net.URISyntaxException;
@@ -62,6 +64,7 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS;
import static org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -75,7 +78,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
private String partitionPath;
private HoodieReadStats readStats;
- public void prepareBuffer(RecordMergeMode mergeMode) throws Exception {
+ public void prepareBuffer(RecordMergeMode mergeMode, String baseFileInstantTime) throws Exception {
Map<String, String> writeConfigs = new HashMap<>();
writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
@@ -139,14 +142,19 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
mergeMode,
partitionNameOpt,
partitionFields,
+ baseFileInstantTime,
props,
readStats);
}
- public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {
+ public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean shouldWriteRecordPositions,
+ String baseFileInstantTime) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, avroSchema.toString());
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+ if (shouldWriteRecordPositions) {
+ header.put(BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS, baseFileInstantTime);
+ }
+ }
return header;
}
@@ -161,7 +169,8 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
return deletedRecords;
}
- public HoodieDeleteBlock getDeleteBlockWithPositions() throws IOException, URISyntaxException {
+ public HoodieDeleteBlock getDeleteBlockWithPositions(String baseFileInstantTime)
+ throws IOException, URISyntaxException {
List<DeleteRecord> deletedRecords = getDeleteRecords();
List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
@@ -169,7 +178,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
for (DeleteRecord dr : deletedRecords) {
deleteRecordList.add(Pair.of(dr, position++));
}
- return new HoodieDeleteBlock(deleteRecordList, true, getHeader());
+ return new HoodieDeleteBlock(deleteRecordList, getHeader(true, baseFileInstantTime));
}
public HoodieDeleteBlock getDeleteBlockWithoutPositions() throws IOException, URISyntaxException {
@@ -179,23 +188,34 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
for (DeleteRecord dr : deletedRecords) {
deleteRecordList.add(Pair.of(dr, -1L));
}
- return new HoodieDeleteBlock(deleteRecordList, true, getHeader());
+ return new HoodieDeleteBlock(deleteRecordList, getHeader(false, ""));
}
- @Test
- public void testProcessDeleteBlockWithPositions() throws Exception {
- prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING);
- HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testProcessDeleteBlockWithPositions(boolean sameBaseInstantTime) throws Exception {
+ String baseFileInstantTime = "090";
+ prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING, baseFileInstantTime);
+ HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions(
+ sameBaseInstantTime ? baseFileInstantTime : baseFileInstantTime + "1");
buffer.processDeleteBlock(deleteBlock);
assertEquals(50, buffer.getLogRecords().size());
- // With record positions, we do not need the record keys.
- assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+ if (sameBaseInstantTime) {
+ // If the log block's base instant time of record positions matches the base file
+ // to merge, the log records are stored based on the position
+ assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+ } else {
+ // If the log block's base instant time of record positions does not match the
+ // base file to merge, the log records are stored based on the record key
+ assertNull(buffer.getLogRecords().get(0L));
+ }
+ assertNull(buffer.getLogRecords().get(0L));
+ }
}
@Test
public void testProcessDeleteBlockWithCustomMerger() throws Exception {
- prepareBuffer(RecordMergeMode.CUSTOM);
- HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
+ String baseFileInstantTime = "090";
+ prepareBuffer(RecordMergeMode.CUSTOM, baseFileInstantTime);
+ HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions(baseFileInstantTime);
buffer.processDeleteBlock(deleteBlock);
assertEquals(50, buffer.getLogRecords().size());
assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
@@ -203,7 +223,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
@Test
public void testProcessDeleteBlockWithoutPositions() throws Exception {
- prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING);
+ prepareBuffer(RecordMergeMode.COMMIT_TIME_ORDERING, "090");
HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions();
buffer.processDeleteBlock(deleteBlock);
assertEquals(50, buffer.getLogRecords().size());
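The behavioral takeaway from these tests: positions in a log block are only trusted when the block's recorded base file instant matches the base file the reader is merging with; otherwise the buffer falls back to record keys. A hypothetical helper capturing that rule (an assumption distilled from the assertions above, not the actual buffer code):

    import org.apache.hudi.common.table.log.block.HoodieLogBlock;

    import java.io.IOException;

    class PositionFallbackSketch {
      // True only when the block computed its positions against the very base
      // file this reader merges with and actually carries usable positions.
      static boolean useRecordPositions(HoodieLogBlock block, String baseFileInstantTime) throws IOException {
        String blockBaseInstant = block.getBaseFileInstantTimeOfPositions();
        return blockBaseInstant != null
            && blockBaseInstant.equals(baseFileInstantTime)
            && !block.getRecordPositions().isEmpty();
      }
    }

This is why the same-instant branch finds a log record keyed by position 0L with no record key attached, while the mismatched-instant branch finds nothing at position 0L.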
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index 6d1d5120ff8..05abd8f59dd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -454,7 +454,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
List<HoodieRecord> hoodieRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
- return new HoodieAvroDataBlock(hoodieRecords, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ return new HoodieAvroDataBlock(hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
private String generateFakeWriteToken(String correctWriteToken) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 9fb8f455f09..6d2ec1cbd81 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieLogFile, HoodieTableType, WriteConcurrencyMode}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.log.HoodieLogFileReader
+import org.apache.hudi.common.table.view.FileSystemViewManager
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.StringUtils
@@ -43,7 +44,7 @@ import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
+import org.junit.jupiter.params.provider.CsvSource
import scala.collection.JavaConverters._
@@ -191,15 +192,16 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
}
@ParameterizedTest
- @ValueSource(booleans = Array(true, false))
- def testAutoDisablingRecordPositionsUnderPendingCompaction(enableNBCC: Boolean): Unit = {
+ @CsvSource(value = Array("false,false", "true,true", "true,false"))
+ def testAutoDisablingRecordPositionsUnderPendingCompaction(writeRecordPosition: Boolean,
+ enableNBCC: Boolean): Unit = {
val options = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
"hoodie.delete.shuffle.parallelism" -> "1",
"hoodie.merge.small.file.group.candidates.limit" -> "0",
- HoodieWriteConfig.WRITE_RECORD_POSITIONS.key -> "true",
+ HoodieWriteConfig.WRITE_RECORD_POSITIONS.key -> writeRecordPosition.toString,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName,
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
@@ -241,6 +243,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
.option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key, "true").load(basePath).count())
val metaClient = HoodieTestUtils.createMetaClient(storage.getConf, basePath)
+ var logFileList = List[HoodieLogFile]()
// Upsert
for (i <- 1 to 3) {
// Generate some deletes so that if the record positions are still enabled during pending
@@ -272,12 +275,11 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
assertEquals(1, metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().countInstants())
assertEquals(i + 1, metaClient.getActiveTimeline.getDeltaCommitTimeline.countInstants())
- // The deltacommit from the first round should write record positions in the log files
- // since it happens before the pending compaction.
- // The deltacommit from the second and third round should not write record positions in the
- // log files since it happens after the pending compaction
- validateRecordPositionsInLogFiles(
- metaClient, shouldContainRecordPosition = !enableNBCC && i == 1)
+ // The deltacommit from all three rounds should write record positions in the log files
+ // and the base file instant time of the record positions should match the latest
+ // base file before compaction happens
+ logFileList = validateRecordPositionsInLogFiles(
+ metaClient, writeRecordPosition && !enableNBCC)
}
for (i <- 4 to 6) {
@@ -308,17 +310,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
assertTrue(metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().empty())
assertEquals(1, metaClient.getActiveTimeline.getCommitAndReplaceTimeline.countInstants())
assertEquals(i + 1, metaClient.getActiveTimeline.getDeltaCommitTimeline.countInstants())
- // The deltacommit from the fourth round should not write record positions in the log files
- // since it happens before the compaction is executed.
+ // The deltacommit from the fourth round should write record positions in the log files
+ // but the base file instant time of the record positions should not match the latest
+ // base file, since the compaction happens afterwards.
+ if (i == 4) {
+ // Also revalidate the log files generated from the third round, as the base instant
+ // time should not match the latest base file after compaction
+ validateRecordPositionsInLogFiles(
+ metaClient, shouldContainRecordPosition = writeRecordPosition && !enableNBCC,
+ logFileList, shouldBaseFileInstantTimeMatch = false)
+ }
// The deltacommit from the fifth and sixth round should write record positions in the log
- // files since it happens after the completed compaction
+ // files and the base file instant time of the record positions should match the latest
+ // base file generated by the compaction.
validateRecordPositionsInLogFiles(
- metaClient, shouldContainRecordPosition = !enableNBCC && i != 4)
+ metaClient, shouldContainRecordPosition = writeRecordPosition && !enableNBCC,
+ shouldBaseFileInstantTimeMatch = i != 4)
}
}
def validateRecordPositionsInLogFiles(metaClient: HoodieTableMetaClient,
- shouldContainRecordPosition: Boolean): Unit = {
+ shouldContainRecordPosition: Boolean,
+ shouldBaseFileInstantTimeMatch: Boolean = true): List[HoodieLogFile] = {
val instant = metaClient.getActiveTimeline.getDeltaCommitTimeline.lastInstant().get()
val commitMetadata = metaClient.getCommitMetadataSerDe.deserialize(
instant, metaClient.getActiveTimeline.getInstantDetails(instant).get,
@@ -329,7 +342,18 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
.map(e => new HoodieLogFile(new StoragePath(e)))
.toList
assertFalse(logFileList.isEmpty)
+ validateRecordPositionsInLogFiles(
+ metaClient, shouldContainRecordPosition, logFileList, shouldBaseFileInstantTimeMatch)
+ logFileList
+ }
+
+ def validateRecordPositionsInLogFiles(metaClient: HoodieTableMetaClient,
+ shouldContainRecordPosition: Boolean,
+ logFileList: List[HoodieLogFile],
+ shouldBaseFileInstantTimeMatch: Boolean): Unit = {
val schema = new TableSchemaResolver(metaClient).getTableAvroSchema
+ val fsv = FileSystemViewManager.createInMemoryFileSystemView(
+ context(), metaClient, HoodieMetadataConfig.newBuilder().build())
logFileList.foreach(filename => {
val logFormatReader = new HoodieLogFileReader(metaClient.getStorage, filename, schema, 81920)
var numBlocks = 0
@@ -337,6 +361,13 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
val logBlock = logFormatReader.next()
val recordPositions = logBlock.getRecordPositions
assertEquals(shouldContainRecordPosition, !recordPositions.isEmpty)
+ if (shouldContainRecordPosition) {
+ val baseFile = fsv.getLatestBaseFile("", filename.getFileId)
+ assertTrue(baseFile.isPresent)
+ assertEquals(
+ shouldBaseFileInstantTimeMatch,
+ baseFile.get().getCommitTime.equals(logBlock.getBaseFileInstantTimeOfPositions))
+ }
numBlocks += 1
}
logFormatReader.close()
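For reference, the per-block check above reduces to the following Java helper (a sketch assuming the reader and getters behave as used in the Scala test; the helper name and shape are ours):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.log.HoodieLogFileReader;
    import org.apache.hudi.common.table.log.block.HoodieLogBlock;

    import java.io.IOException;

    class LogPositionValidatorSketch {
      // True iff every block carrying positions also names the given base file
      // instant, i.e. its positions are valid for merging against that base file.
      static boolean positionsMatchBaseFile(HoodieTableMetaClient metaClient, HoodieLogFile logFile,
                                            Schema schema, String latestBaseInstant) throws IOException {
        try (HoodieLogFileReader reader = new HoodieLogFileReader(metaClient.getStorage(), logFile, schema, 81920)) {
          while (reader.hasNext()) {
            HoodieLogBlock block = reader.next();
            if (!block.getRecordPositions().isEmpty()
                && !latestBaseInstant.equals(block.getBaseFileInstantTimeOfPositions())) {
              return false;
            }
          }
        }
        return true;
      }
    }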
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 15032990731..526ead7e02d 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -717,7 +717,7 @@ public class HiveTestUtil {
Map<HeaderMetadataType, String> header = new HashMap<>(2);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, dataFile.getCommitTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
logWriter.appendBlock(dataBlock);
logWriter.close();
return logWriter.getLogFile();
@@ -735,7 +735,7 @@ public class HiveTestUtil {
Map<HeaderMetadataType, String> header = new HashMap<>(2);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, dataFile.getCommitTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
logWriter.appendBlock(dataBlock);
logWriter.close();
return logWriter.getLogFile();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index 793c3bb0422..7e424968a4c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -853,7 +853,7 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
writer.appendBlock(new HoodieAvroDataBlock(
- Collections.emptyList(), false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ Collections.emptyList(), header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
} else {
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, baseInstantTime);