This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d2ea5236458 [HUDI-9311] Revert HUDI-7146 which causes perf overhead
for merging MDT log files (#13136)
d2ea5236458 is described below
commit d2ea523645856dd0cc1873caa7aabd89f6052e14
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Apr 14 04:21:03 2025 -0700
[HUDI-9311] Revert HUDI-7146 which causes perf overhead for merging MDT log
files (#13136)
---
.../hudi/index/SparkMetadataTableRecordIndex.java | 9 +-
.../hudi/client/functional/TestHoodieIndex.java | 48 +++----
.../table/log/block/HoodieHFileDataBlock.java | 11 --
.../hudi/common/table/view/NoOpTableMetadata.java | 9 +-
.../apache/hudi/metadata/BaseTableMetadata.java | 20 ++-
.../metadata/FileSystemBackedTableMetadata.java | 9 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 149 ---------------------
.../metadata/HoodieMetadataLogRecordReader.java | 23 +---
.../apache/hudi/metadata/HoodieTableMetadata.java | 16 +--
.../org/apache/hudi/common/util/HFileUtils.java | 45 ++++---
.../hudi/io/hadoop/HoodieAvroHFileWriter.java | 5 +-
.../hudi/io/hadoop/HoodieHBaseAvroHFileReader.java | 83 ++----------
.../hadoop/TestHoodieHBaseHFileReaderWriter.java | 62 ---------
.../io/hadoop/TestHoodieHFileReaderWriterBase.java | 2 +-
.../org/apache/hudi/RecordLevelIndexSupport.scala | 6 +-
.../org/apache/hudi/SecondaryIndexSupport.scala | 6 +-
.../functional/TestHoodieBackedMetadata.java | 13 +-
.../hudi/functional/RecordLevelIndexTestBase.scala | 5 +-
.../hudi/functional/TestMetadataRecordIndex.scala | 5 +-
19 files changed, 97 insertions(+), 429 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
index 3c5d47eefe6..7811d02f2b5 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java
@@ -80,6 +80,10 @@ public class SparkMetadataTableRecordIndex extends
HoodieIndex<Object, Object> {
HoodieWriteConfig otherConfig =
HoodieWriteConfig.newBuilder().withProperties(config.getProps())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(FALLBACK_INDEX_TYPE).build()).build();
HoodieIndex fallbackIndex =
SparkHoodieIndexFactory.createIndex(otherConfig);
+
+ // Fallback index needs to be a global index like record index
+ ValidationUtils.checkArgument(fallbackIndex.isGlobal(), "Fallback index
needs to be a global index like record index");
+
return fallbackIndex.tagLocation(records, context, hoodieTable);
}
@@ -163,10 +167,9 @@ public class SparkMetadataTableRecordIndex extends
HoodieIndex<Object, Object> {
recordKeyIterator.forEachRemaining(keysToLookup::add);
// recordIndexInfo object only contains records that are present in
record_index.
- Map<String, List<HoodieRecordGlobalLocation>> recordIndexInfo =
hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
+ Map<String, HoodieRecordGlobalLocation> recordIndexInfo =
hoodieTable.getMetadataTable().readRecordIndex(keysToLookup);
return recordIndexInfo.entrySet().stream()
- .flatMap(e -> e.getValue().stream().map(loc -> new
Tuple2<>(e.getKey(), loc)))
- .iterator();
+ .map(e -> new Tuple2<>(e.getKey(), e.getValue())).iterator();
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
index d4cd1000f5d..72beb2f3302 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieIndex.java
@@ -194,11 +194,11 @@ public class TestHoodieIndex extends
TestHoodieMetadataBase {
new RawTripTestPayload(recordStr4).toHoodieRecord());
}
- private static List<HoodieRecord> getInsertsWithSameKeyInTwoPartitions()
throws IOException {
- String recordStr1 =
"{\"_row_key\":\"001\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":1}";
- String recordStr2 =
"{\"_row_key\":\"002\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":2}";
- String recordStr3 =
"{\"_row_key\":\"003\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":3}";
- String recordStr4 =
"{\"_row_key\":\"003\",\"time\":\"2015-01-31T00:00:04.000Z\",\"number\":4}";
+ private static List<HoodieRecord> getInsertsBatch2() throws IOException {
+ String recordStr1 =
"{\"_row_key\":\"005\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":5}";
+ String recordStr2 =
"{\"_row_key\":\"006\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":6}";
+ String recordStr3 =
"{\"_row_key\":\"007\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":7}";
+ String recordStr4 =
"{\"_row_key\":\"008\",\"time\":\"2017-01-31T00:00:04.000Z\",\"number\":8}";
return Arrays.asList(
new RawTripTestPayload(recordStr1).toHoodieRecord(),
new RawTripTestPayload(recordStr2).toHoodieRecord(),
@@ -206,34 +206,16 @@ public class TestHoodieIndex extends
TestHoodieMetadataBase {
new RawTripTestPayload(recordStr4).toHoodieRecord());
}
- @Test
- public void testRecordIndexForNonGlobalWrites() throws Exception {
- setUp(IndexType.RECORD_INDEX, true, true);
- final int totalRecords = 4;
- List<HoodieRecord> records = getInsertsWithSameKeyInTwoPartitions();
- JavaRDD<HoodieRecord> writtenRecords = jsc.parallelize(records, 1);
-
- // Insert totalRecords records
- String newCommitTime = writeClient.createNewInstantTime();
- writeClient.startCommitWithTime(newCommitTime);
- JavaRDD<WriteStatus> writeStatusRdd = writeClient.upsert(writtenRecords,
newCommitTime);
- List<WriteStatus> writeStatuses = writeStatusRdd.collect();
- assertNoWriteErrors(writeStatuses);
- String[] fileIdsFromWriteStatuses =
writeStatuses.stream().map(WriteStatus::getFileId)
- .sorted().toArray(String[]::new);
-
- // Now commit this & update location of records inserted and validate no
errors
- writeClient.commit(newCommitTime, writeStatusRdd);
- // Now tagLocation for these records, index should tag them correctly
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTable hoodieTable = HoodieSparkTable.create(config, context,
metaClient);
- JavaRDD<HoodieRecord> javaRDD = tagLocation(index, writtenRecords,
hoodieTable);
- Map<String, String> recordKeyToPartitionPathMap = new HashMap();
- List<HoodieRecord> hoodieRecords = writtenRecords.collect();
- hoodieRecords.forEach(entry ->
recordKeyToPartitionPathMap.put(entry.getRecordKey(),
entry.getPartitionPath()));
- String[] taggedFileIds = javaRDD.map(record ->
record.getCurrentLocation().getFileId()).distinct().collect()
- .stream().sorted().toArray(String[]::new);
- assertArrayEquals(taggedFileIds, fileIdsFromWriteStatuses);
+ private static List<HoodieRecord> getUpdates() throws IOException {
+ String recordStr1 =
"{\"_row_key\":\"001\",\"time\":\"2016-01-31T00:00:01.000Z\",\"number\":5}";
+ String recordStr2 =
"{\"_row_key\":\"002\",\"time\":\"2016-01-31T00:00:02.000Z\",\"number\":6}";
+ String recordStr3 =
"{\"_row_key\":\"003\",\"time\":\"2016-01-31T00:00:03.000Z\",\"number\":7}";
+ String recordStr4 =
"{\"_row_key\":\"004\",\"time\":\"2017-01-31T00:00:04.000Z\",\"number\":8}";
+ return new ArrayList<>(Arrays.asList(
+ new RawTripTestPayload(recordStr1).toHoodieRecord(),
+ new RawTripTestPayload(recordStr2).toHoodieRecord(),
+ new RawTripTestPayload(recordStr3).toHoodieRecord(),
+ new RawTripTestPayload(recordStr4).toHoodieRecord()));
}
@ParameterizedTest
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index ea474142463..a71bf203246 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.table.log.block;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -38,7 +37,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.inline.InLineFSUtils;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -185,15 +183,6 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
}
- /**
- * Print the record in json format
- */
- private void printRecord(String msg, byte[] bs, Schema schema) throws
IOException {
- GenericRecord record = HoodieAvroUtils.bytesToAvro(bs, schema);
- byte[] json = HoodieAvroUtils.avroToJson(record, true);
- LOG.error(String.format("%s: %s", msg, new String(json)));
- }
-
private HoodieConfig getHFileReaderConfig(boolean useNativeHFileReader) {
HoodieConfig config = new HoodieConfig();
config.setValue(
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
index 1065adf8227..ae1e36ac39e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/NoOpTableMetadata.java
@@ -97,12 +97,12 @@ class NoOpTableMetadata implements HoodieTableMetadata {
}
@Override
- public Map<String, List<HoodieRecordGlobalLocation>>
readRecordIndex(List<String> recordKeys) {
+ public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String>
recordKeys) {
throw new HoodieMetadataException("Unsupported operation:
readRecordIndex!");
}
@Override
- public Map<String, List<HoodieRecordGlobalLocation>>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+ public Map<String, HoodieRecordGlobalLocation>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
return Collections.emptyMap();
}
@@ -111,11 +111,6 @@ class NoOpTableMetadata implements HoodieTableMetadata {
throw new HoodieMetadataException("Unsupported operation:
getRecordsByKeyPrefixes!");
}
- @Override
- public Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
getAllRecordsByKeys(List<String> keys, String partitionName) {
- return Collections.emptyMap();
- }
-
@Override
public Option<String> getSyncedInstantTime() {
throw new HoodieMetadataException("Unsupported operation:
getSyncedInstantTime!");
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 0ffbce2ff4f..671ff75ab5c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -28,8 +28,8 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -287,7 +287,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
* @param recordKeys The list of record keys to read
*/
@Override
- public Map<String, List<HoodieRecordGlobalLocation>>
readRecordIndex(List<String> recordKeys) {
+ public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String>
recordKeys) {
// If record index is not initialized yet, we cannot return an empty
result here unlike the code for reading from other
// indexes. This is because results from this function are used for
upserts and returning an empty result here would lead
// to existing records being inserted again causing duplicates.
@@ -296,15 +296,13 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
"Record index is not initialized in MDT");
HoodieTimer timer = HoodieTimer.start();
- Map<String, List<HoodieRecord<HoodieMetadataPayload>>> result =
getAllRecordsByKeys(recordKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
- Map<String, List<HoodieRecordGlobalLocation>> recordKeyToLocation = new
HashMap<>(result.size());
- result.forEach((key, records) -> records.forEach(record -> {
+ Map<String, HoodieRecord<HoodieMetadataPayload>> result =
getRecordsByKeys(recordKeys,
MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+ Map<String, HoodieRecordGlobalLocation> recordKeyToLocation = new
HashMap<>(result.size());
+ result.forEach((key, record) -> {
if (!record.getData().isDeleted()) {
- List<HoodieRecordGlobalLocation> locations =
recordKeyToLocation.getOrDefault(key, new ArrayList<>());
- locations.add(record.getData().getRecordGlobalLocation());
- recordKeyToLocation.put(key, locations);
+ recordKeyToLocation.put(key,
record.getData().getRecordGlobalLocation());
}
- }));
+ });
metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_TIME_STR,
timer.endTimer()));
metrics.ifPresent(m ->
m.setMetric(HoodieMetadataMetrics.LOOKUP_RECORD_INDEX_KEYS_COUNT_STR,
recordKeys.size()));
@@ -321,7 +319,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
* @param secondaryKeys The list of secondary keys to read
*/
@Override
- public Map<String, List<HoodieRecordGlobalLocation>>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+ public Map<String, HoodieRecordGlobalLocation>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
"Record index is not initialized in MDT");
ValidationUtils.checkState(
@@ -430,7 +428,7 @@ public abstract class BaseTableMetadata extends
AbstractHoodieTableMetadata {
}
/**
- * Handle spurious deletes. Depending on config, throw an exception or log
warn msg.
+ * Handle spurious deletes. Depending on config, throw an exception or log a
warn msg.
*/
private void checkForSpuriousDeletes(HoodieMetadataPayload metadataPayload,
String partitionName) {
if (!metadataPayload.getDeletions().isEmpty()) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index bd0f4904d29..1677b4b4b88 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -309,17 +309,12 @@ public class FileSystemBackedTableMetadata extends
AbstractHoodieTableMetadata {
}
@Override
- public Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
getAllRecordsByKeys(List<String> keys, String partitionName) {
- throw new HoodieMetadataException("Unsupported operation:
getAllRecordsByKeys!");
- }
-
- @Override
- public Map<String, List<HoodieRecordGlobalLocation>>
readRecordIndex(List<String> recordKeys) {
+ public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String>
recordKeys) {
throw new HoodieMetadataException("Unsupported operation:
readRecordIndex!");
}
@Override
- public Map<String, List<HoodieRecordGlobalLocation>>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+ public Map<String, HoodieRecordGlobalLocation>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
throw new HoodieMetadataException("Unsupported operation:
readSecondaryIndex!");
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 87433319719..ea71cc83a45 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -29,7 +29,6 @@ import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -297,41 +296,6 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return partitionedKeys;
}
- @Override
- public Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
getAllRecordsByKeys(List<String> keys, String partitionName) {
- if (keys.isEmpty()) {
- return Collections.emptyMap();
- }
-
- Map<String, List<HoodieRecord<HoodieMetadataPayload>>> result;
- // Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
- List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName,
- k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
getMetadataFileSystemView(), partitionName));
- final int numFileSlices = partitionFileSlices.size();
- checkState(numFileSlices > 0, "Number of file slices for partition " +
partitionName + " should be > 0");
-
- // Lookup keys from each file slice
- if (numFileSlices == 1) {
- // Optimization for a single slice for smaller metadata table partitions
- result = lookupAllKeysFromFileSlice(partitionName, keys,
partitionFileSlices.get(0));
- } else {
- // Parallel lookup for large sized partitions with many file slices
- // Partition the keys by the file slice which contains it
- ArrayList<ArrayList<String>> partitionedKeys =
partitionKeysByFileSlices(keys, numFileSlices);
- result = new HashMap<>(keys.size());
- getEngineContext().setJobStatus(this.getClass().getSimpleName(),
"Reading keys from metadata table partition " + partitionName);
- getEngineContext().map(partitionedKeys, keysList -> {
- if (keysList.isEmpty()) {
- return Collections.<String,
HoodieRecord<HoodieMetadataPayload>>emptyMap();
- }
- int shardIndex =
HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(keysList.get(0),
numFileSlices);
- return lookupAllKeysFromFileSlice(partitionName, keysList,
partitionFileSlices.get(shardIndex));
- }, partitionedKeys.size()).forEach(map -> result.putAll((Map<String,
List<HoodieRecord<HoodieMetadataPayload>>>) map));
- }
-
- return result;
- }
-
/**
* Lookup list of keys from a single file slice.
*
@@ -440,119 +404,6 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
return result;
}
- private Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
lookupAllKeysFromFileSlice(String partitionName, List<String> keys, FileSlice
fileSlice) {
- Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
getOrCreateReaders(partitionName, fileSlice);
- try {
- List<Long> timings = new ArrayList<>();
- HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
- HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
- if (baseFileReader == null && logRecordScanner == null) {
- return Collections.emptyMap();
- }
-
- // Sort it here once so that we don't need to sort individually for base
file and for each individual log files.
- List<String> sortedKeys = new ArrayList<>(keys);
- Collections.sort(sortedKeys);
- Map<String, List<HoodieRecord<HoodieMetadataPayload>>> logRecords =
readAllLogRecords(logRecordScanner, sortedKeys, timings);
- return readFromBaseAndMergeWithAllLogRecords(baseFileReader, sortedKeys,
true, logRecords, timings, partitionName);
- } catch (IOException ioe) {
- throw new HoodieIOException("Error merging records from metadata table
for " + keys.size() + " key : ", ioe);
- } finally {
- if (!reuse) {
- closeReader(readers);
- }
- }
- }
-
- private Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
readAllLogRecords(HoodieMetadataLogRecordReader logRecordReader,
-
List<String> sortedKeys,
-
List<Long> timings) {
- HoodieTimer timer = HoodieTimer.start();
-
- if (logRecordReader == null) {
- timings.add(timer.endTimer());
- return Collections.emptyMap();
- }
-
- try {
- return logRecordReader.getAllRecordsByKeys(sortedKeys);
- } finally {
- timings.add(timer.endTimer());
- }
- }
-
- private Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
readFromBaseAndMergeWithAllLogRecords(HoodieSeekingFileReader<?> reader,
-
List<String> sortedKeys,
-
boolean fullKeys,
-
Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
logRecords,
-
List<Long> timings,
-
String partitionName) throws IOException {
- HoodieTimer timer = HoodieTimer.start();
-
- if (reader == null) {
- // No base file at all
- timings.add(timer.endTimer());
- return logRecords;
- }
-
- HoodieTimer readTimer = HoodieTimer.start();
-
- Map<String, List<HoodieRecord<HoodieMetadataPayload>>> records =
- fetchBaseFileAllRecordsByKeys(reader, sortedKeys, fullKeys,
partitionName);
-
- metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
-
- // Iterate over all provided log-records, merging them into existing
records
-
- logRecords.entrySet().forEach(kv -> {
- records.merge(
- kv.getKey(),
- kv.getValue(),
- (oldRecordList, newRecordList) -> {
- List<HoodieRecord<HoodieMetadataPayload>> mergedRecordList = new
ArrayList<>();
- HoodieMetadataPayload mergedPayload = null;
- HoodieKey key = null;
- if (!oldRecordList.isEmpty() && !newRecordList.isEmpty()) {
- mergedPayload =
newRecordList.get(0).getData().preCombine(oldRecordList.get(0).getData());
- key = newRecordList.get(0).getKey();
- } else if (!oldRecordList.isEmpty()) {
- mergedPayload = oldRecordList.get(0).getData();
- key = oldRecordList.get(0).getKey();
- } else if (!newRecordList.isEmpty()) {
- mergedPayload = newRecordList.get(0).getData();
- key = newRecordList.get(0).getKey();
- }
-
- if (mergedPayload != null && !mergedPayload.isDeleted()) {
- mergedRecordList.add(new HoodieAvroRecord<>(key, mergedPayload));
- }
- return mergedRecordList;
- }
- );
- });
-
- timings.add(timer.endTimer());
- return records;
- }
-
- private Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
fetchBaseFileAllRecordsByKeys(HoodieSeekingFileReader reader,
-
List<String> sortedKeys,
-
boolean fullKeys,
-
String partitionName) throws IOException {
- ClosableIterator<HoodieRecord<?>> records = fullKeys
- ? reader.getRecordsByKeysIterator(sortedKeys)
- : reader.getRecordsByKeyPrefixIterator(sortedKeys);
-
- return toStream(records)
- .map(record -> {
- GenericRecord data = (GenericRecord) record.getData();
- return Pair.of(
- (String) (data).get(HoodieMetadataPayload.KEY_FIELD_NAME),
- composeRecord(data, partitionName));
- })
- .collect(Collectors.groupingBy(Pair::getKey,
Collectors.mapping(Pair::getValue, Collectors.toList())));
- }
-
private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord
avroRecord, String partitionName) {
if (metadataTableConfig.populateMetaFields()) {
return SpillableMapUtils.convertToHoodieRecordPayload(avroRecord,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index 0277672237a..92a6841103b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -18,6 +18,8 @@
package org.apache.hudi.metadata;
+import org.apache.avro.Schema;
+
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -27,8 +29,6 @@ import
org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
-
import javax.annotation.concurrent.ThreadSafe;
import java.io.Closeable;
@@ -112,24 +112,7 @@ public class HoodieMetadataLogRecordReader implements
Closeable {
return sortedKeys.stream()
.map(key -> (HoodieRecord<HoodieMetadataPayload>)
allRecords.get(key))
.filter(Objects::nonNull)
- .collect(Collectors.toMap(HoodieRecord::getRecordKey, r -> r, (r1,
r2) -> r2));
- }
- }
-
- public Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
getAllRecordsByKeys(List<String> sortedKeys) {
- if (sortedKeys.isEmpty()) {
- return Collections.emptyMap();
- }
-
- // NOTE: Locking is necessary since we're accessing
[[HoodieMetadataLogRecordReader]]
- // materialized state, to make sure there's no concurrent access
- synchronized (this) {
- logRecordScanner.scanByFullKeys(sortedKeys);
- Map<String, HoodieRecord> allRecords = logRecordScanner.getRecords();
- return sortedKeys.stream()
- .map(key -> (HoodieRecord<HoodieMetadataPayload>)
allRecords.get(key))
- .filter(Objects::nonNull)
- .collect(Collectors.groupingBy(HoodieRecord::getRecordKey));
+ .collect(Collectors.toMap(HoodieRecord::getRecordKey, r -> r));
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
index 47db4b33736..ff8eae014e1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -189,6 +189,7 @@ public interface HoodieTableMetadata extends Serializable,
AutoCloseable {
/**
* Fetch all files for given partition paths.
+ *
* NOTE: Absolute partition paths are expected here
*/
Map<String, List<StoragePathInfo>>
getAllFilesInPartitions(Collection<String> partitionPaths)
@@ -255,15 +256,15 @@ public interface HoodieTableMetadata extends
Serializable, AutoCloseable {
/**
* Returns the location of record keys which are found in the record index.
- * Records that are not found are ignored and won't be part of map object
that is returned.
+ * Records that are not found are ignored and wont be part of map object
that is returned.
*/
- Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String>
recordKeys);
+ Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String>
recordKeys);
/**
* Returns the location of records which the provided secondary keys maps to.
* Records that are not found are ignored and won't be part of map object
that is returned.
*/
- Map<String, List<HoodieRecordGlobalLocation>>
readSecondaryIndex(List<String> secondaryKeys, String partitionName);
+ Map<String, HoodieRecordGlobalLocation> readSecondaryIndex(List<String>
secondaryKeys, String partitionName);
/**
* Fetch records by key prefixes. Key prefix passed is expected to match the
same prefix as stored in Metadata table partitions. For eg, in case of col
stats partition,
@@ -277,15 +278,6 @@ public interface HoodieTableMetadata extends Serializable,
AutoCloseable {
String partitionName,
boolean shouldLoadInMemory);
- /**
- * Fetch records for given keys. A key could have multiple records
associated with it. This method returns all the records for given keys.
- *
- * @param keys list of key for which interested records are looked
up for.
- * @param partitionName partition name in metadata table where the records
are looked up for.
- * @return Map of key to {@link List} of {@link HoodieRecord}s with records
matching the passed in keys.
- */
- Map<String, List<HoodieRecord<HoodieMetadataPayload>>>
getAllRecordsByKeys(List<String> keys, String partitionName);
-
/**
* Get the instant time to which the metadata is synced w.r.t data timeline.
*/
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index 0d423251e6e..314a7d0dcfe 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.compress.CompressionCodec;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
@@ -51,7 +52,6 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -185,7 +185,7 @@ public class HFileUtils extends FileFormatUtils {
int keyWidth = useIntegerKey ? (int) Math.ceil(Math.log(records.size())) +
1 : -1;
// Serialize records into bytes
- Map<String, List<byte[]>> sortedRecordsMap = new TreeMap<>();
+ Map<String, byte[]> sortedRecordsMap = new TreeMap<>();
Iterator<HoodieRecord> itr = records.iterator();
int id = 0;
@@ -199,26 +199,26 @@ public class HFileUtils extends FileFormatUtils {
}
final byte[] recordBytes = serializeRecord(record, writerSchema,
keyFieldName);
- // If key exists in the map, append to its list. If not, create a new
list.
- // Get the existing list of recordBytes for the recordKey, or an empty
list if it doesn't exist
- List<byte[]> recordBytesList = sortedRecordsMap.getOrDefault(recordKey,
new ArrayList<>());
- recordBytesList.add(recordBytes);
- // Put the updated list back into the map
- sortedRecordsMap.put(recordKey, recordBytesList);
+ if (sortedRecordsMap.containsKey(recordKey)) {
+ LOG.error("Found duplicate record with recordKey: {} ", recordKey);
+ logRecordMetadata("Previous record", sortedRecordsMap.get(recordKey),
writerSchema);
+ logRecordMetadata("Current record", recordBytes, writerSchema);
+ throw new HoodieException(String.format("Writing multiple records with
same key %s not supported for Hfile format with Metadata table",
+ recordKey));
+ }
+ sortedRecordsMap.put(recordKey, recordBytes);
}
HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
.withOutputStream(ostream).withFileContext(context).create();
// Write the records
- sortedRecordsMap.forEach((recordKey, recordBytesList) -> {
- for (byte[] recordBytes : recordBytesList) {
- try {
- KeyValue kv = new KeyValue(recordKey.getBytes(), null, null,
recordBytes);
- writer.append(kv);
- } catch (IOException e) {
- throw new HoodieIOException("IOException serializing records", e);
- }
+ sortedRecordsMap.forEach((recordKey, recordBytes) -> {
+ try {
+ KeyValue kv = new KeyValue(recordKey.getBytes(), null, null,
recordBytes);
+ writer.append(kv);
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException serializing records", e);
}
});
@@ -232,6 +232,19 @@ public class HFileUtils extends FileFormatUtils {
return baos.toByteArray();
}
+ /**
+ * Print the meta fields of the record of interest
+ */
+ private void logRecordMetadata(String msg, byte[] bs, Schema schema) throws
IOException {
+ GenericRecord record = HoodieAvroUtils.bytesToAvro(bs, schema);
+ if (schema.getField(HoodieRecord.RECORD_KEY_METADATA_FIELD) != null) {
+ LOG.error("{}: Hudi meta field values -> Record key: {}, Partition Path:
{}, FileName: {}, CommitTime: {}, CommitSeqNo: {}", msg,
+ record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD),
record.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD),
+ record.get(HoodieRecord.FILENAME_METADATA_FIELD),
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD),
+ record.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD));
+ }
+ }
+
@Override
public Pair<byte[], Object> serializeRecordsToLogBlock(
HoodieStorage storage,
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
index 6967f19246d..b81f57beb69 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroHFileWriter.java
@@ -29,7 +29,6 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.fs.HoodieWrapperFileSystem;
import org.apache.hudi.io.storage.HoodieAvroFileWriter;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
-import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
@@ -141,9 +140,7 @@ public class HoodieAvroHFileWriter
@Override
public void writeAvro(String recordKey, IndexedRecord record) throws
IOException {
- // do not allow duplicates for record index (primary index)
- // secondary index can have duplicates
- if (prevRecordKey.equals(recordKey) &&
file.getName().startsWith(MetadataPartitionType.RECORD_INDEX.getFileIdPrefix()))
{
+ if (prevRecordKey.equals(recordKey)) {
throw new HoodieDuplicateKeyException("Duplicate recordKey " + recordKey
+ " found while writing to HFile."
+ "Record payload: " + record);
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java
index cabff429f13..dfcc36d2ab1 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieHBaseAvroHFileReader.java
@@ -370,11 +370,9 @@ public class HoodieHBaseAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
return new KeyPrefixIterator();
}
- private static Iterator<IndexedRecord>
getRecordByKeyIteratorInternal(HFileScanner scanner,
- String
key,
- Schema
writerSchema,
- Schema
readerSchema) throws IOException {
- KeyValue kv = new KeyValue(getUTF8Bytes(key), null, null, null);
+ private static Option<IndexedRecord> fetchRecordByKeyInternal(HFileScanner
scanner, String key, Schema writerSchema, Schema readerSchema) throws
IOException {
+ byte[] keyBytes = getUTF8Bytes(key);
+ KeyValue kv = new KeyValue(keyBytes, null, null, null);
// NOTE: HFile persists both keys/values as bytes, therefore
lexicographical sorted is
// essentially employed
//
@@ -384,59 +382,22 @@ public class HoodieHBaseAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
// b) 0, such that c[i] = cell and scanner is left in position i;
// c) and 1, such that c[i] < cell, and scanner is left in position i.
// In summary, with exact match, we are interested in return value of 0.
in all other cases, key is not found.
- // Also, do remember we are using reseekTo(), which means, the cursor will
not rewind after searching for a key.
- // Let's say the file contains key01, key02, .., key20.
+ // Also, do remember we are using reseekTo(), which means, the cursor will
not rewind after searching for a key.
+ // Let's say the file contains key01, key02, .., key20.
// After searching for key09, if we search for key05, it may not return
the matching entry since just after reseeking to key09, the cursor is at key09
and
// further reseek calls may not look back in positions.
- int val = scanner.reseekTo(kv);
- if (val != 0) {
+
+ if (scanner.reseekTo(kv) != 0) {
// key is not found.
- return Collections.emptyIterator();
+ return Option.empty();
}
// key is found and the cursor is left where the key is found
- class KeyIterator implements Iterator<IndexedRecord> {
- private IndexedRecord next = null;
- private boolean eof = false;
-
- @Override
- public boolean hasNext() {
- if (next != null) {
- return true;
- } else if (eof) {
- return false;
- }
+ Cell c = scanner.getCell();
+ byte[] valueBytes = copyValueFromCell(c);
+ GenericRecord record = deserialize(keyBytes, valueBytes, writerSchema,
readerSchema);
- Cell c = Objects.requireNonNull(scanner.getCell());
- byte[] keyBytes = copyKeyFromCell(c);
- String currentKey = new String(keyBytes);
- // Check whether we're still reading records corresponding to the
key-prefix
- if (!currentKey.equals(key)) {
- return false;
- }
-
- // Extract the byte value before releasing the lock since we cannot
hold on to the returned cell afterward
- byte[] valueBytes = copyValueFromCell(c);
- try {
- next = deserialize(keyBytes, valueBytes, writerSchema, readerSchema);
- // In case scanner is not able to advance, it means we reached EOF
- eof = !scanner.next();
- } catch (IOException e) {
- throw new HoodieIOException("Failed to deserialize payload", e);
- }
-
- return true;
- }
-
- @Override
- public IndexedRecord next() {
- IndexedRecord next = this.next;
- this.next = null;
- return next;
- }
- }
-
- return new KeyIterator();
+ return Option.of(record);
}
private static GenericRecord getRecordFromCell(Cell cell, Schema
writerSchema, Schema readerSchema) throws IOException {
@@ -550,7 +511,6 @@ public class HoodieHBaseAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
private final Schema writerSchema;
private IndexedRecord next = null;
- private Iterator<IndexedRecord> currentRecordIterator = null;
RecordByKeyIterator(HFile.Reader reader, HFileScanner scanner,
List<String> sortedKeys, Schema writerSchema, Schema readerSchema) throws
IOException {
this.sortedKeyIterator = sortedKeys.iterator();
@@ -570,28 +530,13 @@ public class HoodieHBaseAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
return true;
}
- // Continue returning records for the current key, if any left
- while (currentRecordIterator != null &&
currentRecordIterator.hasNext()) {
- Option<IndexedRecord> value =
Option.of(currentRecordIterator.next());
+ while (sortedKeyIterator.hasNext()) {
+ Option<IndexedRecord> value = fetchRecordByKeyInternal(scanner,
sortedKeyIterator.next(), writerSchema, readerSchema);
if (value.isPresent()) {
next = value.get();
return true;
}
}
-
- // Move on to the next key and start iterating over its records
- while (sortedKeyIterator.hasNext()) {
- currentRecordIterator = getRecordByKeyIteratorInternal(scanner,
sortedKeyIterator.next(), writerSchema, readerSchema);
- if (currentRecordIterator.hasNext()) {
- Option<IndexedRecord> value =
Option.of(currentRecordIterator.next());
- if (value.isPresent()) {
- next = value.get();
- return true;
- }
- }
- }
-
- // No more keys or records
return false;
} catch (IOException e) {
throw new HoodieIOException("unable to read next record from hfile ",
e);
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
index e3743b5255a..f34034d0a35 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHBaseHFileReaderWriter.java
@@ -28,7 +28,6 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
@@ -42,20 +41,14 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
-import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
-import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.apache.hudi.common.util.CollectionUtils.toStream;
import static org.apache.hudi.io.hfile.TestHFileReader.KEY_CREATOR;
@@ -147,59 +140,4 @@ public class TestHoodieHBaseHFileReaderWriter extends
TestHoodieHFileReaderWrite
VALUE_CREATOR,
uniqueKeys);
}
-
- /**
- * Test HFile reader with duplicates.
- * HFile can have duplicates in case of secondary index for instance.
- */
- @Test
- public void testHFileReaderWriterWithDuplicates() throws Exception {
- Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class,
"/exampleSchema.avsc");
- HoodieAvroHFileWriter writer = createWriter(avroSchema, false);
- List<String> keys = new ArrayList<>();
- Map<String, List<GenericRecord>> recordMap = new TreeMap<>();
- for (int i = 0; i < 50; i++) {
- // If i is a multiple of 10, select the previous key for duplication
- String key = i != 0 && i % 10 == 0 ? String.format("%s%04d", "key", i -
1) : String.format("%s%04d", "key", i);
-
- // Create a list of records for each key to handle duplicates
- if (!recordMap.containsKey(key)) {
- recordMap.put(key, new ArrayList<>());
- }
-
- // Create the record
- GenericRecord record = new GenericData.Record(avroSchema);
- record.put("_row_key", key);
- record.put("time", Integer.toString(RANDOM.nextInt()));
- record.put("number", i);
- writer.writeAvro(key, record);
-
- // Add to the record map and key list
- recordMap.get(key).add(record);
- keys.add(key);
- }
- writer.close();
-
- try (HoodieAvroHFileReaderImplBase hFileReader =
(HoodieAvroHFileReaderImplBase)
- createReader(HoodieTestUtils.getStorage(getFilePath()))) {
- List<IndexedRecord> records =
HoodieAvroHFileReaderImplBase.readAllRecords(hFileReader);
-
assertEquals(recordMap.values().stream().flatMap(List::stream).collect(Collectors.toList()),
records);
- }
-
- for (int i = 0; i < 2; i++) {
- int randomRowstoFetch = 5 + RANDOM.nextInt(10);
- Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
-
- List<String> rowsList = new ArrayList<>(rowsToFetch);
- Collections.sort(rowsList);
-
- List<GenericRecord> expectedRecords = rowsList.stream().flatMap(row ->
recordMap.get(row).stream()).collect(Collectors.toList());
-
- try (HoodieAvroHFileReaderImplBase hFileReader =
(HoodieAvroHFileReaderImplBase)
- createReader(HoodieTestUtils.getStorage(getFilePath()))) {
- List<GenericRecord> result =
HoodieAvroHFileReaderImplBase.readRecords(hFileReader, rowsList).stream().map(r
-> (GenericRecord) r).collect(Collectors.toList());
- assertEquals(expectedRecords, result);
- }
- }
- }
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
index caf09bb94e4..3ed3343e93c 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriterBase.java
@@ -476,7 +476,7 @@ public abstract class TestHoodieHFileReaderWriterBase
extends TestHoodieReaderWr
content, hfilePrefix, false,
HFileBootstrapIndex.HoodieKVComparator.class, 4);
}
- Set<String> getRandomKeys(int count, List<String> keys) {
+ private Set<String> getRandomKeys(int count, List<String> keys) {
Set<String> rowKeys = new HashSet<>();
int totalKeys = keys.size();
while (rowKeys.size() < count) {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 4014cf1a078..fd4484b5db5 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -76,10 +76,8 @@ class RecordLevelIndexSupport(spark: SparkSession,
val recordKeyLocationsMap =
metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
val candidateFiles: mutable.Set[String] = mutable.Set.empty
- for (locations <-
JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala)
{
- for (location <-
JavaConverters.collectionAsScalaIterableConverter(locations).asScala) {
- fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
- }
+ for (location <-
JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala)
{
+ fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
}
for (file <- allFiles) {
val fileId = FSUtils.getFileIdFromFilePath(file)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
index 45b626f04bb..e0b0ebca7c8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SecondaryIndexSupport.scala
@@ -80,11 +80,7 @@ class SecondaryIndexSupport(spark: SparkSession,
val recordKeyLocationsMap =
metadataTable.readSecondaryIndex(JavaConverters.seqAsJavaListConverter(secondaryKeys).asJava,
secondaryIndexName)
val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
val candidateFiles: mutable.Set[String] = mutable.Set.empty
- for (locations <-
JavaConverters.collectionAsScalaIterableConverter(recordKeyLocationsMap.values()).asScala)
{
- for (location <-
JavaConverters.collectionAsScalaIterableConverter(locations).asScala) {
- fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath)
- }
- }
+ recordKeyLocationsMap.values().forEach(location =>
fileIdToPartitionMap.put(location.getFileId, location.getPartitionPath))
for (file <- allFiles) {
val fileId = FSUtils.getFileIdFromFilePath(file)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index fd2b328bc2c..26d8d5519e3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -1169,7 +1169,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
context, storage, writeConfig.getMetadataConfig(),
writeConfig.getBasePath());
- Map<String, List<HoodieRecordGlobalLocation>> result = metadataReader
+ Map<String, HoodieRecordGlobalLocation> result = metadataReader
.readRecordIndex(records1.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
assertEquals(0, result.size(), "RI should not return entries that are
rolled back.");
result = metadataReader
@@ -2007,7 +2007,6 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// 1 partition and 2 commits. total entries should be 2.
assertEquals(result.size(), 2);
result.forEach(entry -> {
- // LOG.warn("Prefix search entries for record key col and first
partition : " + entry.getRecordKey().toString() + " :: " +
entry.getData().getColumnStatMetadata().get().toString());
HoodieMetadataColumnStats metadataColumnStats =
entry.getData().getColumnStatMetadata().get();
// for commit time column, min max should be the same since we disable
small files, every commit will create a new file
assertEquals(metadataColumnStats.getMinValue(),
metadataColumnStats.getMaxValue());
@@ -3512,9 +3511,9 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// RI should have created mappings for all the records inserted above
HoodieTableMetadata metadataReader = HoodieTableMetadata.create(
context, storage, writeConfig.getMetadataConfig(),
writeConfig.getBasePath());
- Map<String, List<HoodieRecordGlobalLocation>> result = metadataReader
+ Map<String, HoodieRecordGlobalLocation> result = metadataReader
.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
- assertEquals(allRecords.size(), (int)
result.values().stream().flatMap(List::stream).count(), "RI should have mapping
for all the records in firstCommit");
+ assertEquals(allRecords.size(), result.size(), "RI should have mapping
for all the records in firstCommit");
// Delete some records from each commit. This should also remove the RI
mapping.
recordsToDelete = firstBatchOfrecords.subList(0, 3);
@@ -3546,7 +3545,7 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// RI should not return mappings for deleted records
metadataReader = HoodieTableMetadata.create(context, storage,
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
- Map<String, List<HoodieRecordGlobalLocation>> result =
metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
+ Map<String, HoodieRecordGlobalLocation> result =
metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
assertEquals(allRecords.size() - keysToDelete.size(), result.size(), "RI
should not have mapping for deleted records");
result.keySet().forEach(mappingKey ->
assertFalse(keysToDelete.contains(mappingKey), "RI should not have mapping for
deleted records"));
@@ -3557,9 +3556,9 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
// New mappings should have been created for re-inserted records and
should map to the new commit time
metadataReader = HoodieTableMetadata.create(context, storage,
writeConfig.getMetadataConfig(), writeConfig.getBasePath());
result =
metadataReader.readRecordIndex(allRecords.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toList()));
- assertEquals(allRecords.size(), (int)
result.values().stream().flatMap(List::stream).count(), "RI should have
mappings for re-inserted records");
+ assertEquals(allRecords.size(), result.size(), "RI should have mappings
for re-inserted records");
for (String reInsertedKey : keysToDelete) {
- assertTrue(result.get(reInsertedKey).stream().anyMatch(location ->
location.getInstantTime().equals(reinsertTime)), "RI mapping for re-inserted
keys should have new commit time");
+ assertEquals(reinsertTime, result.get(reInsertedKey).getInstantTime(),
"RI mapping for re-inserted keys should have new commit time");
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 872bca957a7..cbd01eab0e2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -130,10 +130,7 @@ class RecordLevelIndexTestBase extends
HoodieStatsIndexTestBase {
val recordKey: String = row.getAs("_hoodie_record_key")
val partitionPath: String = row.getAs("_hoodie_partition_path")
val fileName: String = row.getAs("_hoodie_file_name")
- val recordLocations = recordIndexMap.get(recordKey)
- assertFalse(recordLocations.isEmpty)
- // assuming no duplicate keys for now
- val recordLocation = recordLocations.get(0)
+ val recordLocation = recordIndexMap.get(recordKey)
assertEquals(partitionPath, recordLocation.getPartitionPath)
assertTrue(fileName.startsWith(recordLocation.getFileId), fileName + "
should start with " + recordLocation.getFileId)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
index b71ba02b993..73ae93317c2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -186,10 +186,7 @@ class TestMetadataRecordIndex extends
HoodieSparkClientTestBase {
val recordKey: String = row.getAs("_hoodie_record_key")
val partitionPath: String = row.getAs("_hoodie_partition_path")
val fileName: String = row.getAs("_hoodie_file_name")
- val recordLocations = recordIndexMap.get(recordKey)
- assertFalse(recordLocations.isEmpty)
- // assuming no duplicate keys for now
- val recordLocation = recordLocations.get(0)
+ val recordLocation = recordIndexMap.get(recordKey)
assertEquals(partitionPath, recordLocation.getPartitionPath)
if (!writeConfig.inlineClusteringEnabled &&
!writeConfig.isAsyncClusteringEnabled) {
// The file id changes after clustering, so only assert it for usual
upsert and compaction operations