This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new f95bb6ab240 add tests for spill map and schema cache. And minor refactor (#12430)
f95bb6ab240 is described below
commit f95bb6ab2402afd2962cc23803dc0e8c542184c9
Author: Jon Vexler <[email protected]>
AuthorDate: Mon Dec 23 14:27:04 2024 -0500
add tests for spill map and schema cache. And minor refactor (#12430)
---
.../hadoop/TestHoodieFileGroupReaderOnHive.java | 5 +
.../hudi/BaseSparkInternalRowReaderContext.java | 2 +-
.../hudi/common/engine/HoodieReaderContext.java | 45 +++++++-
.../read/HoodieBaseFileGroupRecordBuffer.java | 32 +++---
.../common/table/read/HoodieFileGroupReader.java | 3 +
.../read/HoodieFileGroupReaderSchemaHandler.java | 25 +---
.../table/read/TestHoodieFileGroupReaderBase.java | 126 ++++++++++++++++-----
.../hudi/common/util/TestAvroSchemaCache.java | 66 +++++++++++
.../hudi/hadoop/HiveHoodieReaderContext.java | 2 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 12 +-
10 files changed, 241 insertions(+), 77 deletions(-)
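In short: schema encoding/decoding moves off HoodieFileGroupReaderSchemaHandler onto HoodieReaderContext itself, backed by an AvroSchemaCache, so record metadata held in the spillable map carries a compact Integer schema ID instead of a serialized schema. A minimal sketch of that round trip, using only the cache calls visible in this patch (the record schema literal is illustrative, not from the patch):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.AvroSchemaCache;

    public class SchemaCacheRoundTrip {
      public static void main(String[] args) {
        AvroSchemaCache cache = AvroSchemaCache.getInstance();
        // Any record schema works here; this literal is illustrative only.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
        Integer id = cache.cacheSchema(schema);            // encode: schema -> compact Integer ID
        Schema decoded = cache.getSchema(id).orElse(null); // decode: ID -> schema (Hudi Option)
        // Structurally equal schemas map to the same ID (see TestAvroSchemaCache below).
        Integer again = cache.cacheSchema(new Schema.Parser().parse(schema.toString()));
        System.out.println(schema.equals(decoded) && id.equals(again)); // true
      }
    }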
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index 152cb2f79b1..93848f3496a 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -249,6 +249,11 @@ public class TestHoodieFileGroupReaderOnHive extends TestHoodieFileGroupReaderBa
    }
  }

+  @Override
+  public void assertRecordsEqual(Schema schema, ArrayWritable expected, ArrayWritable actual) {
+    ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual, false);
+  }
+
  private static boolean isLogFileRec(HoodieReaderContext<ArrayWritable> readerContext, Schema schema, ArrayWritable record) {
    return !readerContext.getValue(record, schema, HoodieRecord.FILENAME_METADATA_FIELD).toString().contains(".parquet");
  }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index a195f161e1d..8260393f492 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -96,7 +96,7 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
          HoodieRecord.HoodieRecordType.SPARK);
    }

-    Schema schema = getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
+    Schema schema = getSchemaFromMetadata(metadataMap);
    InternalRow row = rowOption.get();
    return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
  }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 9a49acaf671..db4e23d1bbc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
+import org.apache.hudi.common.util.AvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.storage.HoodieStorage;
@@ -33,6 +34,9 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;

+import javax.annotation.Nullable;
+
+import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -52,7 +56,7 @@ import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIEL
 * @param <T> The type of engine-specific record representation, e.g.,{@code InternalRow} in Spark
 *            and {@code RowData} in Flink.
 */
-public abstract class HoodieReaderContext<T> {
+public abstract class HoodieReaderContext<T> implements Closeable {

  private HoodieFileGroupReaderSchemaHandler<T> schemaHandler = null;
  private String tablePath = null;
@@ -63,6 +67,9 @@ public abstract class HoodieReaderContext<T> {
  private Boolean needsBootstrapMerge = null;
  private Boolean shouldMergeUseRecordPosition = null;

+  // for encoding and decoding schemas to the spillable map
+  private final AvroSchemaCache avroSchemaCache = AvroSchemaCache.getInstance();
+
  // Getter and Setter for schemaHandler
  public HoodieFileGroupReaderSchemaHandler<T> getSchemaHandler() {
    return schemaHandler;
@@ -295,10 +302,20 @@ public abstract class HoodieReaderContext<T> {
  public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
    Map<String, Object> meta = new HashMap<>();
    meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
-    meta.put(INTERNAL_META_SCHEMA_ID, this.schemaHandler.encodeAvroSchema(schema));
+    meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
    return meta;
  }

+  /**
+   * Gets the schema encoded in the metadata map
+   *
+   * @param infoMap The record metadata
+   * @return the avro schema if it is encoded in the metadata map, else null
+   */
+  public Schema getSchemaFromMetadata(Map<String, Object> infoMap) {
+    return decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
+  }
+
  /**
   * Updates the schema and reset the ordering value in existing metadata mapping of a record.
   *
@@ -309,7 +326,7 @@ public abstract class HoodieReaderContext<T> {
  public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta, Schema schema) {
    meta.remove(INTERNAL_META_ORDERING_FIELD);
-    meta.put(INTERNAL_META_SCHEMA_ID, this.schemaHandler.encodeAvroSchema(schema));
+    meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
    return meta;
  }

@@ -364,4 +381,26 @@ public abstract class HoodieReaderContext<T> {
  public boolean supportsParquetRowIndex() {
    return false;
  }
+
+  /**
+   * Encodes the given avro schema for efficient serialization.
+   */
+  private Integer encodeAvroSchema(Schema schema) {
+    return this.avroSchemaCache.cacheSchema(schema);
+  }
+
+  /**
+   * Decodes the avro schema with given version ID.
+   */
+  @Nullable
+  private Schema decodeAvroSchema(Object versionId) {
+    return this.avroSchemaCache.getSchema((Integer) versionId).orElse(null);
+  }
+
+  @Override
+  public void close() {
+    if (this.avroSchemaCache != null) {
+      this.avroSchemaCache.close();
+    }
+  }
}
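Every call site that previously reached through the schema handler now collapses to one public method on the context; this is the mechanical substitution repeated throughout HoodieBaseFileGroupRecordBuffer below (a sketch, with readerContext and metadata as in the surrounding code):

    // before this patch:
    Schema schema = readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID));
    // after this patch, equivalent:
    Schema schema2 = readerContext.getSchemaFromMetadata(metadata);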
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 1f2879a70f9..94794db8bbb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -73,7 +73,6 @@ import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PA
import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA_ID;
import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
@@ -180,7 +179,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
  @Override
  public void close() {
    records.clear();
-    readerContext.getSchemaHandler().close();
  }

  /**
@@ -205,10 +203,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
      // the `older` in the merge API
      Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)),
+          readerContext.getSchemaFromMetadata(metadata),
          readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-          readerContext.getSchemaHandler().decodeAvroSchema(existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA_ID)),
+          readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
          readerSchema,
          props);
      if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -238,7 +236,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
      Comparable incomingOrderingValue = readerContext.getOrderingValue(
          Option.of(record), metadata, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
      if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
-        return Option.of(Pair.of(isDeleteRecord(Option.of(record), readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)))
+        return Option.of(Pair.of(isDeleteRecord(Option.of(record), readerContext.getSchemaFromMetadata(metadata))
            ? Option.empty() : Option.of(record), metadata));
      }
      return Option.empty();
@@ -262,10 +260,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
    } else {
      Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().merge(
          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)),
+          readerContext.getSchemaFromMetadata(metadata),
          readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-          readerContext.getSchemaHandler().decodeAvroSchema(existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA_ID)),
+          readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
          props);
      if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -388,15 +386,15 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
  protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
                            Option<T> newer, Map<String, Object> newerInfoMap) throws IOException {
    if (!older.isPresent()) {
-      return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
+      return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
    }

    if (enablePartialMerging) {
      // TODO(HUDI-7843): decouple the merging logic from the merger
      // and use the record merge mode to control how to merge partial updates
      Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)),
-          readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)),
+          readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
+          readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap),
          readerSchema, props);
      if (mergedRecord.isPresent()
@@ -410,7 +408,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
    } else {
      switch (recordMergeMode) {
        case COMMIT_TIME_ORDERING:
-          return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
+          return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
        case EVENT_TIME_ORDERING:
          Comparable newOrderingValue = readerContext.getOrderingValue(
              newer, newerInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
@@ -421,9 +419,9 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
              older, olderInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
          if (!isDeleteRecordWithNaturalOrder(older, oldOrderingValue)
              && oldOrderingValue.compareTo(newOrderingValue) > 0) {
-            return isDeleteRecord(older, readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : older;
+            return isDeleteRecord(older, readerContext.getSchemaFromMetadata(olderInfoMap)) ? Option.empty() : older;
          }
-          return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
+          return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
        case CUSTOM:
        default:
          if (payloadClass.isPresent()) {
@@ -443,8 +441,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
            return Option.empty();
          } else {
            Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-                readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)),
-                readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)), props);
+                readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
+                readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props);
            if (mergedRecord.isPresent()
                && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
              if (!mergedRecord.get().getRight().equals(readerSchema)) {
@@ -482,7 +480,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
    Schema recordSchema = readerSchema;
    GenericRecord record = null;
    if (recordOption.isPresent()) {
-      recordSchema = readerContext.getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
+      recordSchema = readerContext.getSchemaFromMetadata(metadataMap);
      record = readerContext.convertToAvroRecord(recordOption.get(), recordSchema);
    }
    HoodieKey hoodieKey = new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
@@ -495,7 +493,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
    if (record.isDelete(readerSchema, props)) {
      return readerSchema;
    }
-    return readerContext.getSchemaHandler().decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
+    return readerContext.getSchemaFromMetadata(infoMap);
  }

  protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>, Map<String, Object>> logRecordInfo) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 003a5ffa3f7..e8cc9133f4d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -297,6 +297,9 @@ public final class HoodieFileGroupReader<T> implements Closeable {
    if (recordBuffer != null) {
      recordBuffer.close();
    }
+    if (readerContext != null) {
+      readerContext.close();
+    }
  }

  public HoodieFileGroupReaderIterator<T> getClosableIterator() {
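Two things move together here: HoodieBaseFileGroupRecordBuffer.close() no longer closes the schema handler (see above), and HoodieFileGroupReader.close() now closes the reader context after releasing the record buffer, which in turn closes the context's AvroSchemaCache.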
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 6e19a8e7d31..3686fa08ce6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -35,9 +35,6 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;

import org.apache.avro.Schema;

-import javax.annotation.Nullable;
-
-import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -55,7 +52,7 @@ import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
/**
 * This class is responsible for handling the schema for the file group reader.
 */
-public class HoodieFileGroupReaderSchemaHandler<T> implements Closeable {
+public class HoodieFileGroupReaderSchemaHandler<T> {

  protected final Schema dataSchema;

@@ -244,24 +241,4 @@ public class HoodieFileGroupReaderSchemaHandler<T> implements Closeable {
    }
    return createNewSchemaFromFieldsWithReference(dataSchema, fields);
  }
-
-  /**
-   * Encodes the given avro schema for efficient serialization.
-   */
-  public Integer encodeAvroSchema(Schema schema) {
-    return this.avroSchemaCache.cacheSchema(schema);
-  }
-
-  /**
-   * Decodes the avro schema with given version ID.
-   */
-  @Nullable
-  public Schema decodeAvroSchema(Object versionId) {
-    return this.avroSchemaCache.getSchema((Integer) versionId).orElse(null);
-  }
-
-  @Override
-  public void close() {
-    this.avroSchemaCache.close();
-  }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 8d968c8fad6..e91609ae71e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -39,6 +39,8 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
@@ -50,14 +52,20 @@ import org.apache.avro.Schema;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

+import java.io.Serializable;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
@@ -67,6 +75,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;

@@ -101,6 +110,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
    validateRecordsInFileGroup(tablePath, actualRecordList, schema, fileSlice, false);
  }

+  public abstract void assertRecordsEqual(Schema schema, T expected, T actual);
+
  private static Stream<Arguments> testArguments() {
    return Stream.of(
        arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "avro"),
@@ -157,6 +168,65 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
    }
  }

+  @ParameterizedTest
+  @EnumSource(value = ExternalSpillableMap.DiskMapType.class)
+  public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType diskMapType) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(RecordMergeMode.COMMIT_TIME_ORDERING));
+    Option<Schema.Type> orderingFieldType = Option.of(Schema.Type.STRING);
+    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
+      commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), writeConfigs);
+      String baseMapPath = Files.createTempDirectory(null).toString();
+      HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
+      Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      FileSlice fileSlice = getFileSliceToRead(getStorageConf(), getBasePath(), metaClient, dataGen.getPartitionPaths(), true, 0);
+      List<T> records = readRecordsFromFileGroup(getStorageConf(), getBasePath(), metaClient, fileSlice,
+          avroSchema, RecordMergeMode.COMMIT_TIME_ORDERING, false);
+      HoodieReaderContext<T> readerContext = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf());
+      Comparable castedOrderingField = readerContext.castValue(100, Schema.Type.STRING);
+      for (Boolean isCompressionEnabled : new boolean[] {true, false}) {
+        try (ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> spillableMap =
+                 new ExternalSpillableMap<>(16L, baseMapPath, new DefaultSizeEstimator(),
+                     new HoodieRecordSizeEstimator(avroSchema), diskMapType, isCompressionEnabled)) {
+          Long position = 0L;
+          for (T record : records) {
+            String recordKey = readerContext.getRecordKey(record, avroSchema);
+            //test key based
+            spillableMap.put(recordKey,
+                Pair.of(
+                    Option.ofNullable(readerContext.seal(record)),
+                    readerContext.generateMetadataForRecord(record, avroSchema)));
+
+            //test position based
+            spillableMap.put(position++,
+                Pair.of(
+                    Option.ofNullable(readerContext.seal(record)),
+                    readerContext.generateMetadataForRecord(recordKey,
+                        dataGen.getPartitionPaths()[0], 100, orderingFieldType)));
+          }
+
+          assertEquals(records.size() * 2, spillableMap.size());
+          //Validate that everything is correct
+          position = 0L;
+          for (T record : records) {
+            String recordKey = readerContext.getRecordKey(record, avroSchema);
+            Pair<Option<T>, Map<String, Object>> keyBased = spillableMap.get(recordKey);
+            assertNotNull(keyBased);
+            Pair<Option<T>, Map<String, Object>> positionBased = spillableMap.get(position++);
+            assertNotNull(positionBased);
+            assertRecordsEqual(avroSchema, record, keyBased.getLeft().get());
+            assertRecordsEqual(avroSchema, record, positionBased.getLeft().get());
+            assertEquals(keyBased.getRight().get(INTERNAL_META_RECORD_KEY), recordKey);
+            assertEquals(positionBased.getRight().get(INTERNAL_META_RECORD_KEY), recordKey);
+            assertEquals(avroSchema, readerContext.getSchemaFromMetadata(keyBased.getRight()));
+            assertEquals(dataGen.getPartitionPaths()[0], positionBased.getRight().get(INTERNAL_META_PARTITION_PATH));
+            assertEquals(castedOrderingField, positionBased.getRight().get(INTERNAL_META_ORDERING_FIELD));
+          }
+
+        }
+      }
+    }
+  }
+
  private Map<String, String> getCommonConfigs(RecordMergeMode recordMergeMode) {
    Map<String, String> configMapping = new HashMap<>();
    configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
@@ -185,6 +255,19 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                        RecordMergeMode recordMergeMode) throws Exception {
    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
    Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    FileSlice fileSlice = getFileSliceToRead(storageConf, tablePath, metaClient, partitionPaths, containsBaseFile, expectedLogFileNum);
+    List<T> actualRecordList = readRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlice, avroSchema, recordMergeMode, false);
+    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice);
+    actualRecordList = readRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlice, avroSchema, recordMergeMode, true);
+    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice, true);
+  }
+
+  private FileSlice getFileSliceToRead(StorageConfiguration<?> storageConf,
+                                       String tablePath,
+                                       HoodieTableMetaClient metaClient,
+                                       String[] partitionPaths,
+                                       boolean containsBaseFile,
+                                       int expectedLogFileNum) {
    HoodieEngineContext engineContext = new HoodieLocalEngineContext(storageConf);
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
    FileSystemViewManager viewManager = FileSystemViewManager.createViewManager(
@@ -196,6 +279,17 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
    FileSlice fileSlice = fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
    List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
    assertEquals(expectedLogFileNum, logFilePathList.size());
+    assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
+    return fileSlice;
+  }
+
+  private List<T> readRecordsFromFileGroup(StorageConfiguration<?> storageConf,
+                                           String tablePath,
+                                           HoodieTableMetaClient metaClient,
+                                           FileSlice fileSlice,
+                                           Schema avroSchema,
+                                           RecordMergeMode recordMergeMode,
+                                           boolean isSkipMerge) throws Exception {
    List<T> actualRecordList = new ArrayList<>();
    TypedProperties props = new TypedProperties();
@@ -213,7 +307,9 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
    if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
      props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
    }
-    assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
+    if (isSkipMerge) {
+      props.setProperty(HoodieReaderConfig.MERGE_TYPE.key(), HoodieReaderConfig.REALTIME_SKIP_MERGE);
+    }
    if (shouldValidatePartialRead(fileSlice, avroSchema)) {
      assertThrows(IllegalArgumentException.class, () -> new HoodieFileGroupReader<>(
          getHoodieReaderContext(tablePath, avroSchema, storageConf),
@@ -249,33 +345,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
      actualRecordList.add(fileGroupReader.next());
    }
    fileGroupReader.close();
-
-    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice);
-
-    //validate skip merge
-    actualRecordList.clear();
-    props.setProperty(HoodieReaderConfig.MERGE_TYPE.key(), HoodieReaderConfig.REALTIME_SKIP_MERGE);
-    fileGroupReader = new HoodieFileGroupReader<>(
-        getHoodieReaderContext(tablePath, avroSchema, storageConf),
-        metaClient.getStorage(),
-        tablePath,
-        metaClient.getActiveTimeline().lastInstant().get().requestedTime(),
-        fileSlice,
-        avroSchema,
-        avroSchema,
-        Option.empty(),
-        metaClient,
-        props,
-        0,
-        fileSlice.getTotalFileSize(),
-        false);
-    fileGroupReader.initRecordIterators();
-    while (fileGroupReader.hasNext()) {
-      actualRecordList.add(fileGroupReader.next());
-    }
-    fileGroupReader.close();
-
-    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice, true);
+    return actualRecordList;
  }

  private boolean shouldValidatePartialRead(FileSlice fileSlice, Schema requestedSchema) {
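The new testSpillableMapUsage reduces to the following shape (a sketch built only from names in this patch; the 16-byte in-memory budget, diskMapType, and compression flag are the test's own parameters):

    // Engine-native record plus metadata whose schema travels as an Integer ID.
    ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> spillableMap =
        new ExternalSpillableMap<>(16L, baseMapPath, new DefaultSizeEstimator(),
            new HoodieRecordSizeEstimator(avroSchema), diskMapType, isCompressionEnabled);
    spillableMap.put(recordKey,
        Pair.of(Option.ofNullable(readerContext.seal(record)),
            readerContext.generateMetadataForRecord(record, avroSchema)));
    // Round trip: the ID stored in the metadata resolves back to the exact table schema.
    Schema resolved = readerContext.getSchemaFromMetadata(spillableMap.get(recordKey).getRight());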
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroSchemaCache.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroSchemaCache.java
new file mode 100644
index 00000000000..37eaa3802d5
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroSchemaCache.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestAvroSchemaCache {
+
+ @Test
+ public void testBasicCacheUsage() {
+ AvroSchemaCache avroSchemaCache = AvroSchemaCache.getInstance();
+    Integer avroSchemaCacheNum = avroSchemaCache.cacheSchema(HoodieTestDataGenerator.AVRO_SCHEMA);
+    Integer avroTripSchemaCacheNum = avroSchemaCache.cacheSchema(HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+    Integer flatAvroSchemaCacheNum = avroSchemaCache.cacheSchema(HoodieTestDataGenerator.FLATTENED_AVRO_SCHEMA);
+    Integer nestAvroSchemaCacheNum = avroSchemaCache.cacheSchema(HoodieTestDataGenerator.NESTED_AVRO_SCHEMA);
+    Set<Integer> uniqueSet = new HashSet<>(Arrays.asList(avroSchemaCacheNum, avroTripSchemaCacheNum, flatAvroSchemaCacheNum, nestAvroSchemaCacheNum));
+    assertEquals(4, uniqueSet.size());
+    assertTrue(avroSchemaCache.getSchema(avroSchemaCacheNum).isPresent());
+    assertEquals(HoodieTestDataGenerator.AVRO_SCHEMA, avroSchemaCache.getSchema(avroSchemaCacheNum).get());
+    assertTrue(avroSchemaCache.getSchema(avroTripSchemaCacheNum).isPresent());
+    assertEquals(HoodieTestDataGenerator.AVRO_TRIP_SCHEMA, avroSchemaCache.getSchema(avroTripSchemaCacheNum).get());
+    assertTrue(avroSchemaCache.getSchema(flatAvroSchemaCacheNum).isPresent());
+    assertEquals(HoodieTestDataGenerator.FLATTENED_AVRO_SCHEMA, avroSchemaCache.getSchema(flatAvroSchemaCacheNum).get());
+    assertTrue(avroSchemaCache.getSchema(nestAvroSchemaCacheNum).isPresent());
+    assertEquals(HoodieTestDataGenerator.NESTED_AVRO_SCHEMA, avroSchemaCache.getSchema(nestAvroSchemaCacheNum).get());
+ }
+
+ @Test
+ public void testCopiesOfSameSchema() {
+ AvroSchemaCache avroSchemaCache = AvroSchemaCache.getInstance();
+    Schema testSchema1 = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    Schema testSchema2 = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+ Integer cachenum = avroSchemaCache.cacheSchema(testSchema1);
+ Integer secondSchemaCacheNum = avroSchemaCache.cacheSchema(testSchema2);
+ assertEquals(cachenum, secondSchemaCacheNum);
+ assertTrue(avroSchemaCache.getSchema(cachenum).isPresent());
+ assertEquals(testSchema1, avroSchemaCache.getSchema(cachenum).get());
+ }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index ef40b8624f9..f1586c55858 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -207,7 +207,7 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
    if (!recordOption.isPresent()) {
      return new HoodieEmptyRecord<>(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), HoodieRecord.HoodieRecordType.HIVE);
    }
-    Schema schema = getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
+    Schema schema = getSchemaFromMetadata(metadataMap);
    ArrayWritable writable = recordOption.get();
    return new HoodieHiveRecord(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, objectInspectorCache);
  }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 2f6f3530213..b9bd6fce50a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.HoodieReaderConfig
import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.model.{FileSlice, HoodieRecord, WriteOperationType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.{HoodieTestUtils, RawTripTestPayload}
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.{SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
@@ -32,6 +31,7 @@ import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, HoodieUnsafeUtils, Row, SaveMode, SparkSession}
import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
import org.junit.jupiter.api.Assertions.assertEquals
@@ -85,8 +85,6 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
  override def getHoodieReaderContext(tablePath: String, avroSchema: Schema,
                                      storageConf: StorageConfiguration[_]): HoodieReaderContext[InternalRow] = {
    val reader = sparkAdapter.createParquetFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration]))
-    val metaClient = HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(tablePath).build
-    val recordKeyField = metaClient.getTableConfig.getRecordKeyFields.get()(0)
    new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
  }

@@ -125,4 +123,12 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
  }

  override def getCustomPayload: String = classOf[CustomPayloadForTesting].getName
+
+  override def assertRecordsEqual(schema: Schema, expected: InternalRow, actual: InternalRow): Unit = {
+    assertEquals(expected.numFields, actual.numFields)
+    val expectedStruct = sparkAdapter.getAvroSchemaConverters.toSqlType(schema)._1.asInstanceOf[StructType]
+    expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).foreach(converted => {
+      assertEquals(converted._1, converted._2)
+    })
+  }
}