This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c12bac435a8 [HUDI-7692] Extract metadata record type to
MetadataPartitionType enum (#11597)
c12bac435a8 is described below
commit c12bac435a82c3c0dc69195f723b0c6335edc3a3
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Aug 6 19:36:44 2024 +0530
[HUDI-7692] Extract metadata record type to MetadataPartitionType enum
(#11597)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +-
.../java/org/apache/hudi/table/HoodieTable.java | 2 +-
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 4 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 2 +-
.../hudi/metadata/HoodieMetadataPayload.java | 376 ++++-----------------
.../hudi/metadata/HoodieTableMetadataUtil.java | 118 +++++++
.../hudi/metadata/MetadataPartitionType.java | 252 +++++++++++++-
.../hudi/metadata/TestMetadataPartitionType.java | 16 +-
.../hudi/dml/TestHoodieTableValuedFunction.scala | 7 +-
9 files changed, 442 insertions(+), 337 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ffbffc104b1..4d8fec87556 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -229,7 +229,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
Option<String>
inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
- List<MetadataPartitionType> partitionsToInit = new
ArrayList<>(MetadataPartitionType.values().length);
+ List<MetadataPartitionType> partitionsToInit = new
ArrayList<>(MetadataPartitionType.getValidValues().length);
try {
boolean exists = metadataTableExists(dataMetaClient);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 9e51ef99ae4..a70f5e7edba 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -1002,7 +1002,7 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
* Deletes the metadata partition if the writer disables any metadata index.
*/
public void deleteMetadataIndexIfNecessary() {
- Stream.of(MetadataPartitionType.values()).forEach(partitionType -> {
+ Stream.of(MetadataPartitionType.getValidValues()).forEach(partitionType ->
{
if (shouldDeleteMetadataPartition(partitionType)) {
try {
LOG.info("Deleting metadata partition because it is disabled in
writer: " + partitionType.name());
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 4e3d98f52bb..22c9d9eeb61 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -544,10 +544,10 @@ public class TestUpgradeDowngrade extends
HoodieClientTestBase {
.withMetadataIndexBloomFilter(true)
.withEnableRecordIndex(true).build())
.build();
- for (MetadataPartitionType partitionType : MetadataPartitionType.values())
{
+ for (MetadataPartitionType partitionType :
MetadataPartitionType.getValidValues()) {
metaClient.getTableConfig().setMetadataPartitionState(metaClient,
partitionType.getPartitionPath(), true);
}
- metaClient.getTableConfig().setMetadataPartitionsInflight(metaClient,
MetadataPartitionType.values());
+ metaClient.getTableConfig().setMetadataPartitionsInflight(metaClient,
MetadataPartitionType.getValidValues());
String metadataTableBasePath = Paths.get(basePath,
METADATA_TABLE_FOLDER_PATH).toString();
HoodieTableMetaClient metadataTableMetaClient =
HoodieTestUtils.init(metadataTableBasePath, MERGE_ON_READ);
HoodieMetadataTestTable.of(metadataTableMetaClient)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 0ef17e9e233..da21749a9d8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -750,7 +750,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
}
public Map<String, String> stats() {
- Set<String> allMetadataPartitionPaths =
Arrays.stream(MetadataPartitionType.values()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
+ Set<String> allMetadataPartitionPaths =
Arrays.stream(MetadataPartitionType.getValidValues()).map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet());
return metrics.map(m -> m.getStats(true, metadataMetaClient, this,
allMetadataPartitionPaths)).orElseGet(HashMap::new);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index bbdf6be5a65..e4a3420aeba 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -47,8 +47,6 @@ import org.apache.hudi.util.Lazy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -56,7 +54,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,12 +63,16 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
-import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getLocationFromRecordIndexInfo;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey;
/**
* MetadataTable records are persisted with the schema defined in
HoodieMetadata.avsc.
@@ -98,19 +99,6 @@ import static
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_L
* During compaction on the table, the deletions are merged with additions and
hence records are pruned.
*/
public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadataPayload> {
- private static final Logger LOG =
LoggerFactory.getLogger(HoodieMetadataPayload.class);
- /**
- * Type of the record. This can be an enum in the schema but Avro1.8
- * has a bug - <a
href="https://issues.apache.org/jira/browse/AVRO-1810">...</a>
- */
- private static final int METADATA_TYPE_PARTITION_LIST = 1;
- private static final int METADATA_TYPE_FILE_LIST = 2;
- private static final int METADATA_TYPE_COLUMN_STATS = 3;
- private static final int METADATA_TYPE_BLOOM_FILTER = 4;
- private static final int METADATA_TYPE_RECORD_INDEX = 5;
- private static final int METADATA_TYPE_PARTITION_STATS = 6;
- private static final int METADATA_TYPE_SECONDARY_INDEX = 7;
-
/**
* HoodieMetadata schema field ids
*/
@@ -125,11 +113,11 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
/**
* HoodieMetadata bloom filter payload field ids
*/
- private static final String FIELD_IS_DELETED = "isDeleted";
- private static final String BLOOM_FILTER_FIELD_TYPE = "type";
- private static final String BLOOM_FILTER_FIELD_TIMESTAMP = "timestamp";
- private static final String BLOOM_FILTER_FIELD_BLOOM_FILTER = "bloomFilter";
- private static final String BLOOM_FILTER_FIELD_IS_DELETED = FIELD_IS_DELETED;
+ public static final String FIELD_IS_DELETED = "isDeleted";
+ public static final String BLOOM_FILTER_FIELD_TYPE = "type";
+ public static final String BLOOM_FILTER_FIELD_TIMESTAMP = "timestamp";
+ public static final String BLOOM_FILTER_FIELD_BLOOM_FILTER = "bloomFilter";
+ public static final String BLOOM_FILTER_FIELD_IS_DELETED = FIELD_IS_DELETED;
/**
* HoodieMetadata column stats payload field ids
@@ -184,15 +172,15 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
* <p>
* You can find more details in HUDI-3834.
*/
- private static final Lazy<HoodieMetadataColumnStats.Builder>
METADATA_COLUMN_STATS_BUILDER_STUB =
Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
+ public static final Lazy<HoodieMetadataColumnStats.Builder>
METADATA_COLUMN_STATS_BUILDER_STUB =
Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
private static final HoodieMetadataFileInfo DELETE_FILE_METADATA = new
HoodieMetadataFileInfo(0L, true);
- private String key = null;
- private int type = 0;
- private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
- private HoodieMetadataBloomFilter bloomFilterMetadata = null;
- private HoodieMetadataColumnStats columnStatMetadata = null;
- private HoodieRecordIndexInfo recordIndexMetadata;
- private HoodieSecondaryIndexInfo secondaryIndexMetadata;
+ protected String key = null;
+ protected int type = 0;
+ protected Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
+ protected HoodieMetadataBloomFilter bloomFilterMetadata = null;
+ protected HoodieMetadataColumnStats columnStatMetadata = null;
+ protected HoodieRecordIndexInfo recordIndexMetadata;
+ protected HoodieSecondaryIndexInfo secondaryIndexMetadata;
private boolean isDeletedRecord = false;
public HoodieMetadataPayload(@Nullable GenericRecord record, Comparable<?>
orderingVal) {
@@ -210,97 +198,30 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
// in any (read-)projected schema
key = record.get(KEY_FIELD_NAME).toString();
type = (int) record.get(SCHEMA_FIELD_NAME_TYPE);
-
- if (type == METADATA_TYPE_FILE_LIST || type ==
METADATA_TYPE_PARTITION_LIST) {
- Map<String, HoodieMetadataFileInfo> metadata =
getNestedFieldValue(record, SCHEMA_FIELD_NAME_METADATA);
- if (metadata != null) {
- filesystemMetadata = metadata;
- filesystemMetadata.keySet().forEach(k -> {
- GenericRecord v = filesystemMetadata.get(k);
- filesystemMetadata.put(k, new HoodieMetadataFileInfo((Long)
v.get("size"), (Boolean) v.get("isDeleted")));
- });
- }
- } else if (type == METADATA_TYPE_BLOOM_FILTER) {
- GenericRecord bloomFilterRecord = getNestedFieldValue(record,
SCHEMA_FIELD_ID_BLOOM_FILTER);
- // NOTE: Only legitimate reason for {@code BloomFilterMetadata} to not
be present is when
- // it's not been read from the storage (ie it's not been a part
of projected schema).
- // Otherwise, it has to be present or the record would be
considered invalid
- if (bloomFilterRecord == null) {
-
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_BLOOM_FILTER) == null,
- String.format("Valid %s record expected for type: %s",
SCHEMA_FIELD_ID_BLOOM_FILTER, METADATA_TYPE_BLOOM_FILTER));
- } else {
- bloomFilterMetadata = new HoodieMetadataBloomFilter(
- (String) bloomFilterRecord.get(BLOOM_FILTER_FIELD_TYPE),
- (String) bloomFilterRecord.get(BLOOM_FILTER_FIELD_TIMESTAMP),
- (ByteBuffer)
bloomFilterRecord.get(BLOOM_FILTER_FIELD_BLOOM_FILTER),
- (Boolean) bloomFilterRecord.get(BLOOM_FILTER_FIELD_IS_DELETED)
- );
- }
- } else if (type == METADATA_TYPE_COLUMN_STATS || type ==
METADATA_TYPE_PARTITION_STATS) {
- GenericRecord columnStatsRecord = getNestedFieldValue(record,
SCHEMA_FIELD_ID_COLUMN_STATS);
- // NOTE: Only legitimate reason for {@code ColumnStatsMetadata} to not
be present is when
- // it's not been read from the storage (ie it's not been a part
of projected schema).
- // Otherwise, it has to be present or the record would be
considered invalid
- if (columnStatsRecord == null) {
-
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS) == null,
- String.format("Valid %s record expected for type: %s",
SCHEMA_FIELD_ID_COLUMN_STATS, METADATA_TYPE_COLUMN_STATS));
- } else {
- columnStatMetadata =
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
- .setFileName((String)
columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
- .setColumnName((String)
columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
- // AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
- // This causes Kryo to fail when deserializing a GenericRecord,
See HUDI-5484.
- // We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
-
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
-
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
- .setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
- .setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
- .setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
- .setTotalUncompressedSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
- .setIsDeleted((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
- .build();
- }
- } else if (type == METADATA_TYPE_RECORD_INDEX) {
- GenericRecord recordIndexRecord = getNestedFieldValue(record,
SCHEMA_FIELD_ID_RECORD_INDEX);
- Object recordIndexPosition =
recordIndexRecord.get(RECORD_INDEX_FIELD_POSITION);
- recordIndexMetadata = new
HoodieRecordIndexInfo(recordIndexRecord.get(RECORD_INDEX_FIELD_PARTITION).toString(),
-
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_HIGH_BITS).toString()),
-
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_LOW_BITS).toString()),
-
Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILE_INDEX).toString()),
- recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID).toString(),
-
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_INSTANT_TIME).toString()),
-
Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_ENCODING).toString()),
- recordIndexPosition != null ?
Long.parseLong(recordIndexPosition.toString()) : null);
- } else if (type == METADATA_TYPE_SECONDARY_INDEX) {
- GenericRecord secondaryIndexRecord = getNestedFieldValue(record,
SCHEMA_FIELD_ID_SECONDARY_INDEX);
- checkState(secondaryIndexRecord != null, "Valid SecondaryIndexMetadata
record expected for type: " + METADATA_TYPE_SECONDARY_INDEX);
- secondaryIndexMetadata = new HoodieSecondaryIndexInfo(
-
secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_RECORD_KEY).toString(),
- (Boolean)
secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_IS_DELETED));
- }
+ MetadataPartitionType.get(type).constructMetadataPayload(this, record);
} else {
this.isDeletedRecord = true;
}
}
- private HoodieMetadataPayload(String key, int type, Map<String,
HoodieMetadataFileInfo> filesystemMetadata) {
+ protected HoodieMetadataPayload(String key, int type, Map<String,
HoodieMetadataFileInfo> filesystemMetadata) {
this(key, type, filesystemMetadata, null, null, null, null);
}
- private HoodieMetadataPayload(String key, HoodieMetadataBloomFilter
metadataBloomFilter) {
- this(key, METADATA_TYPE_BLOOM_FILTER, null, metadataBloomFilter, null,
null, null);
+ protected HoodieMetadataPayload(String key, HoodieMetadataBloomFilter
metadataBloomFilter) {
+ this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null,
metadataBloomFilter, null, null, null);
}
- private HoodieMetadataPayload(String key, HoodieMetadataColumnStats
columnStats) {
- this(key, METADATA_TYPE_COLUMN_STATS, null, null, columnStats, null, null);
+ protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats
columnStats) {
+ this(key, MetadataPartitionType.COLUMN_STATS.getRecordType(), null, null,
columnStats, null, null);
}
private HoodieMetadataPayload(String key, HoodieRecordIndexInfo
recordIndexMetadata) {
- this(key, METADATA_TYPE_RECORD_INDEX, null, null, null,
recordIndexMetadata, null);
+ this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null,
null, recordIndexMetadata, null);
}
private HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo
secondaryIndexMetadata) {
- this(key, METADATA_TYPE_SECONDARY_INDEX, null, null, null, null,
secondaryIndexMetadata);
+ this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null,
null, null, null, secondaryIndexMetadata);
}
protected HoodieMetadataPayload(String key, int type,
@@ -334,11 +255,10 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
*/
public static HoodieRecord<HoodieMetadataPayload>
createPartitionListRecord(List<String> partitions, boolean isDeleted) {
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
- partitions.forEach(partition ->
fileInfo.put(HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition),
new HoodieMetadataFileInfo(0L, isDeleted)));
+ partitions.forEach(partition ->
fileInfo.put(getPartitionIdentifierForFilesPartition(partition), new
HoodieMetadataFileInfo(0L, isDeleted)));
- HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.FILES.getPartitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
- fileInfo);
+ HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST,
MetadataPartitionType.ALL_PARTITIONS.getPartitionPath());
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(),
MetadataPartitionType.ALL_PARTITIONS.getRecordType(), fileInfo);
return new HoodieAvroRecord<>(key, payload);
}
@@ -352,7 +272,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
public static HoodieRecord<HoodieMetadataPayload>
createPartitionFilesRecord(String partition,
Map<String, Long> filesAdded,
List<String> filesDeleted) {
- String partitionIdentifier =
HoodieTableMetadataUtil.getPartitionIdentifierForFilesPartition(partition);
+ String partitionIdentifier =
getPartitionIdentifierForFilesPartition(partition);
int size = filesAdded.size() + filesDeleted.size();
Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>(size, 1);
filesAdded.forEach((fileName, fileSize) -> fileInfo.put(fileName, new
HoodieMetadataFileInfo(fileSize, false)));
@@ -360,7 +280,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
filesDeleted.forEach(fileName -> fileInfo.put(fileName,
DELETE_FILE_METADATA));
HoodieKey key = new HoodieKey(partitionIdentifier,
MetadataPartitionType.FILES.getPartitionPath());
- HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
+ HoodieMetadataPayload payload = new
HoodieMetadataPayload(key.getRecordKey(),
MetadataPartitionType.FILES.getRecordType(), fileInfo);
return new HoodieAvroRecord<>(key, payload);
}
@@ -409,54 +329,14 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
checkArgument(previousRecord.key.equals(key),
"Cannot combine " + previousRecord.key + " with " + key + " as the
keys differ");
- switch (type) {
- case METADATA_TYPE_PARTITION_LIST:
- case METADATA_TYPE_FILE_LIST:
- Map<String, HoodieMetadataFileInfo> combinedFileInfo =
combineFileSystemMetadata(previousRecord);
- return new HoodieMetadataPayload(key, type, combinedFileInfo);
- case METADATA_TYPE_BLOOM_FILTER:
- HoodieMetadataBloomFilter combineBloomFilterMetadata =
combineBloomFilterMetadata(previousRecord);
- return new HoodieMetadataPayload(key, combineBloomFilterMetadata);
- case METADATA_TYPE_COLUMN_STATS:
- return new HoodieMetadataPayload(key,
combineColumnStatsMetadata(previousRecord));
- case METADATA_TYPE_RECORD_INDEX:
- // There is always a single mapping and the latest mapping is
maintained.
- // Mappings in record index can change in two scenarios:
- // 1. A key deleted from dataset and then added again (new filedID)
- // 2. A key moved to a different file due to clustering
-
- // No need to merge with previous record index, always pick the latest
payload.
- return this;
- case METADATA_TYPE_SECONDARY_INDEX:
- // Secondary Index combine()/merge() always returns the current
(*this*)
- // record and discards the prevRecord. Based on the 'isDeleted' marker
in the payload,
- // the merger running on top takes the right action (discard current
or retain current record).
- return this;
- default:
- throw new HoodieMetadataException("Unknown type of
HoodieMetadataPayload: " + type);
- }
+ return
MetadataPartitionType.get(type).combineMetadataPayloads(previousRecord, this);
}
private static String getBloomFilterRecordKey(String partitionName, String
fileName) {
- return new
PartitionIndexID(HoodieTableMetadataUtil.getBloomFilterIndexPartitionIdentifier(partitionName)).asBase64EncodedString()
+ return new
PartitionIndexID(getBloomFilterIndexPartitionIdentifier(partitionName)).asBase64EncodedString()
.concat(new FileIndexID(fileName).asBase64EncodedString());
}
- private HoodieMetadataBloomFilter
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
- // Bloom filters are always additive. No need to merge with previous bloom
filter
- return this.bloomFilterMetadata;
- }
-
- private HoodieMetadataColumnStats
combineColumnStatsMetadata(HoodieMetadataPayload previousRecord) {
- checkArgument(previousRecord.getColumnStatMetadata().isPresent());
- checkArgument(getColumnStatMetadata().isPresent());
-
- HoodieMetadataColumnStats previousColStatsRecord =
previousRecord.getColumnStatMetadata().get();
- HoodieMetadataColumnStats newColumnStatsRecord =
getColumnStatMetadata().get();
-
- return mergeColumnStatsRecords(previousColStatsRecord,
newColumnStatsRecord);
- }
-
public static Option<HoodieRecord<HoodieMetadataPayload>>
combineSecondaryIndexRecord(
HoodieRecord<HoodieMetadataPayload> oldRecord,
HoodieRecord<HoodieMetadataPayload> newRecord) {
@@ -555,66 +435,6 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
return filesystemMetadata.entrySet().stream().filter(e ->
e.getValue().getIsDeleted() == isDeleted);
}
- private Map<String, HoodieMetadataFileInfo>
combineFileSystemMetadata(HoodieMetadataPayload previousRecord) {
- Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
-
- // First, add all files listed in the previous record
- if (previousRecord.filesystemMetadata != null) {
- combinedFileInfo.putAll(previousRecord.filesystemMetadata);
- }
-
- // Second, merge in the files listed in the new record
- if (filesystemMetadata != null) {
- validatePayload(type, filesystemMetadata);
-
- filesystemMetadata.forEach((key, fileInfo) -> {
- combinedFileInfo.merge(key, fileInfo,
- // Combine previous record w/ the new one, new records taking
precedence over
- // the old one
- //
- // NOTE: That if previous listing contains the file that is being
deleted by the tombstone
- // record (`IsDeleted` = true) in the new one, we simply
delete the file from the resulting
- // listing as well as drop the tombstone itself.
- // However, if file is not present in the previous record we
have to persist tombstone
- // record in the listing to make sure we carry forward
information that this file
- // was deleted. This special case could occur since the
merging flow is 2-stage:
- // - First we merge records from all of the delta
log-files
- // - Then we merge records from base-files with the delta
ones (coming as a result
- // of the previous step)
- (oldFileInfo, newFileInfo) -> {
- // NOTE: We can’t assume that MT update records will be ordered
the same way as actual
- // FS operations (since they are not atomic), therefore MT
record merging should be a
- // _commutative_ & _associative_ operation (ie one that
would work even in case records
- // will get re-ordered), which is
- // - Possible for file-sizes (since file-sizes will
ever grow, we can simply
- // take max of the old and new records)
- // - Not possible for is-deleted flags*
- //
- // *However, we’re assuming that the case of concurrent
write and deletion of the same
- // file is _impossible_ -- it would only be possible with
concurrent upsert and
- // rollback operation (affecting the same log-file), which
is implausible, b/c either
- // of the following have to be true:
- // - We’re appending to failed log-file (then the other
writer is trying to
- // rollback it concurrently, before it’s own write)
- // - Rollback (of completed instant) is running
concurrently with append (meaning
- // that restore is running concurrently with a write,
which is also not supported
- // currently)
- if (newFileInfo.getIsDeleted()) {
- if (oldFileInfo.getIsDeleted()) {
- LOG.warn("A file is repeatedly deleted in the files
partition of the metadata table: " + key);
- return newFileInfo;
- }
- return null;
- }
- return new HoodieMetadataFileInfo(
- Math.max(newFileInfo.getSize(), oldFileInfo.getSize()),
false);
- });
- });
- }
-
- return combinedFileInfo;
- }
-
/**
* Get bloom filter index key.
*
@@ -649,8 +469,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
* @return Column stats index key
*/
public static String getColumnStatsIndexKey(String partitionName,
HoodieColumnRangeMetadata<Comparable> columnRangeMetadata) {
-
- final PartitionIndexID partitionIndexID = new
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionName));
+ final PartitionIndexID partitionIndexID = new
PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionName));
final FileIndexID fileIndexID = new FileIndexID(new
StoragePath(columnRangeMetadata.getFilePath()).getName());
final ColumnIndexID columnIndexID = new
ColumnIndexID(columnRangeMetadata.getColumnName());
return getColumnStatsIndexKey(partitionIndexID, fileIndexID,
columnIndexID);
@@ -703,54 +522,6 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
});
}
- public static String getPartitionStatsIndexKey(String partitionPath, String
columnName) {
- final PartitionIndexID partitionIndexID = new
PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionPath));
- final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
- return
columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString());
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private static HoodieMetadataColumnStats
mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
-
HoodieMetadataColumnStats newColumnStats) {
- checkArgument(Objects.equals(prevColumnStats.getColumnName(),
newColumnStats.getColumnName()));
-
- // We're handling 2 cases in here
- // - New record is a tombstone: in this case it simply overwrites
previous state
- // - Previous record is a tombstone: in that case new proper record would
also
- // be simply overwriting previous state
- if (newColumnStats.getIsDeleted() || prevColumnStats.getIsDeleted()) {
- return newColumnStats;
- }
-
- Comparable minValue =
- (Comparable) Stream.of(
- (Comparable)
unwrapAvroValueWrapper(prevColumnStats.getMinValue()),
- (Comparable)
unwrapAvroValueWrapper(newColumnStats.getMinValue()))
- .filter(Objects::nonNull)
- .min(Comparator.naturalOrder())
- .orElse(null);
-
- Comparable maxValue =
- (Comparable) Stream.of(
- (Comparable)
unwrapAvroValueWrapper(prevColumnStats.getMaxValue()),
- (Comparable)
unwrapAvroValueWrapper(newColumnStats.getMaxValue()))
- .filter(Objects::nonNull)
- .max(Comparator.naturalOrder())
- .orElse(null);
-
- return
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
- .setFileName(newColumnStats.getFileName())
- .setColumnName(newColumnStats.getColumnName())
- .setMinValue(wrapValueIntoAvro(minValue))
- .setMaxValue(wrapValueIntoAvro(maxValue))
- .setValueCount(prevColumnStats.getValueCount() +
newColumnStats.getValueCount())
- .setNullCount(prevColumnStats.getNullCount() +
newColumnStats.getNullCount())
- .setTotalSize(prevColumnStats.getTotalSize() +
newColumnStats.getTotalSize())
- .setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() +
newColumnStats.getTotalUncompressedSize())
- .setIsDeleted(newColumnStats.getIsDeleted())
- .build();
- }
-
/**
* Create and return a {@code HoodieMetadataPayload} to insert or update an
entry for the record index.
* <p>
@@ -796,7 +567,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits(),
fileIndex,
- "",
+ EMPTY_STRING,
instantTimeMillis,
0,
null));
@@ -821,9 +592,9 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
* <p>
* Each entry maps the secondary key of a single record in HUDI to its
record (or primary) key
*
- * @param recordKey Primary key of the record
- * @param secondaryKey Secondary key of the record
- * @param isDeleted true if this record is deleted
+ * @param recordKey Primary key of the record
+ * @param secondaryKey Secondary key of the record
+ * @param isDeleted true if this record is deleted
*/
public static HoodieRecord<HoodieMetadataPayload>
createSecondaryIndex(String recordKey, String secondaryKey, String
partitionPath, Boolean isDeleted) {
@@ -854,7 +625,7 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
* If this is a record-level index entry, returns the file to which this is
mapped.
*/
public HoodieRecordGlobalLocation getRecordGlobalLocation() {
- return
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(recordIndexMetadata);
+ return getLocationFromRecordIndexInfo(recordIndexMetadata);
}
public boolean isDeleted() {
@@ -890,54 +661,29 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
sb.append(KEY_FIELD_NAME + "=").append(key).append(", ");
sb.append(SCHEMA_FIELD_NAME_TYPE + "=").append(type).append(", ");
- switch (type) {
- case METADATA_TYPE_PARTITION_LIST:
- case METADATA_TYPE_FILE_LIST:
- sb.append("Files: {");
-
sb.append("creations=").append(Arrays.toString(getFilenames().toArray())).append(",
");
-
sb.append("deletions=").append(Arrays.toString(getDeletions().toArray())).append(",
");
- sb.append("}");
- break;
- case METADATA_TYPE_BLOOM_FILTER:
- checkState(getBloomFilterMetadata().isPresent());
- sb.append("BloomFilter: {");
- sb.append("bloom size:
").append(getBloomFilterMetadata().get().getBloomFilter().array().length).append(",
");
- sb.append("timestamp:
").append(getBloomFilterMetadata().get().getTimestamp()).append(", ");
- sb.append("deleted:
").append(getBloomFilterMetadata().get().getIsDeleted());
- sb.append("}");
- break;
- case METADATA_TYPE_COLUMN_STATS:
- checkState(getColumnStatMetadata().isPresent());
- sb.append("ColStats: {");
- sb.append(getColumnStatMetadata().get());
- sb.append("}");
- break;
- case METADATA_TYPE_RECORD_INDEX:
- sb.append("RecordIndex: {");
- sb.append("location=").append(getRecordGlobalLocation());
- sb.append("}");
- break;
- default:
- break;
+ if (type == MetadataPartitionType.FILES.getRecordType() || type ==
MetadataPartitionType.ALL_PARTITIONS.getRecordType()) {
+ sb.append("Files: {");
+
sb.append("creations=").append(Arrays.toString(getFilenames().toArray())).append(",
");
+
sb.append("deletions=").append(Arrays.toString(getDeletions().toArray())).append(",
");
+ sb.append("}");
+ } else if (type == MetadataPartitionType.BLOOM_FILTERS.getRecordType()) {
+ checkState(getBloomFilterMetadata().isPresent());
+ sb.append("BloomFilter: {");
+ sb.append("bloom size:
").append(getBloomFilterMetadata().get().getBloomFilter().array().length).append(",
");
+ sb.append("timestamp:
").append(getBloomFilterMetadata().get().getTimestamp()).append(", ");
+ sb.append("deleted:
").append(getBloomFilterMetadata().get().getIsDeleted());
+ sb.append("}");
+ } else if (type == MetadataPartitionType.COLUMN_STATS.getRecordType()) {
+ checkState(getColumnStatMetadata().isPresent());
+ sb.append("ColStats: {");
+ sb.append(getColumnStatMetadata().get());
+ sb.append("}");
+ } else if (type == MetadataPartitionType.RECORD_INDEX.getRecordType()) {
+ sb.append("RecordIndex: {");
+ sb.append("location=").append(getRecordGlobalLocation());
+ sb.append("}");
}
sb.append('}');
return sb.toString();
}
-
- private static void validatePayload(int type, Map<String,
HoodieMetadataFileInfo> filesystemMetadata) {
- if (type == METADATA_TYPE_FILE_LIST) {
- filesystemMetadata.forEach((fileName, fileInfo) -> {
- checkState(fileInfo.getIsDeleted() || fileInfo.getSize() > 0,
"Existing files should have size > 0");
- });
- }
- }
-
- private static <T> T getNestedFieldValue(GenericRecord record, String
fieldName) {
- // NOTE: This routine is more lightweight than {@code
HoodieAvroUtils.getNestedFieldVal}
- if (record.getSchema().getField(fieldName) == null) {
- return null;
- }
-
- return unsafeCast(record.get(fieldName));
- }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 817fc2f535f..cbfb42d104a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model.DoubleWrapper;
import org.apache.hudi.avro.model.FloatWrapper;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -79,6 +80,8 @@ import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Tuple3;
+import org.apache.hudi.common.util.hash.ColumnIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
@@ -118,6 +121,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;
@@ -131,6 +135,7 @@ import static
org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.getSchemaForFields;
import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
import static
org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
@@ -140,6 +145,7 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_O
import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_MISSING_FILEINDEX_FALLBACK;
import static
org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
@@ -2141,6 +2147,118 @@ public class HoodieTableMetadataUtil {
return getFileStatsRangeMetadata(writeStat.getPartitionPath(),
writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
}
+ public static String getPartitionStatsIndexKey(String partitionPath, String
columnName) {
+ final PartitionIndexID partitionIndexID = new
PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionPath));
+ final ColumnIndexID columnIndexID = new ColumnIndexID(columnName);
+ return
columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString());
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static HoodieMetadataColumnStats
mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats,
+
HoodieMetadataColumnStats newColumnStats) {
+ checkArgument(Objects.equals(prevColumnStats.getColumnName(),
newColumnStats.getColumnName()));
+
+ // We're handling 2 cases in here
+ // - New record is a tombstone: in this case it simply overwrites
previous state
+ // - Previous record is a tombstone: in that case new proper record would
also
+ // be simply overwriting previous state
+ if (newColumnStats.getIsDeleted() || prevColumnStats.getIsDeleted()) {
+ return newColumnStats;
+ }
+
+ Comparable minValue =
+ (Comparable) Stream.of(
+ (Comparable)
unwrapAvroValueWrapper(prevColumnStats.getMinValue()),
+ (Comparable)
unwrapAvroValueWrapper(newColumnStats.getMinValue()))
+ .filter(Objects::nonNull)
+ .min(Comparator.naturalOrder())
+ .orElse(null);
+
+ Comparable maxValue =
+ (Comparable) Stream.of(
+ (Comparable)
unwrapAvroValueWrapper(prevColumnStats.getMaxValue()),
+ (Comparable)
unwrapAvroValueWrapper(newColumnStats.getMaxValue()))
+ .filter(Objects::nonNull)
+ .max(Comparator.naturalOrder())
+ .orElse(null);
+
+ return
HoodieMetadataColumnStats.newBuilder(HoodieMetadataPayload.METADATA_COLUMN_STATS_BUILDER_STUB.get())
+ .setFileName(newColumnStats.getFileName())
+ .setColumnName(newColumnStats.getColumnName())
+ .setMinValue(wrapValueIntoAvro(minValue))
+ .setMaxValue(wrapValueIntoAvro(maxValue))
+ .setValueCount(prevColumnStats.getValueCount() +
newColumnStats.getValueCount())
+ .setNullCount(prevColumnStats.getNullCount() +
newColumnStats.getNullCount())
+ .setTotalSize(prevColumnStats.getTotalSize() +
newColumnStats.getTotalSize())
+ .setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() +
newColumnStats.getTotalUncompressedSize())
+ .setIsDeleted(newColumnStats.getIsDeleted())
+ .build();
+ }
+
+ public static Map<String, HoodieMetadataFileInfo>
combineFileSystemMetadata(HoodieMetadataPayload older, HoodieMetadataPayload
newer) {
+ Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>();
+ // First, add all files listed in the previous record
+ if (older.filesystemMetadata != null) {
+ combinedFileInfo.putAll(older.filesystemMetadata);
+ }
+
+ // Second, merge in the files listed in the new record
+ if (newer.filesystemMetadata != null) {
+ validatePayload(newer.type, newer.filesystemMetadata);
+
+ newer.filesystemMetadata.forEach((key, fileInfo) -> {
+ combinedFileInfo.merge(key, fileInfo,
+ // Combine previous record w/ the new one, new records taking
precedence over
+ // the old one
+ //
+ // NOTE: That if previous listing contains the file that is being
deleted by the tombstone
+ // record (`IsDeleted` = true) in the new one, we simply
delete the file from the resulting
+ // listing as well as drop the tombstone itself.
+ // However, if file is not present in the previous record we
have to persist tombstone
+ // record in the listing to make sure we carry forward
information that this file
+ // was deleted. This special case could occur since the
merging flow is 2-stage:
+ // - First we merge records from all of the delta
log-files
+ // - Then we merge records from base-files with the delta
ones (coming as a result
+ // of the previous step)
+ (oldFileInfo, newFileInfo) -> {
+ // NOTE: We can’t assume that MT update records will be ordered
the same way as actual
+ // FS operations (since they are not atomic), therefore MT
record merging should be a
+ // _commutative_ & _associative_ operation (ie one that
would work even in case records
+ // will get re-ordered), which is
+ // - Possible for file-sizes (since file-sizes will
ever grow, we can simply
+ // take max of the old and new records)
+ // - Not possible for is-deleted flags*
+ //
+ // *However, we’re assuming that the case of concurrent
write and deletion of the same
+ // file is _impossible_ -- it would only be possible with
concurrent upsert and
+ // rollback operation (affecting the same log-file), which
is implausible, b/c either
+ // of the following have to be true:
+ // - We’re appending to failed log-file (then the other
writer is trying to
+ // rollback it concurrently, before it’s own write)
+ // - Rollback (of completed instant) is running
concurrently with append (meaning
+ // that restore is running concurrently with a write,
which is also not supported
+ // currently)
+ if (newFileInfo.getIsDeleted()) {
+ if (oldFileInfo.getIsDeleted()) {
+ LOG.warn("A file is repeatedly deleted in the files
partition of the metadata table: {}", key);
+ return newFileInfo;
+ }
+ return null;
+ }
+ return new HoodieMetadataFileInfo(
+ Math.max(newFileInfo.getSize(), oldFileInfo.getSize()),
false);
+ });
+ });
+ }
+ return combinedFileInfo;
+ }
+
+ private static void validatePayload(int type, Map<String,
HoodieMetadataFileInfo> filesystemMetadata) {
+ if (type == MetadataPartitionType.FILES.getRecordType()) {
+ filesystemMetadata.forEach((fileName, fileInfo) ->
checkState(fileInfo.getIsDeleted() || fileInfo.getSize() > 0, "Existing files
should have size > 0"));
+ }
+ }
+
/**
* A class which represents a directory and the files and directories inside
it.
* <p>
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
index d5854d4ec6f..a6223150bcc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -18,15 +18,27 @@
package org.apache.hudi.metadata;
+import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
+import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
+import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
+import org.apache.hudi.avro.model.HoodieSecondaryIndexInfo;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.avro.generic.GenericRecord;
+
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
import static
org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER;
@@ -37,36 +49,135 @@ import static
org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.common.util.StringUtils.nonEmpty;
+import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.BLOOM_FILTER_FIELD_BLOOM_FILTER;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.BLOOM_FILTER_FIELD_IS_DELETED;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.BLOOM_FILTER_FIELD_TIMESTAMP;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.BLOOM_FILTER_FIELD_TYPE;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_COLUMN_NAME;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_IS_DELETED;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_TOTAL_SIZE;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.METADATA_COLUMN_STATS_BUILDER_STUB;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_HIGH_BITS;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_LOW_BITS;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_FILE_INDEX;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_INSTANT_TIME;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_PARTITION;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_FIELD_POSITION;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_BLOOM_FILTER;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_RECORD_INDEX;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_ID_SECONDARY_INDEX;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.SCHEMA_FIELD_NAME_METADATA;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_FIELD_IS_DELETED;
+import static
org.apache.hudi.metadata.HoodieMetadataPayload.SECONDARY_INDEX_FIELD_RECORD_KEY;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.combineFileSystemMetadata;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.mergeColumnStatsRecords;
/**
* Partition types for metadata table.
*/
public enum MetadataPartitionType {
- FILES(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-") {
+ FILES(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-", 2) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
return getBooleanWithAltKeys(writeConfig, ENABLE);
}
+
+ @Override
+ public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ constructFilesMetadataPayload(payload, record);
+ }
+
+ @Override
+ public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload
older, HoodieMetadataPayload newer) {
+ return new HoodieMetadataPayload(newer.key, newer.type,
combineFileSystemMetadata(older, newer));
+ }
},
- COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS,
"col-stats-") {
+ COLUMN_STATS(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS,
"col-stats-", 3) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
return getBooleanWithAltKeys(writeConfig,
ENABLE_METADATA_INDEX_COLUMN_STATS);
}
+
+ @Override
+ public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ constructColumnStatsMetadataPayload(payload, record);
+ }
+
+ @Override
+ public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload
older, HoodieMetadataPayload newer) {
+ checkArgument(older.getColumnStatMetadata().isPresent());
+ checkArgument(newer.getColumnStatMetadata().isPresent());
+
+ HoodieMetadataColumnStats previousColStatsRecord =
older.getColumnStatMetadata().get();
+ HoodieMetadataColumnStats newColumnStatsRecord =
newer.getColumnStatMetadata().get();
+
+ return new HoodieMetadataPayload(newer.key,
mergeColumnStatsRecords(previousColStatsRecord, newColumnStatsRecord));
+ }
},
- BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS,
"bloom-filters-") {
+ BLOOM_FILTERS(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS,
"bloom-filters-", 4) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
return getBooleanWithAltKeys(writeConfig,
ENABLE_METADATA_INDEX_BLOOM_FILTER);
}
+
+ @Override
+ public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ GenericRecord bloomFilterRecord = getNestedFieldValue(record,
SCHEMA_FIELD_ID_BLOOM_FILTER);
+ // NOTE: Only legitimate reason for {@code BloomFilterMetadata} to not
be present is when
+ // it's not been read from the storage (ie it's not been a part of
projected schema).
+ // Otherwise, it has to be present or the record would be
considered invalid
+ if (bloomFilterRecord == null) {
+
checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_BLOOM_FILTER) == null,
+ String.format("Valid %s record expected for type: %s",
SCHEMA_FIELD_ID_BLOOM_FILTER,
MetadataPartitionType.BLOOM_FILTERS.getRecordType()));
+ } else {
+ payload.bloomFilterMetadata = new HoodieMetadataBloomFilter(
+ (String) bloomFilterRecord.get(BLOOM_FILTER_FIELD_TYPE),
+ (String) bloomFilterRecord.get(BLOOM_FILTER_FIELD_TIMESTAMP),
+ (ByteBuffer)
bloomFilterRecord.get(BLOOM_FILTER_FIELD_BLOOM_FILTER),
+ (Boolean) bloomFilterRecord.get(BLOOM_FILTER_FIELD_IS_DELETED)
+ );
+ }
+ }
+
+ @Override
+ public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload
older, HoodieMetadataPayload newer) {
+ // Bloom filters are always additive. No need to merge with previous
bloom filter
+ return new HoodieMetadataPayload(newer.key, newer.bloomFilterMetadata);
+ }
},
- RECORD_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX,
"record-index-") {
+ RECORD_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX,
"record-index-", 5) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
return getBooleanWithAltKeys(writeConfig, RECORD_INDEX_ENABLE_PROP);
}
+
+ @Override
+ public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ GenericRecord recordIndexRecord = getNestedFieldValue(record,
SCHEMA_FIELD_ID_RECORD_INDEX);
+ Object recordIndexPosition =
recordIndexRecord.get(RECORD_INDEX_FIELD_POSITION);
+ payload.recordIndexMetadata = new
HoodieRecordIndexInfo(recordIndexRecord.get(RECORD_INDEX_FIELD_PARTITION).toString(),
+
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_HIGH_BITS).toString()),
+
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_LOW_BITS).toString()),
+
Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILE_INDEX).toString()),
+ recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID).toString(),
+
Long.parseLong(recordIndexRecord.get(RECORD_INDEX_FIELD_INSTANT_TIME).toString()),
+
Integer.parseInt(recordIndexRecord.get(RECORD_INDEX_FIELD_FILEID_ENCODING).toString()),
+ recordIndexPosition != null ?
Long.parseLong(recordIndexPosition.toString()) : null);
+ }
},
-
FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX,
"func-index-") {
+
FUNCTIONAL_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_FUNCTIONAL_INDEX_PREFIX,
"func-index-", -1) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
// Functional index is created via sql and not via write path.
@@ -83,7 +194,7 @@ public enum MetadataPartitionType {
return false;
}
},
-
SECONDARY_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX,
"secondary-index-") {
+
SECONDARY_INDEX(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX,
"secondary-index-", 7) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
// Secondary index is created via sql and not via write path.
@@ -99,18 +210,96 @@ public enum MetadataPartitionType {
}
return false;
}
+
+ @Override
+ public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ GenericRecord secondaryIndexRecord = getNestedFieldValue(record,
SCHEMA_FIELD_ID_SECONDARY_INDEX);
+ checkState(secondaryIndexRecord != null, "Valid SecondaryIndexMetadata
record expected for type: " +
MetadataPartitionType.SECONDARY_INDEX.getRecordType());
+ payload.secondaryIndexMetadata = new HoodieSecondaryIndexInfo(
+
secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_RECORD_KEY).toString(),
+ (Boolean)
secondaryIndexRecord.get(SECONDARY_INDEX_FIELD_IS_DELETED));
+ }
},
- PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS,
"partition-stats-") {
+ PARTITION_STATS(HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS,
"partition-stats-", 6) {
@Override
public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
return getBooleanWithAltKeys(writeConfig,
ENABLE_METADATA_INDEX_PARTITION_STATS) &&
nonEmpty(getStringWithAltKeys(writeConfig, COLUMN_STATS_INDEX_FOR_COLUMNS,
EMPTY_STRING));
}
+
+ @Override
+ public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ constructColumnStatsMetadataPayload(payload, record);
+ }
+ },
+ // ALL_PARTITIONS is just another record type in FILES partition
+ ALL_PARTITIONS(HoodieTableMetadataUtil.PARTITION_NAME_FILES, "files-", 1) {
+ @Override
+ public boolean isMetadataPartitionEnabled(TypedProperties writeConfig) {
+ return getBooleanWithAltKeys(writeConfig, ENABLE);
+ }
+
+ @Override
+ public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ MetadataPartitionType.constructFilesMetadataPayload(payload, record);
+ }
+
+ @Override
+ public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload
older, HoodieMetadataPayload newer) {
+ return new HoodieMetadataPayload(newer.key, newer.type,
combineFileSystemMetadata(older, newer));
+ }
};
+ private static <T> T getNestedFieldValue(GenericRecord record, String
fieldName) {
+ // NOTE: This routine is more lightweight than {@code
HoodieAvroUtils.getNestedFieldVal}
+ if (record.getSchema().getField(fieldName) == null) {
+ return null;
+ }
+
+ return unsafeCast(record.get(fieldName));
+ }
+
+ private static void constructFilesMetadataPayload(HoodieMetadataPayload
payload, GenericRecord record) {
+ Map<String, HoodieMetadataFileInfo> metadata = getNestedFieldValue(record,
SCHEMA_FIELD_NAME_METADATA);
+ if (metadata != null) {
+ payload.filesystemMetadata = metadata;
+ payload.filesystemMetadata.keySet().forEach(k -> {
+ GenericRecord v = payload.filesystemMetadata.get(k);
+ payload.filesystemMetadata.put(k, new HoodieMetadataFileInfo((Long)
v.get("size"), (Boolean) v.get("isDeleted")));
+ });
+ }
+ }
+
+ private static void
constructColumnStatsMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ GenericRecord columnStatsRecord = getNestedFieldValue(record,
SCHEMA_FIELD_ID_COLUMN_STATS);
+ // NOTE: Only legitimate reason for {@code ColumnStatsMetadata} to not be
present is when
+ // it's not been read from the storage (ie it's not been a part of
projected schema).
+ // Otherwise, it has to be present or the record would be considered
invalid
+ if (columnStatsRecord == null) {
+ checkArgument(record.getSchema().getField(SCHEMA_FIELD_ID_COLUMN_STATS)
== null,
+ String.format("Valid %s record expected for type: %s",
SCHEMA_FIELD_ID_COLUMN_STATS,
MetadataPartitionType.COLUMN_STATS.getRecordType()));
+ } else {
+ payload.columnStatMetadata =
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
+ .setFileName((String)
columnStatsRecord.get(COLUMN_STATS_FIELD_FILE_NAME))
+ .setColumnName((String)
columnStatsRecord.get(COLUMN_STATS_FIELD_COLUMN_NAME))
+ // AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
+ // This causes Kryo to fail when deserializing a GenericRecord, See
HUDI-5484.
+ // We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
+
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
+
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
+ .setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
+ .setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
+ .setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
+ .setTotalUncompressedSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE))
+ .setIsDeleted((Boolean)
columnStatsRecord.get(COLUMN_STATS_FIELD_IS_DELETED))
+ .build();
+ }
+ }
+
// Partition path in metadata table.
private final String partitionPath;
// FileId prefix used for all file groups in this partition.
private final String fileIdPrefix;
+ private final int recordType;
/**
* Check if the metadata partition is enabled based on the metadata config.
@@ -124,9 +313,10 @@ public enum MetadataPartitionType {
return metaClient.getTableConfig().isMetadataPartitionAvailable(this);
}
- MetadataPartitionType(final String partitionPath, final String fileIdPrefix)
{
+ MetadataPartitionType(final String partitionPath, final String fileIdPrefix,
final int recordType) {
this.partitionPath = partitionPath;
this.fileIdPrefix = fileIdPrefix;
+ this.recordType = recordType;
}
public String getPartitionPath() {
@@ -137,6 +327,37 @@ public enum MetadataPartitionType {
return fileIdPrefix;
}
+ public int getRecordType() {
+ return recordType;
+ }
+
+ /**
+ * Construct metadata payload from the given record.
+ */
+ public void constructMetadataPayload(HoodieMetadataPayload payload,
GenericRecord record) {
+ throw new UnsupportedOperationException("MetadataPayload construction not
supported for partition type: " + this);
+ }
+
+ /**
+ * Merge old and new metadata payloads. By default, it returns the newer
payload.
+ * Implementations can override this method to merge the payloads depending
on the partition type.
+ */
+ public HoodieMetadataPayload combineMetadataPayloads(HoodieMetadataPayload
older, HoodieMetadataPayload newer) {
+ return newer;
+ }
+
+ /**
+ * Get the metadata partition type for the given record type.
+ */
+ public static MetadataPartitionType get(int type) {
+ for (MetadataPartitionType partitionType : values()) {
+ if (partitionType.getRecordType() == type) {
+ return partitionType;
+ }
+ }
+ throw new IllegalArgumentException("No MetadataPartitionType for record
type: " + type);
+ }
+
/**
* Returns the list of metadata table partitions which require WriteStatus
to track written records.
* <p>
@@ -150,22 +371,31 @@ public enum MetadataPartitionType {
* Returns the set of all metadata partition names.
*/
public static Set<String> getAllPartitionPaths() {
- return Arrays.stream(values())
+ return Arrays.stream(getValidValues())
.map(MetadataPartitionType::getPartitionPath)
.collect(Collectors.toSet());
}
+ /**
+ * Returns the set of all valid metadata partition types. Prefer using this
method over {@link #values()}.
+ */
+ public static MetadataPartitionType[] getValidValues() {
+ // ALL_PARTITIONS is just another record type in FILES partition
+ return EnumSet.complementOf(EnumSet.of(
+ ALL_PARTITIONS)).toArray(new MetadataPartitionType[0]);
+ }
+
/**
* Returns the list of metadata partition types enabled based on the
metadata config and table config.
*/
public static List<MetadataPartitionType>
getEnabledPartitions(TypedProperties writeConfig, HoodieTableMetaClient
metaClient) {
- return Arrays.stream(values())
+ return Arrays.stream(getValidValues())
.filter(partitionType ->
partitionType.isMetadataPartitionEnabled(writeConfig) ||
partitionType.isMetadataPartitionAvailable(metaClient))
.collect(Collectors.toList());
}
public static MetadataPartitionType fromPartitionPath(String partitionPath) {
- for (MetadataPartitionType partitionType : values()) {
+ for (MetadataPartitionType partitionType : getValidValues()) {
if (partitionPath.equals(partitionType.getPartitionPath()) ||
partitionPath.startsWith(partitionType.getPartitionPath())) {
return partitionType;
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java
index 31a98d875b2..c214b23458c 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestMetadataPartitionType.java
@@ -57,6 +57,7 @@ public class TestMetadataPartitionType {
int expectedEnabledPartitions;
switch (partitionType) {
case FILES:
+ case ALL_PARTITIONS:
case FUNCTIONAL_INDEX:
case SECONDARY_INDEX:
metadataConfigBuilder.enable(true);
@@ -86,11 +87,11 @@ public class TestMetadataPartitionType {
// Verify partition type is enabled due to config
if (partitionType == MetadataPartitionType.FUNCTIONAL_INDEX ||
partitionType == MetadataPartitionType.SECONDARY_INDEX) {
- assertEquals(1, enabledPartitions.size(), "FUNCTIONAL_INDEX should be
enabled by SQL, only FILES is enabled in this case.");
+ assertEquals(1, enabledPartitions.size(), "FUNCTIONAL_INDEX or
SECONDARY_INDEX should be enabled by SQL, only FILES is enabled in this case.");
assertTrue(enabledPartitions.contains(MetadataPartitionType.FILES));
} else {
assertEquals(expectedEnabledPartitions, enabledPartitions.size());
- assertTrue(enabledPartitions.contains(partitionType));
+ assertTrue(enabledPartitions.contains(partitionType) ||
MetadataPartitionType.ALL_PARTITIONS.equals(partitionType));
}
}
@@ -171,4 +172,15 @@ public class TestMetadataPartitionType {
assertEquals(MetadataPartitionType.PARTITION_STATS, MetadataPartitionType.fromPartitionPath("partition_stats"));
assertThrows(IllegalArgumentException.class, () -> MetadataPartitionType.fromPartitionPath("unknown"));
}
+
+ @Test
+ public void testGetMetadataPartitionRecordType() {
+ assertEquals(1, MetadataPartitionType.ALL_PARTITIONS.getRecordType());
+ assertEquals(2, MetadataPartitionType.FILES.getRecordType());
+ assertEquals(3, MetadataPartitionType.COLUMN_STATS.getRecordType());
+ assertEquals(4, MetadataPartitionType.BLOOM_FILTERS.getRecordType());
+ assertEquals(5, MetadataPartitionType.RECORD_INDEX.getRecordType());
+ assertEquals(6, MetadataPartitionType.PARTITION_STATS.getRecordType());
+ assertEquals(7, MetadataPartitionType.SECONDARY_INDEX.getRecordType());
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
index 9090b1c8a5d..2409d783de3 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestHoodieTableValuedFunction.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.hudi.dml
import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION
import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.metadata.HoodieMetadataPayload.getPartitionStatsIndexKey
-
+import org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionStatsIndexKey
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
@@ -243,8 +242,8 @@ class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
val result1DF = spark.sql(s"select * from hudi_filesystem_view('$identifier', 'price*')")
result1DF.show(false)
val result1Array = result1DF.select(
- col("Partition_Path")
- ).orderBy("Partition_Path").take(10)
+ col("Partition_Path")
+ ).orderBy("Partition_Path").take(10)
checkAnswer(result1Array)(
Seq("price=10.0"),
Seq("price=10.0"),