This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6727c45ce28 [HUDI-6795] Implement writing record_positions to log
blocks for updates and deletes (#9581)
6727c45ce28 is described below
commit 6727c45ce289be41d7ddde14406b274d3f926f2e
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Oct 10 00:58:37 2023 -0700
[HUDI-6795] Implement writing record_positions to log blocks for updates
and deletes (#9581)
Implement writing record positions to the block header for data blocks
containing updates and for delete blocks.
- Add a new long field `position` to the `HoodieRecordLocation` model. It
is the position of a record in the file, e.g., row position starting from 0 in
the Parquet file.
- Positions will only be written if `hoodie.write.record.positions` is set
to true.
- Changes in `HoodieDataBlock` and `HoodieDeleteBlock` to handle writing
record positions.
- Change `HoodieFileReader#filterRowKeys` to return not only the set of
filtered keys but also their positions in the file.
- Log format tests for the new header and e2e tests for MOR table with the
aforementioned config enabled.
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../cli/commands/TestHoodieLogFileCommand.java | 4 +-
.../org/apache/hudi/config/HoodieWriteConfig.java | 17 +++++
.../org/apache/hudi/index/HoodieIndexUtils.java | 19 +++--
.../bloom/ListBasedHoodieBloomIndexHelper.java | 15 ++--
.../org/apache/hudi/io/HoodieAppendHandle.java | 24 ++++---
.../hudi/io/HoodieKeyLocationFetchHandle.java | 20 +++---
.../org/apache/hudi/io/HoodieKeyLookupHandle.java | 8 +--
.../org/apache/hudi/io/HoodieKeyLookupResult.java | 12 ++--
.../metadata/HoodieBackedTableMetadataWriter.java | 4 +-
.../utils/TestLegacyArchivedMetaEntryReader.java | 2 +-
.../hudi/testutils/HoodieWriteableTestTable.java | 3 +-
.../index/bloom/TestFlinkHoodieBloomIndex.java | 11 ++-
.../testutils/HoodieFlinkWriteableTestTable.java | 2 +-
.../index/bloom/HoodieFileProbingFunction.java | 9 +--
.../index/bloom/SparkHoodieBloomIndexHelper.java | 10 +--
.../hudi/io/storage/HoodieSparkParquetReader.java | 15 ++--
.../hudi/index/bloom/TestHoodieBloomIndex.java | 11 ++-
.../hudi/io/TestHoodieKeyLocationFetchHandle.java | 4 +-
.../org/apache/hudi/common/model/HoodieRecord.java | 16 +++--
.../common/model/HoodieRecordGlobalLocation.java | 15 ++--
.../hudi/common/model/HoodieRecordLocation.java | 32 ++++++++-
.../hudi/common/table/log/LogReaderUtils.java | 3 +-
.../table/log/block/HoodieAvroDataBlock.java | 9 +--
.../common/table/log/block/HoodieCDCDataBlock.java | 2 +-
.../common/table/log/block/HoodieDataBlock.java | 24 +++++++
.../common/table/log/block/HoodieDeleteBlock.java | 38 +++++++++-
.../table/log/block/HoodieHFileDataBlock.java | 2 +-
.../common/table/log/block/HoodieLogBlock.java | 19 ++++-
.../table/log/block/HoodieParquetDataBlock.java | 3 +-
.../org/apache/hudi/common/util/BaseFileUtils.java | 31 +++++----
.../java/org/apache/hudi/common/util/OrcUtils.java | 33 +++++----
.../org/apache/hudi/common/util/ParquetUtils.java | 45 +++++++-----
.../hudi/io/storage/HoodieAvroHFileReader.java | 30 ++++----
.../hudi/io/storage/HoodieAvroOrcReader.java | 5 +-
.../hudi/io/storage/HoodieAvroParquetReader.java | 3 +-
.../hudi/io/storage/HoodieBootstrapFileReader.java | 3 +-
.../apache/hudi/io/storage/HoodieFileReader.java | 6 +-
.../common/functional/TestHoodieLogFormat.java | 81 ++++++++++++++++------
.../TestHoodieLogFormatAppendFailure.java | 3 +-
.../hudi/common/table/log/TestLogReaderUtils.java | 15 ++--
.../table/log/block/TestHoodieDeleteBlock.java | 9 ++-
.../apache/hudi/common/util/TestParquetUtils.java | 27 ++++----
.../hudi/common/util/TestSerializationUtils.java | 7 +-
.../io/storage/TestHoodieReaderWriterBase.java | 14 ++--
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 4 +-
.../TestHoodieSparkMergeOnReadTableClustering.java | 13 ++--
.../hudi/functional/TestMORDataSourceStorage.scala | 2 +
.../apache/hudi/hive/testutils/HiveTestUtil.java | 2 +-
48 files changed, 468 insertions(+), 218 deletions(-)
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 12de150f2d3..9bfbdfc4edd 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -121,7 +121,7 @@ public class TestHoodieLogFileCommand extends
CLIFunctionalTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
getSimpleSchema().toString());
- dataBlock = new HoodieAvroDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ dataBlock = new HoodieAvroDataBlock(records, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
writer.appendBlock(dataBlock);
Map<HoodieLogBlock.HeaderMetadataType, String> rollbackHeader = new
HashMap<>();
@@ -217,7 +217,7 @@ public class TestHoodieLogFileCommand extends
CLIFunctionalTestHarness {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, INSTANT_TIME);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1, false,
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
writer.appendBlock(dataBlock);
} finally {
if (writer != null) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 87734c042d3..4a0087bc0a3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -752,6 +752,14 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "The class must be a subclass of
`org.apache.hudi.callback.HoodieClientInitCallback`."
+ "By default, no Hudi client init callback is executed.");
+ public static final ConfigProperty<Boolean> WRITE_RECORD_POSITIONS =
ConfigProperty
+ .key("hoodie.write.record.positions")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Whether to write record positions to the block
header for data blocks containing updates and delete blocks. "
+ + "The record positions can be used to improve the performance of
merging records from base and log files.");
+
/**
* Config key with boolean value that indicates whether record being written
during MERGE INTO Spark SQL
* operation are already prepped.
@@ -2052,6 +2060,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getLong(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE);
}
+ public boolean shouldWriteRecordPositions() {
+ return getBoolean(WRITE_RECORD_POSITIONS);
+ }
+
public double getParquetCompressionRatio() {
return getDouble(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION);
}
@@ -3090,6 +3102,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withWriteRecordPositionsEnabled(boolean
shouldWriteRecordPositions) {
+ writeConfig.setValue(WRITE_RECORD_POSITIONS,
String.valueOf(shouldWriteRecordPositions));
+ return this;
+ }
+
protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE,
getDefaultMarkersType(engineType));
// Check for mandatory properties
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 24a4dc05d10..33e8d501943 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -61,7 +61,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.TreeSet;
+import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
import static
org.apache.hudi.table.action.commit.HoodieDeleteHelper.createDeleteRecord;
@@ -173,18 +173,18 @@ public class HoodieIndexUtils {
*
* @param filePath - File to filter keys from
* @param candidateRecordKeys - Candidate keys to filter
- * @return List of candidate keys that are available in the file
+ * @return List of pairs of candidate keys and positions that are available
in the file
*/
- public static List<String> filterKeysFromFile(Path filePath, List<String>
candidateRecordKeys,
- Configuration configuration)
throws HoodieIndexException {
+ public static List<Pair<String, Long>> filterKeysFromFile(Path filePath,
List<String> candidateRecordKeys,
+ Configuration
configuration) throws HoodieIndexException {
ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
- List<String> foundRecordKeys = new ArrayList<>();
+ List<Pair<String, Long>> foundRecordKeys = new ArrayList<>();
try (HoodieFileReader fileReader =
HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
.getFileReader(configuration, filePath)) {
// Load all rowKeys from the file, to double-confirm
if (!candidateRecordKeys.isEmpty()) {
HoodieTimer timer = HoodieTimer.start();
- Set<String> fileRowKeys = fileReader.filterRowKeys(new
TreeSet<>(candidateRecordKeys));
+ Set<Pair<String, Long>> fileRowKeys =
fileReader.filterRowKeys(candidateRecordKeys.stream().collect(Collectors.toSet()));
foundRecordKeys.addAll(fileRowKeys);
LOG.info(String.format("Checked keys against file %s, in %d ms.
#candidates (%d) #found (%d)", filePath,
timer.endTimer(), candidateRecordKeys.size(),
foundRecordKeys.size()));
@@ -282,9 +282,14 @@ public class HoodieIndexUtils {
HoodieData<HoodieRecord<R>> untaggedUpdatingRecords =
incomingRecordsAndLocations.filter(p ->
p.getRight().isPresent()).map(Pair::getLeft)
.distinctWithKey(HoodieRecord::getRecordKey,
config.getGlobalIndexReconcileParallelism());
// the tagging partitions and locations
+ // NOTE: The incoming records may only differ in record position, however,
for the purpose of
+ // merging in case of partition updates, it is safe to ignore the
record positions.
HoodieData<HoodieRecordGlobalLocation> globalLocations =
incomingRecordsAndLocations
.filter(p -> p.getRight().isPresent())
- .map(p -> p.getRight().get())
+ .map(p -> new HoodieRecordGlobalLocation(
+ p.getRight().get().getPartitionPath(),
+ p.getRight().get().getInstantTime(),
+ p.getRight().get().getFileId()))
.distinct(config.getGlobalIndexReconcileParallelism());
// merged existing records with current locations being set
HoodieData<HoodieRecord<R>> existingRecords =
getExistingRecords(globalLocations, config, hoodieTable);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
index b47f5cf066c..03e4441393d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
@@ -64,22 +64,23 @@ public class ListBasedHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper
List<HoodieKeyLookupResult> keyLookupResults =
CollectionUtils.toStream(
- new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId,
String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
- .apply(fileComparisonPairList.iterator())
- )
+ new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId,
String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
+ .apply(fileComparisonPairList.iterator())
+ )
.flatMap(Collection::stream)
- .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
+ .filter(lr -> lr.getMatchingRecordKeysAndPositions().size() > 0)
.collect(toList());
return context.parallelize(keyLookupResults).flatMap(lookupResult ->
- lookupResult.getMatchingRecordKeys().stream()
+ lookupResult.getMatchingRecordKeysAndPositions().stream()
.map(recordKey -> new ImmutablePair<>(lookupResult,
recordKey)).iterator()
).mapToPair(pair -> {
HoodieKeyLookupResult lookupResult = pair.getLeft();
- String recordKey = pair.getRight();
+ String recordKey = pair.getRight().getLeft();
+ long recordPosition = pair.getRight().getRight();
return new ImmutablePair<>(
new HoodieKey(recordKey, lookupResult.getPartitionPath()),
- new HoodieRecordLocation(lookupResult.getBaseInstantTime(),
lookupResult.getFileId()));
+ new HoodieRecordLocation(lookupResult.getBaseInstantTime(),
lookupResult.getFileId(), recordPosition));
});
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index f4cd010da57..1e14ba724b7 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -49,6 +49,7 @@ import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SizeEstimator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieAppendException;
import org.apache.hudi.exception.HoodieException;
@@ -87,10 +88,11 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_RECORD_SIZE = 100;
protected final String fileId;
+ private final boolean shouldWriteRecordPositions;
// Buffer for holding records in memory before they are flushed to disk
private final List<HoodieRecord> recordList = new ArrayList<>();
- // Buffer for holding records (to be deleted) in memory before they are
flushed to disk
- private final List<DeleteRecord> recordsToDelete = new ArrayList<>();
+ // Buffer for holding records (to be deleted), along with their position in
log block, in memory before they are flushed to disk
+ private final List<Pair<DeleteRecord, Long>> recordsToDeleteWithPositions =
new ArrayList<>();
// Incoming records to be written to logs.
protected Iterator<HoodieRecord<T>> recordItr;
// Writer to log into the file group's latest slice.
@@ -155,6 +157,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
this.attemptNumber = taskContextSupplier.getAttemptNumberSupplier().get();
+ this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
}
public HoodieAppendHandle(HoodieWriteConfig config, String instantTime,
HoodieTable<T, I, K, O> hoodieTable,
@@ -459,12 +462,14 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
? HoodieRecord.RECORD_KEY_METADATA_FIELD
:
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
- blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList,
getUpdatedHeader(header, blockSequenceNumber++, attemptNumber, config,
+ blocks.add(getBlock(config, pickLogDataBlockFormat(), recordList,
shouldWriteRecordPositions,
+ getUpdatedHeader(header, blockSequenceNumber++, attemptNumber,
config,
addBlockIdentifier()), keyField));
}
- if (appendDeleteBlocks && recordsToDelete.size() > 0) {
- blocks.add(new HoodieDeleteBlock(recordsToDelete.toArray(new
DeleteRecord[0]), getUpdatedHeader(header, blockSequenceNumber++,
attemptNumber, config,
+ if (appendDeleteBlocks && recordsToDeleteWithPositions.size() > 0) {
+ blocks.add(new HoodieDeleteBlock(recordsToDeleteWithPositions,
shouldWriteRecordPositions,
+ getUpdatedHeader(header, blockSequenceNumber++, attemptNumber,
config,
addBlockIdentifier())));
}
@@ -473,7 +478,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
processAppendResult(appendResult, recordList);
recordList.clear();
if (appendDeleteBlocks) {
- recordsToDelete.clear();
+ recordsToDeleteWithPositions.clear();
}
}
} catch (Exception e) {
@@ -594,7 +599,8 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
LOG.error("Error writing record " + indexedRecord.get(), e);
}
} else {
- recordsToDelete.add(DeleteRecord.create(record.getKey(), orderingVal));
+ long position = shouldWriteRecordPositions ? record.getCurrentPosition()
: -1L;
+
recordsToDeleteWithPositions.add(Pair.of(DeleteRecord.create(record.getKey(),
orderingVal), position));
}
numberOfRecords++;
}
@@ -652,17 +658,19 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
private static HoodieLogBlock getBlock(HoodieWriteConfig writeConfig,
HoodieLogBlock.HoodieLogBlockType
logDataBlockFormat,
List<HoodieRecord> records,
+ boolean shouldWriteRecordPositions,
Map<HeaderMetadataType, String>
header,
String keyField) {
switch (logDataBlockFormat) {
case AVRO_DATA_BLOCK:
- return new HoodieAvroDataBlock(records, header, keyField);
+ return new HoodieAvroDataBlock(records, shouldWriteRecordPositions,
header, keyField);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(
records, header, writeConfig.getHFileCompressionAlgorithm(), new
Path(writeConfig.getBasePath()));
case PARQUET_DATA_BLOCK:
return new HoodieParquetDataBlock(
records,
+ shouldWriteRecordPositions,
header,
keyField,
writeConfig.getParquetCompressionCodec(),
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index ae643b80cbc..135e4866cc5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -51,26 +51,28 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O>
extends HoodieReadHandle<T
this.keyGeneratorOpt = keyGeneratorOpt;
}
- private List<HoodieKey> fetchHoodieKeys(HoodieBaseFile baseFile) {
+ private List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
BaseFileUtils baseFileUtils =
BaseFileUtils.getInstance(baseFile.getPath());
if (keyGeneratorOpt.isPresent()) {
- return baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new
Path(baseFile.getPath()), keyGeneratorOpt);
+ return
baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getHadoopConf(), new
Path(baseFile.getPath()), keyGeneratorOpt);
} else {
- return baseFileUtils.fetchHoodieKeys(hoodieTable.getHadoopConf(), new
Path(baseFile.getPath()));
+ return
baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getHadoopConf(), new
Path(baseFile.getPath()));
}
}
public Stream<Pair<HoodieKey, HoodieRecordLocation>> locations() {
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
- return fetchHoodieKeys(baseFile).stream()
- .map(entry -> Pair.of(entry,
- new HoodieRecordLocation(baseFile.getCommitTime(),
baseFile.getFileId())));
+ return fetchRecordKeysWithPositions(baseFile).stream()
+ .map(entry -> Pair.of(entry.getLeft(),
+ new HoodieRecordLocation(baseFile.getCommitTime(),
baseFile.getFileId(), entry.getRight())));
}
public Stream<Pair<String, HoodieRecordGlobalLocation>> globalLocations() {
HoodieBaseFile baseFile = partitionPathBaseFilePair.getRight();
- return fetchHoodieKeys(baseFile).stream()
- .map(entry -> Pair.of(entry.getRecordKey(),
- new HoodieRecordGlobalLocation(entry.getPartitionPath(),
baseFile.getCommitTime(), baseFile.getFileId())));
+ return fetchRecordKeysWithPositions(baseFile).stream()
+ .map(entry -> Pair.of(entry.getLeft().getRecordKey(),
+ new HoodieRecordGlobalLocation(
+ entry.getLeft().getPartitionPath(), baseFile.getCommitTime(),
+ baseFile.getFileId(), entry.getRight())));
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
index 9590e8fcc2e..db3be1c1842 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
@@ -101,12 +101,12 @@ public class HoodieKeyLookupHandle<T, I, K, O> extends
HoodieReadHandle<T, I, K,
}
HoodieBaseFile baseFile = getLatestBaseFile();
- List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new
Path(baseFile.getPath()), candidateRecordKeys,
- hoodieTable.getHadoopConf());
+ List<Pair<String, Long>> matchingKeysAndPositions =
HoodieIndexUtils.filterKeysFromFile(
+ new Path(baseFile.getPath()), candidateRecordKeys,
hoodieTable.getHadoopConf());
LOG.info(
String.format("Total records (%d), bloom filter candidates
(%d)/fp(%d), actual matches (%d)", totalKeysChecked,
- candidateRecordKeys.size(), candidateRecordKeys.size() -
matchingKeys.size(), matchingKeys.size()));
+ candidateRecordKeys.size(), candidateRecordKeys.size() -
matchingKeysAndPositions.size(), matchingKeysAndPositions.size()));
return new HoodieKeyLookupResult(partitionPathFileIDPair.getRight(),
partitionPathFileIDPair.getLeft(),
- baseFile.getCommitTime(), matchingKeys);
+ baseFile.getCommitTime(), matchingKeysAndPositions);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
index 19096a21d87..b64e57e0f03 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupResult.java
@@ -18,6 +18,8 @@
package org.apache.hudi.io;
+import org.apache.hudi.common.util.collection.Pair;
+
import java.util.List;
/**
@@ -27,15 +29,15 @@ public class HoodieKeyLookupResult {
private final String fileId;
private final String baseInstantTime;
- private final List<String> matchingRecordKeys;
+ private final List<Pair<String, Long>> matchingRecordKeysAndPositions;
private final String partitionPath;
public HoodieKeyLookupResult(String fileId, String partitionPath, String
baseInstantTime,
- List<String> matchingRecordKeys) {
+ List<Pair<String, Long>>
matchingRecordKeysAndPositions) {
this.fileId = fileId;
this.partitionPath = partitionPath;
this.baseInstantTime = baseInstantTime;
- this.matchingRecordKeys = matchingRecordKeys;
+ this.matchingRecordKeysAndPositions = matchingRecordKeysAndPositions;
}
public String getFileId() {
@@ -50,8 +52,8 @@ public class HoodieKeyLookupResult {
return partitionPath;
}
- public List<String> getMatchingRecordKeys() {
- return matchingRecordKeys;
+ public List<Pair<String, Long>> getMatchingRecordKeysAndPositions() {
+ return matchingRecordKeysAndPositions;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 6a49daf817d..0c1bb81f01e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -33,7 +33,6 @@ import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.function.SerializableFunction;
-import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -709,7 +708,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
engineContext.foreach(fileGroupFileIds, fileGroupFileId -> {
try {
final Map<HeaderMetadataType, String> blockHeader =
Collections.singletonMap(HeaderMetadataType.INSTANT_TIME, instantTime);
- final HoodieDeleteBlock block = new HoodieDeleteBlock(new
DeleteRecord[0], blockHeader);
+
+ final HoodieDeleteBlock block = new
HoodieDeleteBlock(Collections.emptyList(), false, blockHeader);
HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
.onParentPath(FSUtils.getPartitionPath(metadataWriteConfig.getBasePath(),
metadataPartition.getPartitionPath()))
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
index e104bef23fb..0423d29a5f4 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
@@ -134,7 +134,7 @@ public class TestLegacyArchivedMetaEntryReader {
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
wrapperSchema.toString());
final String keyField =
metaClient.getTableConfig().getRecordKeyFieldProp();
List<HoodieRecord> indexRecords =
records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
- HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords,
header, keyField);
+ HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, false,
header, keyField);
writer.appendBlock(block);
records.clear();
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index af5d3e9a68d..bb87bc24e78 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -183,7 +183,8 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
LOG.warn("Failed to convert record " + r.toString(), e);
return null;
}
- }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
+ false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
return Pair.of(partitionPath, logWriter.getLogFile());
}
}
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index d4b4007bedb..f9a6c0d5ea1 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -246,14 +246,13 @@ public class TestFlinkHoodieBloomIndex extends
HoodieFlinkClientTestHarness {
List<String> uuids = asList(record1.getRecordKey(),
record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).build();
- HoodieFlinkTable table = HoodieFlinkTable.create(config, context,
metaClient);
- List<String> results = HoodieIndexUtils.filterKeysFromFile(
+ List<Pair<String, Long>> results = HoodieIndexUtils.filterKeysFromFile(
new Path(java.nio.file.Paths.get(basePath, partition,
filename).toString()), uuids, hadoopConf);
assertEquals(results.size(), 2);
- assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
- || results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
- assertTrue(results.get(0).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
- || results.get(1).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
+
assertTrue(results.get(0).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
+ ||
results.get(1).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
+
assertTrue(results.get(0).getLeft().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
+ ||
results.get(1).getLeft().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
// TODO(vc): Need more coverage on actual filenames
// assertTrue(results.get(0)._2().equals(filename));
// assertTrue(results.get(1)._2().equals(filename));
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index e9c5b6f6f5b..9ab96369d8e 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -145,7 +145,7 @@ public class HoodieFlinkWriteableTestTable extends
HoodieWriteableTestTable {
LOG.warn("Failed to convert record " + r.toString(), e);
return null;
}
- }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
+ }).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
false, header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
return Pair.of(partitionPath, logWriter.getLogFile());
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
index 2b6a96b3d05..403eb35926c 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -127,14 +127,15 @@ public class HoodieFileProbingFunction implements
// TODO add assertion that file is checked only once
final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
- List<String> matchingKeys =
HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()),
- candidateRecordKeys, hadoopConf.get());
+ List<Pair<String, Long>> matchingKeysAndPositions =
HoodieIndexUtils.filterKeysFromFile(
+ new Path(dataFile.getPath()), candidateRecordKeys,
hadoopConf.get());
LOG.debug(
String.format("Bloom filter candidates (%d) / false positives
(%d), actual matches (%d)",
- candidateRecordKeys.size(), candidateRecordKeys.size() -
matchingKeys.size(), matchingKeys.size()));
+ candidateRecordKeys.size(), candidateRecordKeys.size() -
matchingKeysAndPositions.size(),
+ matchingKeysAndPositions.size()));
- return new HoodieKeyLookupResult(fileId, partitionPath,
dataFile.getCommitTime(), matchingKeys);
+ return new HoodieKeyLookupResult(fileId, partitionPath,
dataFile.getCommitTime(), matchingKeysAndPositions);
})
.collect(Collectors.toList());
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 37ce8740af5..cd5f14d8fdf 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -173,10 +173,12 @@ public class SparkHoodieBloomIndexHelper extends
BaseHoodieBloomIndexHelper {
}
return HoodieJavaPairRDD.of(keyLookupResultRDD.flatMap(List::iterator)
- .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
- .flatMapToPair(lookupResult ->
lookupResult.getMatchingRecordKeys().stream()
- .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey,
lookupResult.getPartitionPath()),
- new HoodieRecordLocation(lookupResult.getBaseInstantTime(),
lookupResult.getFileId())))
+ .filter(lr -> lr.getMatchingRecordKeysAndPositions().size() > 0)
+ .flatMapToPair(lookupResult ->
lookupResult.getMatchingRecordKeysAndPositions().stream()
+ .map(recordKeyAndPosition -> new Tuple2<>(
+ new HoodieKey(recordKeyAndPosition.getLeft(),
lookupResult.getPartitionPath()),
+ new HoodieRecordLocation(lookupResult.getBaseInstantTime(),
lookupResult.getFileId(),
+ recordKeyAndPosition.getRight())))
.collect(Collectors.toList()).iterator()));
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 2a22eacea8c..046fa98fbea 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -18,22 +18,23 @@
package org.apache.hudi.io.storage;
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.BaseFileUtils;
-import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.schema.MessageType;
@@ -80,7 +81,7 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
}
@Override
- public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
+ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
return parquetUtils.filterRowKeys(conf, path, candidateRowKeys);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 34e144dcb82..9edf1018582 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -330,15 +330,14 @@ public class TestHoodieBloomIndex extends
TestHoodieMetadataBase {
Arrays.asList(record1.getRecordKey(), record2.getRecordKey(),
record3.getRecordKey(), record4.getRecordKey());
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath).build();
- HoodieSparkTable table = HoodieSparkTable.create(config, context,
metaClient);
- List<String> results = HoodieIndexUtils.filterKeysFromFile(
+ List<Pair<String, Long>> results = HoodieIndexUtils.filterKeysFromFile(
new Path(Paths.get(basePath, partition, filename).toString()), uuids,
hadoopConf);
assertEquals(results.size(), 2);
- assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
- || results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
- assertTrue(results.get(0).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
- || results.get(1).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
+
assertTrue(results.get(0).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")
+ ||
results.get(1).getLeft().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0"));
+
assertTrue(results.get(0).getLeft().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")
+ ||
results.get(1).getLeft().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0"));
// TODO(vc): Need more coverage on actual filenames
// assertTrue(results.get(0)._2().equals(filename));
// assertTrue(results.get(1)._2().equals(filename));
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
index 3e2620c1e4b..1acfe309a2f 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieKeyLocationFetchHandle.java
@@ -147,8 +147,10 @@ public class TestHoodieKeyLocationFetchHandle extends
HoodieSparkClientTestHarne
String fileId =
testTable.addCommit(instantTime).getFileIdWithInserts(entry.getKey(),
recordsPerSlice.toArray(new HoodieRecord[0]));
Tuple2<String, String> fileIdInstantTimePair = new Tuple2<>(fileId,
instantTime);
List<Tuple2<HoodieKey, HoodieRecordLocation>> expectedEntries = new
ArrayList<>();
+ // record position
+ long position = 0;
for (HoodieRecord record : recordsPerSlice) {
- expectedEntries.add(new Tuple2<>(record.getKey(), new
HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1)));
+ expectedEntries.add(new Tuple2<>(record.getKey(), new
HoodieRecordLocation(fileIdInstantTimePair._2, fileIdInstantTimePair._1,
position++)));
}
expectedList.put(new Tuple2<>(entry.getKey(),
fileIdInstantTimePair._1), expectedEntries);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 2a519d1334b..827c143d877 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -18,16 +18,17 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.keygen.BaseKeyGenerator;
import javax.annotation.Nullable;
@@ -248,6 +249,13 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
return this.currentLocation != null;
}
+ public long getCurrentPosition() {
+ if (isCurrentLocationKnown()) {
+ return this.currentLocation.getPosition();
+ }
+ return HoodieRecordLocation.INVALID_POSITION;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
index 4121a334548..ed0e4cbe905 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
@@ -40,12 +40,18 @@ public final class HoodieRecordGlobalLocation extends
HoodieRecordLocation {
this.partitionPath = partitionPath;
}
+ public HoodieRecordGlobalLocation(String partitionPath, String instantTime,
String fileId, long position) {
+ super(instantTime, fileId, position);
+ this.partitionPath = partitionPath;
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("HoodieGlobalRecordLocation {");
sb.append("partitionPath=").append(partitionPath).append(", ");
sb.append("instantTime=").append(instantTime).append(", ");
- sb.append("fileId=").append(fileId);
+ sb.append("fileId=").append(fileId).append(", ");
+ sb.append("position=").append(position);
sb.append('}');
return sb.toString();
}
@@ -61,7 +67,8 @@ public final class HoodieRecordGlobalLocation extends
HoodieRecordLocation {
HoodieRecordGlobalLocation otherLoc = (HoodieRecordGlobalLocation) o;
return Objects.equals(partitionPath, otherLoc.partitionPath)
&& Objects.equals(instantTime, otherLoc.instantTime)
- && Objects.equals(fileId, otherLoc.fileId);
+ && Objects.equals(fileId, otherLoc.fileId)
+ && Objects.equals(position, otherLoc.position);
}
@Override
@@ -88,14 +95,14 @@ public final class HoodieRecordGlobalLocation extends
HoodieRecordLocation {
* Returns the record location as local.
*/
public HoodieRecordLocation toLocal(String instantTime) {
- return new HoodieRecordLocation(instantTime, fileId);
+ return new HoodieRecordLocation(instantTime, fileId, position);
}
/**
* Copy the location with given partition path.
*/
public HoodieRecordGlobalLocation copy(String partitionPath) {
- return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId);
+ return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId,
position);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
index 8b1dd2b378a..15f72918096 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
@@ -30,16 +30,26 @@ import java.util.Objects;
* Location of a HoodieRecord within the partition it belongs to. Ultimately,
this points to an actual file on disk
*/
public class HoodieRecordLocation implements Serializable, KryoSerializable {
+ public static final long INVALID_POSITION = -1L;
protected String instantTime;
protected String fileId;
+ // Position of the record in the file, e.g., row position starting from 0 in
the Parquet file
+ // Valid position should be non-negative. Negative position, i.e., -1, means
it's invalid
+ // and should not be used
+ protected long position;
public HoodieRecordLocation() {
}
public HoodieRecordLocation(String instantTime, String fileId) {
+ this(instantTime, fileId, INVALID_POSITION);
+ }
+
+ public HoodieRecordLocation(String instantTime, String fileId, long
position) {
this.instantTime = instantTime;
this.fileId = fileId;
+ this.position = position;
}
@Override
@@ -51,19 +61,21 @@ public class HoodieRecordLocation implements Serializable,
KryoSerializable {
return false;
}
HoodieRecordLocation otherLoc = (HoodieRecordLocation) o;
- return Objects.equals(instantTime, otherLoc.instantTime) &&
Objects.equals(fileId, otherLoc.fileId);
+ return Objects.equals(instantTime, otherLoc.instantTime) &&
Objects.equals(fileId, otherLoc.fileId)
+ && Objects.equals(position, otherLoc.position);
}
@Override
public int hashCode() {
- return Objects.hash(instantTime, fileId);
+ return Objects.hash(instantTime, fileId, position);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("HoodieRecordLocation {");
sb.append("instantTime=").append(instantTime).append(", ");
- sb.append("fileId=").append(fileId);
+ sb.append("fileId=").append(fileId).append(", ");
+ sb.append("position=").append(position);
sb.append('}');
return sb.toString();
}
@@ -84,15 +96,29 @@ public class HoodieRecordLocation implements Serializable,
KryoSerializable {
this.fileId = fileId;
}
+ public static boolean isPositionValid(long position) {
+ return position > INVALID_POSITION;
+ }
+
+ public long getPosition() {
+ return position;
+ }
+
+ public void setPosition(long position) {
+ this.position = position;
+ }
+
@Override
public void write(Kryo kryo, Output output) {
output.writeString(instantTime);
output.writeString(fileId);
+ output.writeLong(position);
}
@Override
public void read(Kryo kryo, Input input) {
this.instantTime = input.readString();
this.fileId = input.readString();
+ this.position = input.readLong();
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index 0b1a1d5c84d..778a2d45cfd 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -42,6 +42,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -102,7 +103,7 @@ public class LogReaderUtils {
* generated from serializing {@link Roaring64NavigableMap} bitmap using the
portable format.
* @throws IOException upon I/O error.
*/
- public static String encodePositions(List<Long> positions) throws
IOException {
+ public static String encodePositions(Set<Long> positions) throws IOException
{
Roaring64NavigableMap positionBitmap = new Roaring64NavigableMap();
positions.forEach(positionBitmap::add);
return encodePositions(positionBitmap);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index c2e0aa50364..ca568c0f38c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -84,10 +84,11 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
}
public HoodieAvroDataBlock(@Nonnull List<HoodieRecord> records,
- @Nonnull Map<HeaderMetadataType, String> header,
- @Nonnull String keyField
+ boolean shouldWriteRecordPositions,
+ @Nonnull Map<HeaderMetadataType, String> header,
+ @Nonnull String keyField
) {
- super(records, header, new HashMap<>(), keyField);
+ super(records, shouldWriteRecordPositions, header, new HashMap<>(),
keyField);
}
@Override
@@ -225,7 +226,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
*/
@Deprecated
public HoodieAvroDataBlock(List<HoodieRecord> records, Schema schema) {
- super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA,
schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ super(records, false, Collections.singletonMap(HeaderMetadataType.SCHEMA,
schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
public static HoodieAvroDataBlock getBlock(byte[] content, Schema
readerSchema) throws IOException {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index 93bd41b88d0..f74a02e2d0b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -48,7 +48,7 @@ public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
public HoodieCDCDataBlock(List<HoodieRecord> records,
Map<HeaderMetadataType, String> header,
String keyField) {
- super(records, header, keyField);
+ super(records, false, header, keyField);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 7415c0931ee..2b3dfaf6a61 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -27,6 +27,8 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
@@ -38,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -51,6 +54,7 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
* 3. Actual serialized content of the records
*/
public abstract class HoodieDataBlock extends HoodieLogBlock {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieDataBlock.class);
// TODO rebase records/content to leverage Either to warrant
// that they are mutex (used by read/write flows respectively)
@@ -64,6 +68,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
private final boolean enablePointLookups;
protected Schema readerSchema;
+ protected final boolean shouldWriteRecordPositions;
// Map of string schema to parsed schema.
private static ConcurrentHashMap<String, Schema> schemaMap = new
ConcurrentHashMap<>();
@@ -72,14 +77,31 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
* NOTE: This ctor is used on the write-path (ie when records ought to be
written into the log)
*/
public HoodieDataBlock(List<HoodieRecord> records,
+ boolean shouldWriteRecordPositions,
Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer,
String keyFieldName) {
super(header, footer, Option.empty(), Option.empty(), null, false);
+ if (shouldWriteRecordPositions) {
+ records.sort((o1, o2) -> {
+ long v1 = o1.getCurrentPosition();
+ long v2 = o2.getCurrentPosition();
+ return Long.compare(v1, v2);
+ });
+ if (isPositionValid(records.get(0).getCurrentPosition())) {
+ addRecordPositionsToHeader(
+
records.stream().map(HoodieRecord::getCurrentPosition).collect(Collectors.toSet()),
+ records.size());
+ } else {
+ LOG.warn("There are records without valid positions. "
+ + "Skip writing record positions to the data block header.");
+ }
+ }
this.records = Option.of(records);
this.keyFieldName = keyFieldName;
// If no reader-schema has been provided assume writer-schema as one
this.readerSchema = getWriterSchema(super.getLogBlockHeader());
+ this.shouldWriteRecordPositions = shouldWriteRecordPositions;
this.enablePointLookups = false;
}
@@ -96,6 +118,8 @@ public abstract class HoodieDataBlock extends HoodieLogBlock
{
String keyFieldName,
boolean enablePointLookups) {
super(headers, footer, blockContentLocation, content, inputStream,
readBlockLazily);
+ // Setting `shouldWriteRecordPositions` to false as this constructor is
only used by the reader
+ this.shouldWriteRecordPositions = false;
this.records = Option.empty();
this.keyFieldName = keyFieldName;
// If no reader-schema has been provided assume writer-schema as one
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index 23ce76c5ef4..23c42b964d8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SerializationUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.util.Lazy;
@@ -37,6 +38,8 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -51,11 +54,13 @@ import java.util.stream.Collectors;
import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
+import static
org.apache.hudi.common.model.HoodieRecordLocation.isPositionValid;
/**
* Delete block contains a list of keys to be deleted from scanning the blocks
so far.
*/
public class HoodieDeleteBlock extends HoodieLogBlock {
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieDeleteBlock.class);
/**
* These static builders are added to avoid performance issue in Avro 1.10.
* You can find more details in HoodieAvroUtils, HUDI-3834, and AVRO-3048.
@@ -65,17 +70,44 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
private static final Lazy<HoodieDeleteRecord.Builder>
HOODIE_DELETE_RECORD_BUILDER_STUB =
Lazy.lazily(HoodieDeleteRecord::newBuilder);
+ private final boolean shouldWriteRecordPositions;
+ // Records to delete, sorted based on the record position if writing record
position to the log block header
private DeleteRecord[] recordsToDelete;
- public HoodieDeleteBlock(DeleteRecord[] recordsToDelete,
Map<HeaderMetadataType, String> header) {
- this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
- this.recordsToDelete = recordsToDelete;
+ public HoodieDeleteBlock(List<Pair<DeleteRecord, Long>> recordsToDelete,
+ boolean shouldWriteRecordPositions,
+ Map<HeaderMetadataType, String> header) {
+ this(Option.empty(), null, false, Option.empty(), header, new HashMap<>(),
shouldWriteRecordPositions);
+ if (shouldWriteRecordPositions && !recordsToDelete.isEmpty()) {
+ recordsToDelete.sort((o1, o2) -> {
+ long v1 = o1.getRight();
+ long v2 = o2.getRight();
+ return Long.compare(v1, v2);
+ });
+ if (isPositionValid(recordsToDelete.get(0).getRight())) {
+ addRecordPositionsToHeader(
+
recordsToDelete.stream().map(Pair::getRight).collect(Collectors.toSet()),
+ recordsToDelete.size());
+ } else {
+ LOG.warn("There are delete records without valid positions. "
+ + "Skip writing record positions to the delete block header.");
+ }
+ }
+ this.recordsToDelete =
recordsToDelete.stream().map(Pair::getLeft).toArray(DeleteRecord[]::new);
}
public HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream
inputStream, boolean readBlockLazily,
Option<HoodieLogBlockContentLocation>
blockContentLocation, Map<HeaderMetadataType, String> header,
Map<HeaderMetadataType, String> footer) {
+ // Setting `shouldWriteRecordPositions` to false as this constructor is
only used by the reader
+ this(content, inputStream, readBlockLazily, blockContentLocation, header,
footer, false);
+ }
+
+ HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream inputStream,
boolean readBlockLazily,
+ Option<HoodieLogBlockContentLocation>
blockContentLocation, Map<HeaderMetadataType, String> header,
+ Map<HeaderMetadataType, String> footer, boolean
shouldWriteRecordPositions) {
super(header, footer, blockContentLocation, content, inputStream,
readBlockLazily);
+ this.shouldWriteRecordPositions = shouldWriteRecordPositions;
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 3f213881b8d..d00d864c112 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -93,7 +93,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
Map<HeaderMetadataType, String> header,
Compression.Algorithm compressionAlgorithm,
Path pathForReader) {
- super(records, header, new HashMap<>(),
HoodieAvroHFileReader.KEY_FIELD_NAME);
+ super(records, false, header, new HashMap<>(),
HoodieAvroHFileReader.KEY_FIELD_NAME);
this.compressionAlgorithm = Option.of(compressionAlgorithm);
this.pathForReader = pathForReader;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 106c188f0f1..443bcab9b8e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -29,6 +29,8 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -40,6 +42,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -48,7 +51,7 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
* Abstract class defining a block in HoodieLogFile.
*/
public abstract class HoodieLogBlock {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieLogBlock.class);
/**
* The current version of the log block. Anytime the logBlock format changes
this version needs to be bumped and
* corresponding changes need to be made to {@link HoodieLogBlockVersion}
TODO : Change this to a class, something
@@ -138,6 +141,20 @@ public abstract class HoodieLogBlock {
return
LogReaderUtils.decodeRecordPositionsHeader(logBlockHeader.get(HeaderMetadataType.RECORD_POSITIONS));
}
+ protected void addRecordPositionsToHeader(Set<Long> positionSet, int
numRecords) {
+ if (positionSet.size() == numRecords) {
+ try {
+ logBlockHeader.put(HeaderMetadataType.RECORD_POSITIONS,
LogReaderUtils.encodePositions(positionSet));
+ } catch (IOException e) {
+ LOG.error("Cannot write record positions to the log block header.", e);
+ }
+ } else {
+ LOG.warn("There are duplicate keys in the records (number of unique
positions: %s, "
+ + "number of records: %s). Skip writing record positions to the
log block header.",
+ positionSet.size(), numRecords);
+ }
+ }
+
/**
* Type of the log block WARNING: This enum is serialized as the ordinal.
Only add new enums at the end.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 796d7454cc5..c1ac0aba1d3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -78,13 +78,14 @@ public class HoodieParquetDataBlock extends HoodieDataBlock
{
}
public HoodieParquetDataBlock(List<HoodieRecord> records,
+ boolean shouldWriteRecordPositions,
Map<HeaderMetadataType, String> header,
String keyField,
CompressionCodecName compressionCodecName,
double expectedCompressionRatio,
boolean useDictionaryEncoding
) {
- super(records, header, new HashMap<>(), keyField);
+ super(records, shouldWriteRecordPositions, header, new HashMap<>(),
keyField);
this.compressionCodecName = Option.of(compressionCodecName);
this.expectedCompressionRatio = Option.of(expectedCompressionRatio);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index d402f58a40a..be41857a38e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -39,6 +40,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Utils for Hudi base file.
@@ -69,12 +71,14 @@ public abstract class BaseFileUtils {
/**
* Read the rowKey list from the given data file.
+ *
* @param filePath The data file path
* @param configuration configuration to build fs object
* @return Set Set of row keys
*/
public Set<String> readRowKeys(Configuration configuration, Path filePath) {
- return filterRowKeys(configuration, filePath, new HashSet<>());
+ return filterRowKeys(configuration, filePath, new HashSet<>())
+ .stream().map(Pair::getKey).collect(Collectors.toSet());
}
/**
@@ -162,21 +166,23 @@ public abstract class BaseFileUtils {
/**
* Read the rowKey list matching the given filter, from the given data file.
- * If the filter is empty, then this will return all the row keys.
+ * If the filter is empty, then this will return all the row keys and
corresponding positions.
+ *
* @param filePath The data file path
* @param configuration configuration to build fs object
* @param filter record keys filter
- * @return Set Set of row keys matching candidateRecordKeys
+ * @return Set Set of pairs of row key and position matching
candidateRecordKeys
*/
- public abstract Set<String> filterRowKeys(Configuration configuration, Path
filePath, Set<String> filter);
+ public abstract Set<Pair<String, Long>> filterRowKeys(Configuration
configuration, Path filePath, Set<String> filter);
/**
- * Fetch {@link HoodieKey}s from the given data file.
+ * Fetch {@link HoodieKey}s with positions from the given data file.
+ *
* @param configuration configuration to build fs object
* @param filePath The data file path
- * @return {@link List} of {@link HoodieKey}s fetched from the data file
+ * @return {@link List} of pairs of {@link HoodieKey} and position fetched
from the data file
*/
- public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration,
Path filePath);
+ public abstract List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(Configuration configuration, Path filePath);
/**
* Provides a closable iterator for reading the given data file.
@@ -196,13 +202,14 @@ public abstract class BaseFileUtils {
public abstract ClosableIterator<HoodieKey>
getHoodieKeyIterator(Configuration configuration, Path filePath);
/**
- * Fetch {@link HoodieKey}s from the given data file.
- * @param configuration configuration to build fs object
- * @param filePath The data file path
+ * Fetch {@link HoodieKey}s with positions from the given data file.
+ *
+ * @param configuration configuration to build fs object
+ * @param filePath The data file path
* @param keyGeneratorOpt instance of KeyGenerator.
- * @return {@link List} of {@link HoodieKey}s fetched from the data file
+ * @return {@link List} of pairs of {@link HoodieKey} and position fetched
from the data file
*/
- public abstract List<HoodieKey> fetchHoodieKeys(Configuration configuration,
Path filePath, Option<BaseKeyGenerator> keyGeneratorOpt);
+ public abstract List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(Configuration configuration, Path filePath,
Option<BaseKeyGenerator> keyGeneratorOpt);
/**
* Read the Avro schema of the data file.
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
index dfbb80cfb63..50fa80213c3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrcUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.MetadataNotFoundException;
@@ -107,7 +108,7 @@ public class OrcUtils extends BaseFileUtils {
* @return {@link List} of {@link HoodieKey}s fetched from the ORC file
*/
@Override
- public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path
filePath) {
+ public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(Configuration configuration, Path filePath) {
try {
if (!filePath.getFileSystem(configuration).exists(filePath)) {
return Collections.emptyList();
@@ -115,15 +116,19 @@ public class OrcUtils extends BaseFileUtils {
} catch (IOException e) {
throw new HoodieIOException("Failed to read from ORC file:" + filePath,
e);
}
- List<HoodieKey> hoodieKeys = new ArrayList<>();
- try (ClosableIterator<HoodieKey> iterator =
getHoodieKeyIterator(configuration, filePath, Option.empty())) {
- iterator.forEachRemaining(hoodieKeys::add);
+ List<Pair<HoodieKey, Long>> hoodieKeysAndPositions = new ArrayList<>();
+ long position = 0;
+ try (ClosableIterator<HoodieKey> iterator =
getHoodieKeyIterator(configuration, filePath, Option.empty())) {
+ while (iterator.hasNext()) {
+ hoodieKeysAndPositions.add(Pair.of(iterator.next(), position));
+ position++;
+ }
}
- return hoodieKeys;
+ return hoodieKeysAndPositions;
}
@Override
- public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path
filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+ public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(Configuration configuration, Path filePath,
Option<BaseKeyGenerator> keyGeneratorOpt) {
throw new UnsupportedOperationException("Custom key generator is not
supported yet");
}
@@ -171,18 +176,19 @@ public class OrcUtils extends BaseFileUtils {
* Read the rowKey list matching the given filter, from the given ORC file.
If the filter is empty, then this will
* return all the rowkeys.
*
- * @param conf configuration to build fs object.
- * @param filePath The ORC file path.
- * @param filter record keys filter
- * @return Set Set of row keys matching candidateRecordKeys
+ * @param conf configuration to build fs object.
+ * @param filePath The ORC file path.
+ * @param filter record keys filter
+ * @return Set Set of pairs of row key and position matching
candidateRecordKeys
*/
@Override
- public Set<String> filterRowKeys(Configuration conf, Path filePath,
Set<String> filter)
+ public Set<Pair<String, Long>> filterRowKeys(Configuration conf, Path
filePath, Set<String> filter)
throws HoodieIOException {
+ long rowPosition = 0;
try (Reader reader = OrcFile.createReader(filePath,
OrcFile.readerOptions(conf));) {
TypeDescription schema = reader.getSchema();
try (RecordReader recordReader = reader.rows(new
Options(conf).schema(schema))) {
- Set<String> filteredRowKeys = new HashSet<>();
+ Set<Pair<String, Long>> filteredRowKeys = new HashSet<>();
List<String> fieldNames = schema.getFieldNames();
VectorizedRowBatch batch = schema.createRowBatch();
@@ -202,8 +208,9 @@ public class OrcUtils extends BaseFileUtils {
for (int i = 0; i < batch.size; i++) {
String rowKey = rowKeys.toString(i);
if (filter.isEmpty() || filter.contains(rowKey)) {
- filteredRowKeys.add(rowKey);
+ filteredRowKeys.add(Pair.of(rowKey, rowPosition));
}
+ rowPosition++;
}
}
return filteredRowKeys;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index de5572523c1..2decdd3fc5e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.MetadataNotFoundException;
import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -74,15 +75,15 @@ public class ParquetUtils extends BaseFileUtils {
/**
* Read the rowKey list matching the given filter, from the given parquet
file. If the filter is empty, then this will
- * return all the rowkeys.
+ * return all the rowkeys and corresponding positions.
*
* @param filePath The parquet file path.
* @param configuration configuration to build fs object
* @param filter record keys filter
- * @return Set Set of row keys matching candidateRecordKeys
+ * @return Set Set of pairs of row key and position matching
candidateRecordKeys
*/
@Override
- public Set<String> filterRowKeys(Configuration configuration, Path filePath,
Set<String> filter) {
+ public Set<Pair<String, Long>> filterRowKeys(Configuration configuration,
Path filePath, Set<String> filter) {
return filterParquetRowKeys(configuration, filePath, filter,
HoodieAvroUtils.getRecordKeySchema());
}
@@ -105,10 +106,10 @@ public class ParquetUtils extends BaseFileUtils {
* @param configuration configuration to build fs object
* @param filter record keys filter
* @param readSchema schema of columns to be read
- * @return Set Set of row keys matching candidateRecordKeys
+ * @return Set Set of pairs of row key and position matching
candidateRecordKeys
*/
- private static Set<String> filterParquetRowKeys(Configuration configuration,
Path filePath, Set<String> filter,
- Schema readSchema) {
+ private static Set<Pair<String, Long>> filterParquetRowKeys(Configuration
configuration, Path filePath, Set<String> filter,
+ Schema
readSchema) {
Option<RecordKeysFilterFunction> filterFunction = Option.empty();
if (filter != null && !filter.isEmpty()) {
filterFunction = Option.of(new RecordKeysFilterFunction(filter));
@@ -117,17 +118,19 @@ public class ParquetUtils extends BaseFileUtils {
conf.addResource(FSUtils.getFs(filePath.toString(), conf).getConf());
AvroReadSupport.setAvroReadSchema(conf, readSchema);
AvroReadSupport.setRequestedProjection(conf, readSchema);
- Set<String> rowKeys = new HashSet<>();
+ Set<Pair<String, Long>> rowKeys = new HashSet<>();
+ long rowPosition = 0;
try (ParquetReader reader =
AvroParquetReader.builder(filePath).withConf(conf).build()) {
Object obj = reader.read();
while (obj != null) {
if (obj instanceof GenericRecord) {
String recordKey = ((GenericRecord)
obj).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
if (!filterFunction.isPresent() ||
filterFunction.get().apply(recordKey)) {
- rowKeys.add(recordKey);
+ rowKeys.add(Pair.of(recordKey, rowPosition));
}
+ obj = reader.read();
+ rowPosition++;
}
- obj = reader.read();
}
} catch (IOException e) {
throw new HoodieIOException("Failed to read row keys from Parquet " +
filePath, e);
@@ -138,15 +141,15 @@ public class ParquetUtils extends BaseFileUtils {
}
/**
- * Fetch {@link HoodieKey}s from the given parquet file.
+ * Fetch {@link HoodieKey}s with row positions from the given parquet file.
*
* @param filePath The parquet file path.
* @param configuration configuration to build fs object
- * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+ * @return {@link List} of pairs of {@link HoodieKey} and row position
fetched from the parquet file
*/
@Override
- public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path
filePath) {
- return fetchHoodieKeys(configuration, filePath, Option.empty());
+ public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(Configuration configuration, Path filePath) {
+ return fetchRecordKeysWithPositions(configuration, filePath,
Option.empty());
}
@Override
@@ -185,19 +188,23 @@ public class ParquetUtils extends BaseFileUtils {
}
/**
- * Fetch {@link HoodieKey}s from the given parquet file.
+ * Fetch {@link HoodieKey}s with row positions from the given parquet file.
*
* @param configuration configuration to build fs object
* @param filePath The parquet file path.
* @param keyGeneratorOpt instance of KeyGenerator.
- * @return {@link List} of {@link HoodieKey}s fetched from the parquet file
+ * @return {@link List} of pairs of {@link HoodieKey} and row position
fetched from the parquet file
*/
@Override
- public List<HoodieKey> fetchHoodieKeys(Configuration configuration, Path
filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
- List<HoodieKey> hoodieKeys = new ArrayList<>();
+ public List<Pair<HoodieKey, Long>>
fetchRecordKeysWithPositions(Configuration configuration, Path filePath,
Option<BaseKeyGenerator> keyGeneratorOpt) {
+ List<Pair<HoodieKey, Long>> hoodieKeysAndPositions = new ArrayList<>();
+ long position = 0;
try (ClosableIterator<HoodieKey> iterator =
getHoodieKeyIterator(configuration, filePath, keyGeneratorOpt)) {
- iterator.forEachRemaining(hoodieKeys::add);
- return hoodieKeys;
+ while (iterator.hasNext()) {
+ hoodieKeysAndPositions.add(Pair.of(iterator.next(), position));
+ position++;
+ }
+ return hoodieKeysAndPositions;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
index b1ff386f120..af08cc85ef5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroHFileReader.java
@@ -24,10 +24,12 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -58,13 +60,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
+import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
/**
* NOTE: PLEASE READ DOCS & COMMENTS CAREFULLY BEFORE MAKING CHANGES
@@ -190,9 +192,9 @@ public class HoodieAvroHFileReader extends
HoodieAvroFileReaderBase implements H
* @return Subset of candidate keys that are available
*/
@Override
- public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
- checkState(candidateRowKeys instanceof TreeSet,
- String.format("HFile reader expects a TreeSet as iterating over
ordered keys is more performant, got (%s)",
candidateRowKeys.getClass().getSimpleName()));
+ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
+ // candidateRowKeys must be sorted
+ SortedSet<String> sortedCandidateRowKeys = new TreeSet<>(candidateRowKeys);
synchronized (sharedLock) {
if (!sharedScanner.isPresent()) {
@@ -200,14 +202,18 @@ public class HoodieAvroHFileReader extends
HoodieAvroFileReaderBase implements H
// by default, to minimize amount of traffic to the underlying storage
sharedScanner = Option.of(getHFileScanner(getSharedHFileReader(),
true));
}
- return candidateRowKeys.stream().filter(k -> {
- try {
- return isKeyAvailable(k, sharedScanner.get());
- } catch (IOException e) {
- LOG.error("Failed to check key availability: " + k);
- return false;
- }
- }).collect(Collectors.toSet());
+ return sortedCandidateRowKeys.stream()
+ .filter(k -> {
+ try {
+ return isKeyAvailable(k, sharedScanner.get());
+ } catch (IOException e) {
+ LOG.error("Failed to check key availability: " + k);
+ return false;
+ }
+ })
+ // Record position is not supported for HFile
+ .map(key -> Pair.of(key, HoodieRecordLocation.INVALID_POSITION))
+ .collect(Collectors.toSet());
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
index 00ba9fc3bb0..2af8eaa4a46 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroOrcReader.java
@@ -22,8 +22,9 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.hudi.common.util.BaseFileUtils;
-import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.OrcReaderIterator;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
@@ -67,7 +68,7 @@ public class HoodieAvroOrcReader extends
HoodieAvroFileReaderBase {
}
@Override
- public Set<String> filterRowKeys(Set candidateRowKeys) {
+ public Set<Pair<String, Long>> filterRowKeys(Set candidateRowKeys) {
return orcUtils.filterRowKeys(conf, path, candidateRowKeys);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
index 3dd070fa0a7..912ccb28964 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroParquetReader.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetReaderIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -85,7 +86,7 @@ public class HoodieAvroParquetReader extends
HoodieAvroFileReaderBase {
}
@Override
- public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
+ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
return parquetUtils.filterRowKeys(conf, path, candidateRowKeys);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
index b22682be46e..7ac30547a1a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBootstrapFileReader.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
@@ -55,7 +56,7 @@ public abstract class HoodieBootstrapFileReader<T> implements
HoodieFileReader<T
}
@Override
- public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
+ public Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys) {
return skeletonFileReader.filterRowKeys(candidateRowKeys);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
index 00fff9a220c..77eaef98d33 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
@@ -18,10 +18,12 @@
package org.apache.hudi.io.storage;
-import org.apache.avro.Schema;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
import java.io.IOException;
import java.util.Set;
@@ -45,7 +47,7 @@ public interface HoodieFileReader<T> extends AutoCloseable {
BloomFilter readBloomFilter();
- Set<String> filterRowKeys(Set<String> candidateRowKeys);
+ Set<Pair<String, Long>> filterRowKeys(Set<String> candidateRowKeys);
ClosableIterator<HoodieRecord<T>> getRecordIterator(Schema readerSchema,
Schema requestedSchema) throws IOException;
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 79031d643d9..199b71ab95f 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -60,6 +60,7 @@ import
org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;
@@ -1395,7 +1396,11 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
- HoodieDeleteBlock deleteBlock = new
HoodieDeleteBlock(deletedRecords.toArray(new DeleteRecord[50]), header);
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ for (DeleteRecord dr : deletedRecords) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
writer.appendBlock(deleteBlock);
List<String> allLogFiles =
@@ -1533,9 +1538,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.collect(Collectors.toList()).subList(0, 50);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
- HoodieDeleteBlock deleteBlock = new
HoodieDeleteBlock(deletedKeys.stream().map(deletedKey ->
- DeleteRecord.create(deletedKey.getRecordKey(),
deletedKey.getPartitionPath()))
- .collect(Collectors.toList()).toArray(new DeleteRecord[0]), header);
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ for (HoodieKey deletedKey : deletedKeys) {
+ deleteRecordList.add(Pair.of(
+ DeleteRecord.create(deletedKey.getRecordKey(),
deletedKey.getPartitionPath()), -1L));
+ }
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
writer.appendBlock(deleteBlock);
List<String> allLogFiles =
@@ -1553,10 +1561,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Recreate the delete block which should have been removed from
consideration because of rollback block next to it.
Map<HoodieLogBlock.HeaderMetadataType, String> deleteBlockHeader = new
HashMap<>();
deleteBlockHeader.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME,
"102");
- deleteBlock = new HoodieDeleteBlock(
- deletedKeys.stream().map(deletedKey ->
- DeleteRecord.create(deletedKey.getRecordKey(),
deletedKey.getPartitionPath()))
- .collect(Collectors.toList()).toArray(new DeleteRecord[0]),
deleteBlockHeader);
+ deleteRecordList = new ArrayList<>();
+ for (HoodieKey deletedKey : deletedKeys) {
+ deleteRecordList.add(Pair.of(
+ DeleteRecord.create(deletedKey.getRecordKey(),
deletedKey.getPartitionPath()), -1L));
+ }
+ deleteBlock = new HoodieDeleteBlock(deleteRecordList, false,
deleteBlockHeader);
writer.appendBlock(deleteBlock);
FileCreateUtils.createDeltaCommit(basePath, "102", fs);
@@ -1644,16 +1654,24 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
- HoodieDeleteBlock deleteBlock1 = new
HoodieDeleteBlock(deleteRecords1.toArray(new DeleteRecord[0]), header);
+
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ for (DeleteRecord dr : deleteRecords1) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ HoodieDeleteBlock deleteBlock1 = new HoodieDeleteBlock(deleteRecordList,
false, header);
writer.appendBlock(deleteBlock1);
// Delete another 10 keys with -1 as orderingVal.
// The deletion should not work
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
- HoodieDeleteBlock deleteBlock2 = new
HoodieDeleteBlock(copyOfRecords1.subList(10, 20).stream()
- .map(s -> (DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
- ((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(),
-1))).toArray(DeleteRecord[]::new), header);
+ deleteRecordList = copyOfRecords1.subList(10, 20).stream()
+ .map(s -> Pair.of(DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+ ((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString(), -1), -1L))
+ .collect(Collectors.toList());
+
+ HoodieDeleteBlock deleteBlock2 = new HoodieDeleteBlock(deleteRecordList,
false, header);
writer.appendBlock(deleteBlock2);
// Delete another 10 keys with +1 as orderingVal.
@@ -1664,7 +1682,11 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.collect(Collectors.toList());
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "104");
- HoodieDeleteBlock deleteBlock3 = new
HoodieDeleteBlock(deletedRecords3.toArray(new DeleteRecord[0]), header);
+ deleteRecordList.clear();
+ for (DeleteRecord dr : deletedRecords3) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ HoodieDeleteBlock deleteBlock3 = new HoodieDeleteBlock(deleteRecordList,
false, header);
writer.appendBlock(deleteBlock3);
List<String> allLogFiles =
@@ -1760,7 +1782,11 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList()).subList(0, 50);
- HoodieDeleteBlock deleteBlock = new
HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ for (DeleteRecord dr : deleteRecords) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
writer.appendBlock(deleteBlock);
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
@@ -1817,7 +1843,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.map(s -> (DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList()).subList(0, 50);
- HoodieDeleteBlock deleteBlock = new
HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
+
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ for (DeleteRecord dr : deleteRecords) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
writer.appendBlock(deleteBlock);
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
@@ -1906,7 +1937,11 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.map(s -> (DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList()).subList(0, 50);
- HoodieDeleteBlock deleteBlock = new
HoodieDeleteBlock(deleteRecords.toArray(new DeleteRecord[50]), header);
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ for (DeleteRecord dr : deleteRecords) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, header);
writer.appendBlock(deleteBlock);
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
@@ -1986,7 +2021,11 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.map(s -> (DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
.collect(Collectors.toList()).subList(0, 60);
- writer.appendBlock(new HoodieDeleteBlock(deletedRecords.toArray(new
DeleteRecord[0]), header));
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ for (DeleteRecord dr : deletedRecords) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ writer.appendBlock(new HoodieDeleteBlock(deleteRecordList, false, header));
copyOfRecords2.addAll(copyOfRecords1);
List<String> originalKeys =
@@ -2794,13 +2833,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ValueSource(booleans = {false, true})
public void testGetRecordPositions(boolean addRecordPositionsHeader) throws
IOException {
Map<HeaderMetadataType, String> header = new HashMap<>();
- List<Long> positions = new ArrayList<>();
+ Set<Long> positions = new HashSet<>();
if (addRecordPositionsHeader) {
positions = TestLogReaderUtils.generatePositions();
String content = LogReaderUtils.encodePositions(positions);
header.put(HeaderMetadataType.RECORD_POSITIONS, content);
}
- HoodieLogBlock logBlock = new HoodieDeleteBlock(new DeleteRecord[0],
header);
+ HoodieLogBlock logBlock = new HoodieDeleteBlock(Collections.emptyList(),
addRecordPositionsHeader, header);
if (addRecordPositionsHeader) {
TestLogReaderUtils.assertPositionEquals(positions,
logBlock.getRecordPositions());
}
@@ -2817,11 +2856,11 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
case CDC_DATA_BLOCK:
return new HoodieCDCDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
case AVRO_DATA_BLOCK:
- return new HoodieAvroDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ return new HoodieAvroDataBlock(records, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
case HFILE_DATA_BLOCK:
return new HoodieHFileDataBlock(records, header,
Compression.Algorithm.GZ, pathForReader);
case PARQUET_DATA_BLOCK:
- return new HoodieParquetDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
+ return new HoodieParquetDataBlock(records, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
default:
throw new RuntimeException("Unknown data block type " + dataBlockType);
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index 83a439c3ad1..da3deeb2da4 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
-
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
@@ -109,7 +108,7 @@ public class TestHoodieLogFormatAppendFailure {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(2);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
getSimpleSchema().toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, false,
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
index fd8e3a5cd28..f06599fbe74 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestLogReaderUtils.java
@@ -23,7 +23,6 @@ import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
@@ -42,7 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
public class TestLogReaderUtils {
@Test
public void testEncodeAndDecodePositions() throws IOException {
- List<Long> positions = generatePositions();
+ Set<Long> positions = generatePositions();
String content = LogReaderUtils.encodePositions(positions);
Roaring64NavigableMap roaring64NavigableMap =
LogReaderUtils.decodeRecordPositionsHeader(content);
assertPositionEquals(positions, roaring64NavigableMap);
@@ -51,7 +50,7 @@ public class TestLogReaderUtils {
@Test
public void testEncodeBitmapAndDecodePositions() throws IOException {
Roaring64NavigableMap positionBitmap = new Roaring64NavigableMap();
- List<Long> positions = generatePositions();
+ Set<Long> positions = generatePositions();
positions.forEach(positionBitmap::add);
String content = LogReaderUtils.encodePositions(positionBitmap);
Roaring64NavigableMap roaring64NavigableMap =
LogReaderUtils.decodeRecordPositionsHeader(content);
@@ -60,25 +59,25 @@ public class TestLogReaderUtils {
@Test
public void testCompatibilityOfDecodingPositions() throws IOException {
- List<Long> expectedPositions = Arrays.stream(
+ Set<Long> expectedPositions = Arrays.stream(
readLastLineFromResourceFile("/format/expected_record_positions.data").split(","))
- .map(Long::parseLong).collect(Collectors.toList());
+ .map(Long::parseLong).collect(Collectors.toSet());
String content =
readLastLineFromResourceFile("/format/record_positions_header_v3.data");
Roaring64NavigableMap roaring64NavigableMap =
LogReaderUtils.decodeRecordPositionsHeader(content);
assertPositionEquals(expectedPositions, roaring64NavigableMap);
}
- public static List<Long> generatePositions() {
+ public static Set<Long> generatePositions() {
Random random = new Random(0x2023);
Set<Long> positions = new HashSet<>();
while (positions.size() < 1000) {
long pos = Math.abs(random.nextLong() % 1_000_000_000_000L);
positions.add(pos);
}
- return new ArrayList<>(positions);
+ return positions;
}
- public static void assertPositionEquals(List<Long> expectedPositions,
+ public static void assertPositionEquals(Set<Long> expectedPositions,
Roaring64NavigableMap
roaring64NavigableMap) {
List<Long> sortedExpectedPositions =
expectedPositions.stream().sorted().collect(Collectors.toList());
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
index ccba018e64f..7441f8cdd41 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -32,9 +33,11 @@ import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDate;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -116,7 +119,11 @@ public class TestHoodieDeleteBlock {
}
public void testDeleteBlockWithValidation(DeleteRecord[] deleteRecords)
throws IOException {
- HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords, new
HashMap<>());
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ for (DeleteRecord dr : deleteRecords) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, new HashMap<>());
byte[] contentBytes = deleteBlock.getContentBytes();
HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock(
Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(),
new HashMap<>());
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index c29e9275bbc..fec7dc7fd32 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.avro.JsonProperties;
@@ -120,13 +121,13 @@ public class TestParquetUtils extends
HoodieCommonTestHarness {
writeParquetFile(typeCode, filePath, rowKeys);
// Read and verify
- Set<String> filtered =
+ Set<Pair<String, Long>> filtered =
parquetUtils.filterRowKeys(HoodieTestUtils.getDefaultHadoopConf(), new
Path(filePath), filter);
assertEquals(filter.size(), filtered.size(), "Filtered count does not
match");
- for (String rowKey : filtered) {
- assertTrue(filter.contains(rowKey), "filtered key must be in the given
filter");
+ for (Pair<String, Long> rowKeyAndPosition : filtered) {
+ assertTrue(filter.contains(rowKeyAndPosition.getLeft()), "filtered key
must be in the given filter");
}
}
@@ -147,12 +148,12 @@ public class TestParquetUtils extends
HoodieCommonTestHarness {
writeParquetFile(typeCode, filePath, rowKeys, schema, true, partitionPath);
// Read and verify
- List<HoodieKey> fetchedRows =
- parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(),
new Path(filePath));
+ List<Pair<HoodieKey, Long>> fetchedRows =
+
parquetUtils.fetchRecordKeysWithPositions(HoodieTestUtils.getDefaultHadoopConf(),
new Path(filePath));
assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not
match");
- for (HoodieKey entry : fetchedRows) {
- assertTrue(expected.contains(entry), "Record key must be in the given
filter");
+ for (Pair<HoodieKey, Long> entry : fetchedRows) {
+ assertTrue(expected.contains(entry.getLeft()), "Record key must be in
the given filter");
}
}
@@ -168,18 +169,18 @@ public class TestParquetUtils extends
HoodieCommonTestHarness {
}
String filePath = Paths.get(basePath, "test.parquet").toUri().toString();
- Schema schema = getSchemaWithFields(Arrays.asList(new String[]{"abc",
"def"}));
+ Schema schema = getSchemaWithFields(Arrays.asList(new String[] {"abc",
"def"}));
writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys,
schema, true, partitionPath,
false, "abc", "def");
// Read and verify
- List<HoodieKey> fetchedRows =
- parquetUtils.fetchHoodieKeys(HoodieTestUtils.getDefaultHadoopConf(),
new Path(filePath),
- Option.of(new TestBaseKeyGen("abc","def")));
+ List<Pair<HoodieKey, Long>> fetchedRows =
+
parquetUtils.fetchRecordKeysWithPositions(HoodieTestUtils.getDefaultHadoopConf(),
new Path(filePath),
+ Option.of(new TestBaseKeyGen("abc", "def")));
assertEquals(rowKeys.size(), fetchedRows.size(), "Total count does not
match");
- for (HoodieKey entry : fetchedRows) {
- assertTrue(expected.contains(entry), "Record key must be in the given
filter");
+ for (Pair<HoodieKey, Long> entry : fetchedRows) {
+ assertTrue(expected.contains(entry.getLeft()), "Record key must be in
the given filter");
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
index c2e992223f4..a4d7cb29023 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
@@ -21,15 +21,18 @@ package org.apache.hudi.common.util;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.util.Utf8;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
+import java.util.List;
import java.util.Objects;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -69,7 +72,9 @@ public class TestSerializationUtils {
@Test
public void testClassFullyQualifiedNameSerialization() throws IOException {
DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key",
"partition"));
- HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new
DeleteRecord[]{deleteRecord}, Collections.emptyMap());
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+ deleteRecordList.add(Pair.of(deleteRecord, -1L));
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecordList,
false, Collections.emptyMap());
byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
byte[] secondBytes = SerializationUtils.serialize(deleteBlock);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
index f6e0fa8f416..d8923d815b3 100644
---
a/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieReaderWriterBase.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -242,10 +244,14 @@ public abstract class TestHoodieReaderWriterBase {
private void verifyFilterRowKeys(HoodieAvroFileReader hoodieReader) {
Set<String> candidateRowKeys = IntStream.range(40, NUM_RECORDS * 2)
.mapToObj(i -> "key" + String.format("%02d",
i)).collect(Collectors.toCollection(TreeSet::new));
- List<String> expectedKeys = IntStream.range(40, NUM_RECORDS)
- .mapToObj(i -> "key" + String.format("%02d",
i)).sorted().collect(Collectors.toList());
- assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys)
- .stream().sorted().collect(Collectors.toList()));
+ Set<Pair<String, Long>> expectedKeys = IntStream.range(40, NUM_RECORDS)
+ .mapToObj(i -> {
+ // record position is not written for HFile format
+ long position = hoodieReader instanceof HoodieAvroHFileReader ?
HoodieRecordLocation.INVALID_POSITION : (long) i;
+ String key = "key" + String.format("%02d", i);
+ return Pair.of(key, position);
+ }).collect(Collectors.toSet());
+ assertEquals(expectedKeys, hoodieReader.filterRowKeys(candidateRowKeys));
}
private void verifyReaderWithSchema(String schemaPath, HoodieAvroFileReader
hoodieReader) throws IOException {
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 4207e3bf113..9ebe2660d2e 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -400,9 +400,9 @@ public class InputFormatTestUtil {
dataBlock = new HoodieHFileDataBlock(
hoodieRecords, header, Compression.Algorithm.GZ,
writer.getLogFile().getPath());
} else if (logBlockType ==
HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
- dataBlock = new HoodieParquetDataBlock(hoodieRecords, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
+ dataBlock = new HoodieParquetDataBlock(hoodieRecords, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD, CompressionCodecName.GZIP, 0.1, true);
} else {
- dataBlock = new HoodieAvroDataBlock(hoodieRecords, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ dataBlock = new HoodieAvroDataBlock(hoodieRecords, false, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
}
writer.appendBlock(dataBlock);
return writer;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
index c6b0560b87e..46bd78fd775 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -155,16 +155,18 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
private static Stream<Arguments> testClusteringWithNoBaseFiles() {
return Stream.of(
- Arguments.of(true, true),
- Arguments.of(true, false),
- Arguments.of(false, true),
- Arguments.of(false, false)
+ Arguments.of(true, true, false),
+ Arguments.of(true, false, false),
+ Arguments.of(false, true, false),
+ Arguments.of(false, false, false),
+ // do updates with file slice having no base files and write record
positions in log blocks
+ Arguments.of(true, true, true)
);
}
@ParameterizedTest
@MethodSource
- void testClusteringWithNoBaseFiles(boolean clusteringAsRow, boolean
doUpdates) throws Exception {
+ void testClusteringWithNoBaseFiles(boolean clusteringAsRow, boolean
doUpdates, boolean shouldWriteRecordPositions) throws Exception {
// set low compaction small File Size to generate more file groups.
HoodieWriteConfig.Builder cfgBuilder = HoodieWriteConfig.newBuilder()
.forTable("test-trip-table")
@@ -185,6 +187,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends
SparkClientFunctionalTes
// set index type to INMEMORY so that log files can be indexed, and it
is safe to send
// inserts straight to the log to produce file slices with only log
files and no data files
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+ .withWriteRecordPositionsEnabled(shouldWriteRecordPositions)
.withClusteringConfig(HoodieClusteringConfig.newBuilder()
.withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index a1b4f3e307e..300c9ab877b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -138,6 +138,8 @@ class TestMORDataSourceStorage extends
SparkClientFunctionalTestHarness {
"hoodie.upsert.shuffle.parallelism" -> "4",
"hoodie.bulkinsert.shuffle.parallelism" -> "2",
"hoodie.delete.shuffle.parallelism" -> "1",
+ "hoodie.merge.small.file.group.candidates.limit" -> "0",
+ HoodieWriteConfig.WRITE_RECORD_POSITIONS.key -> "true",
DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition_path",
DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d8a3a01e72e..8958f9de92d 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -521,7 +521,7 @@ public class HiveTestUtil {
Map<HeaderMetadataType, String> header = new HashMap<>(2);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME,
dataFile.getCommitTime());
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
- HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, false,
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
logWriter.appendBlock(dataBlock);
logWriter.close();
return logWriter.getLogFile();