This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f95bb6ab240 add tests for spill map and schema cache. And minor refactor (#12430)
f95bb6ab240 is described below

commit f95bb6ab2402afd2962cc23803dc0e8c542184c9
Author: Jon Vexler <[email protected]>
AuthorDate: Mon Dec 23 14:27:04 2024 -0500

    add tests for spill map and schema cache. And minor refactor (#12430)
---
 .../hadoop/TestHoodieFileGroupReaderOnHive.java    |   5 +
 .../hudi/BaseSparkInternalRowReaderContext.java    |   2 +-
 .../hudi/common/engine/HoodieReaderContext.java    |  45 +++++++-
 .../read/HoodieBaseFileGroupRecordBuffer.java      |  32 +++---
 .../common/table/read/HoodieFileGroupReader.java   |   3 +
 .../read/HoodieFileGroupReaderSchemaHandler.java   |  25 +---
 .../table/read/TestHoodieFileGroupReaderBase.java  | 126 ++++++++++++++++-----
 .../hudi/common/util/TestAvroSchemaCache.java      |  66 +++++++++++
 .../hudi/hadoop/HiveHoodieReaderContext.java       |   2 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  12 +-
 10 files changed, 241 insertions(+), 77 deletions(-)

diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
index 152cb2f79b1..93848f3496a 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java
@@ -249,6 +249,11 @@ public class TestHoodieFileGroupReaderOnHive extends TestHoodieFileGroupReaderBa
     }
   }
 
+  @Override
+  public void assertRecordsEqual(Schema schema, ArrayWritable expected, ArrayWritable actual) {
+    ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual, false);
+  }
+
   private static boolean isLogFileRec(HoodieReaderContext<ArrayWritable> readerContext, Schema schema, ArrayWritable record) {
     return !readerContext.getValue(record, schema, HoodieRecord.FILENAME_METADATA_FIELD).toString().contains(".parquet");
   }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index a195f161e1d..8260393f492 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -96,7 +96,7 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
           HoodieRecord.HoodieRecordType.SPARK);
     }
 
-    Schema schema = getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
+    Schema schema = getSchemaFromMetadata(metadataMap);
     InternalRow row = rowOption.get();
     return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 9a49acaf671..db4e23d1bbc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
+import org.apache.hudi.common.util.AvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.storage.HoodieStorage;
@@ -33,6 +34,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -52,7 +56,7 @@ import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIEL
  * @param <T> The type of engine-specific record representation, e.g.,{@code InternalRow} in Spark
  *            and {@code RowData} in Flink.
  */
-public abstract class HoodieReaderContext<T> {
+public abstract class HoodieReaderContext<T> implements Closeable {
 
   private HoodieFileGroupReaderSchemaHandler<T> schemaHandler = null;
   private String tablePath = null;
@@ -63,6 +67,9 @@ public abstract class HoodieReaderContext<T> {
   private Boolean needsBootstrapMerge = null;
   private Boolean shouldMergeUseRecordPosition = null;
 
+  // for encoding and decoding schemas to the spillable map
+  private final AvroSchemaCache avroSchemaCache = AvroSchemaCache.getInstance();
+
   // Getter and Setter for schemaHandler
   public HoodieFileGroupReaderSchemaHandler<T> getSchemaHandler() {
     return schemaHandler;
@@ -295,10 +302,20 @@ public abstract class HoodieReaderContext<T> {
  public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
     Map<String, Object> meta = new HashMap<>();
     meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
-    meta.put(INTERNAL_META_SCHEMA_ID, this.schemaHandler.encodeAvroSchema(schema));
+    meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
     return meta;
   }
 
+  /**
+   * Gets the schema encoded in the metadata map
+   *
+   * @param infoMap The record metadata
+   * @return the avro schema if it is encoded in the metadata map, else null
+   */
+  public Schema getSchemaFromMetadata(Map<String, Object> infoMap) {
+    return decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
+  }
+
   /**
    * Updates the schema and reset the ordering value in existing metadata mapping of a record.
    *
@@ -309,7 +326,7 @@ public abstract class HoodieReaderContext<T> {
  public Map<String, Object> updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta,
                                                                       Schema schema) {
     meta.remove(INTERNAL_META_ORDERING_FIELD);
-    meta.put(INTERNAL_META_SCHEMA_ID, this.schemaHandler.encodeAvroSchema(schema));
+    meta.put(INTERNAL_META_SCHEMA_ID, encodeAvroSchema(schema));
     return meta;
   }
 
@@ -364,4 +381,26 @@ public abstract class HoodieReaderContext<T> {
   public boolean supportsParquetRowIndex() {
     return false;
   }
+
+  /**
+   * Encodes the given avro schema for efficient serialization.
+   */
+  private Integer encodeAvroSchema(Schema schema) {
+    return this.avroSchemaCache.cacheSchema(schema);
+  }
+
+  /**
+   * Decodes the avro schema with given version ID.
+   */
+  @Nullable
+  private Schema decodeAvroSchema(Object versionId) {
+    return this.avroSchemaCache.getSchema((Integer) versionId).orElse(null);
+  }
+
+  @Override
+  public void close() {
+    if (this.avroSchemaCache != null) {
+      this.avroSchemaCache.close();
+    }
+  }
 }
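
Note on the change above: schema encoding moves off the schema handler and onto
the reader context. The cache assigns each distinct Avro schema a small Integer
ID, so record metadata that spills to disk serializes an Integer instead of a
full schema. A minimal sketch of that interning idea, using an assumed class
name (SchemaInterner) rather than the actual AvroSchemaCache implementation:

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.avro.Schema;

    // Hypothetical interner: one Integer ID per distinct schema, cached in both directions.
    class SchemaInterner {
      private final Map<Schema, Integer> idBySchema = new ConcurrentHashMap<>();
      private final Map<Integer, Schema> schemaById = new ConcurrentHashMap<>();
      private final AtomicInteger nextId = new AtomicInteger(0);

      Integer cacheSchema(Schema schema) {
        // Avro Schema.equals() is structural, so two parsed copies of the
        // same schema string map to the same ID.
        return idBySchema.computeIfAbsent(schema, s -> {
          int id = nextId.getAndIncrement();
          schemaById.put(id, s);
          return id;
        });
      }

      Optional<Schema> getSchema(Integer id) {
        return Optional.ofNullable(schemaById.get(id));
      }
    }

TestAvroSchemaCache.testCopiesOfSameSchema further below verifies exactly this
dedupe behavior for the real cache.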
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 1f2879a70f9..94794db8bbb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -73,7 +73,6 @@ import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PA
 import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
-import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA_ID;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 
@@ -180,7 +179,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   @Override
   public void close() {
     records.clear();
-    readerContext.getSchemaHandler().close();
   }
 
   /**
@@ -205,10 +203,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
         // the `older` in the merge API
         Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().partialMerge(
             readerContext.constructHoodieRecord(Option.of(record), metadata),
-            readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)),
+            readerContext.getSchemaFromMetadata(metadata),
             readerContext.constructHoodieRecord(
                 existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-            readerContext.getSchemaHandler().decodeAvroSchema(existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA_ID)),
+            readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
             readerSchema,
             props);
         if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -238,7 +236,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
             Comparable incomingOrderingValue = readerContext.getOrderingValue(
                 Option.of(record), metadata, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
             if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
-              return Option.of(Pair.of(isDeleteRecord(Option.of(record), readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)))
+              return Option.of(Pair.of(isDeleteRecord(Option.of(record), readerContext.getSchemaFromMetadata(metadata))
                   ? Option.empty() : Option.of(record), metadata));
             }
             return Option.empty();
@@ -262,10 +260,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
             } else {
               Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().merge(
                   readerContext.constructHoodieRecord(Option.of(record), metadata),
-                  readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)),
+                  readerContext.getSchemaFromMetadata(metadata),
                   readerContext.constructHoodieRecord(
                       existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-                  readerContext.getSchemaHandler().decodeAvroSchema(existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA_ID)),
+                  readerContext.getSchemaFromMetadata(existingRecordMetadataPair.getRight()),
                   props);
 
               if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -388,15 +386,15 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
                             Option<T> newer, Map<String, Object> newerInfoMap) throws IOException {
     if (!older.isPresent()) {
-      return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
+      return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
     }
 
     if (enablePartialMerging) {
       // TODO(HUDI-7843): decouple the merging logic from the merger
       //  and use the record merge mode to control how to merge partial updates
       Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)),
-          readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)),
+          readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
+          readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap),
           readerSchema, props);
 
       if (mergedRecord.isPresent()
@@ -410,7 +408,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     } else {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
-          return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
+          return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
         case EVENT_TIME_ORDERING:
           Comparable newOrderingValue = readerContext.getOrderingValue(
               newer, newerInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
@@ -421,9 +419,9 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
               older, olderInfoMap, readerSchema, orderingFieldName, orderingFieldTypeOpt, orderingFieldDefault);
           if (!isDeleteRecordWithNaturalOrder(older, oldOrderingValue)
               && oldOrderingValue.compareTo(newOrderingValue) > 0) {
-            return isDeleteRecord(older, readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : older;
+            return isDeleteRecord(older, readerContext.getSchemaFromMetadata(olderInfoMap)) ? Option.empty() : older;
           }
-          return isDeleteRecord(newer, readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID))) ? Option.empty() : newer;
+          return isDeleteRecord(newer, readerContext.getSchemaFromMetadata(newerInfoMap)) ? Option.empty() : newer;
         case CUSTOM:
         default:
           if (payloadClass.isPresent()) {
@@ -443,8 +441,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
             return Option.empty();
           } else {
             Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-                readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)),
-                readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)), props);
+                readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
+                readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props);
             if (mergedRecord.isPresent()
                 && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
               if (!mergedRecord.get().getRight().equals(readerSchema)) {
@@ -482,7 +480,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     Schema recordSchema = readerSchema;
     GenericRecord record = null;
     if (recordOption.isPresent()) {
-      recordSchema = readerContext.getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
+      recordSchema = readerContext.getSchemaFromMetadata(metadataMap);
       record = readerContext.convertToAvroRecord(recordOption.get(), recordSchema);
     }
     HoodieKey hoodieKey = new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH));
@@ -495,7 +493,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     if (record.isDelete(readerSchema, props)) {
       return readerSchema;
     }
-    return readerContext.getSchemaHandler().decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
+    return readerContext.getSchemaFromMetadata(infoMap);
   }
 
  protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>, Map<String, Object>> logRecordInfo) throws IOException {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 003a5ffa3f7..e8cc9133f4d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -297,6 +297,9 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     if (recordBuffer != null) {
       recordBuffer.close();
     }
+    if (readerContext != null) {
+      readerContext.close();
+    }
   }
 
   public HoodieFileGroupReaderIterator<T> getClosableIterator() {
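
Note on the change above: HoodieReaderContext now implements Closeable, and
HoodieFileGroupReader.close() releases both the record buffer and the reader
context (which in turn closes its schema cache). A hedged usage sketch, with
createReader and process as hypothetical stand-ins for the reader construction
and record handling shown in the test changes below:

    // try-with-resources closes the record buffer and, per this change,
    // the reader context and its schema cache as well (close() may throw IOException).
    try (HoodieFileGroupReader<InternalRow> fileGroupReader = createReader()) {
      fileGroupReader.initRecordIterators();
      while (fileGroupReader.hasNext()) {
        process(fileGroupReader.next());
      }
    }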
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index 6e19a8e7d31..3686fa08ce6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -35,9 +35,6 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 
 import org.apache.avro.Schema;
 
-import javax.annotation.Nullable;
-
-import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -55,7 +52,7 @@ import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
 /**
  * This class is responsible for handling the schema for the file group reader.
  */
-public class HoodieFileGroupReaderSchemaHandler<T> implements Closeable {
+public class HoodieFileGroupReaderSchemaHandler<T> {
 
   protected final Schema dataSchema;
 
@@ -244,24 +241,4 @@ public class HoodieFileGroupReaderSchemaHandler<T> implements Closeable {
     }
     return createNewSchemaFromFieldsWithReference(dataSchema, fields);
   }
-
-  /**
-   * Encodes the given avro schema for efficient serialization.
-   */
-  public Integer encodeAvroSchema(Schema schema) {
-    return this.avroSchemaCache.cacheSchema(schema);
-  }
-
-  /**
-   * Decodes the avro schema with given version ID.
-   */
-  @Nullable
-  public Schema decodeAvroSchema(Object versionId) {
-    return this.avroSchemaCache.getSchema((Integer) versionId).orElse(null);
-  }
-
-  @Override
-  public void close() {
-    this.avroSchemaCache.close();
-  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 8d968c8fad6..e91609ae71e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -39,6 +39,8 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
@@ -50,14 +52,20 @@ import org.apache.avro.Schema;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.io.Serializable;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_ORDERING_FIELD;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
 import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
@@ -67,6 +75,7 @@ import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
 import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
@@ -101,6 +110,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
    validateRecordsInFileGroup(tablePath, actualRecordList, schema, fileSlice, false);
   }
 
+  public abstract void assertRecordsEqual(Schema schema, T expected, T actual);
+
   private static Stream<Arguments> testArguments() {
     return Stream.of(
         arguments(RecordMergeMode.COMMIT_TIME_ORDERING, "avro"),
@@ -157,6 +168,65 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(value = ExternalSpillableMap.DiskMapType.class)
+  public void testSpillableMapUsage(ExternalSpillableMap.DiskMapType diskMapType) throws Exception {
+    Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs(RecordMergeMode.COMMIT_TIME_ORDERING));
+    Option<Schema.Type> orderingFieldType = Option.of(Schema.Type.STRING);
+    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
+      commitToTable(dataGen.generateInserts("001", 100), INSERT.value(), writeConfigs);
+      String baseMapPath = Files.createTempDirectory(null).toString();
+      HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(getStorageConf(), getBasePath());
+      Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+      FileSlice fileSlice = getFileSliceToRead(getStorageConf(), getBasePath(), metaClient, dataGen.getPartitionPaths(), true, 0);
+      List<T> records = readRecordsFromFileGroup(getStorageConf(), getBasePath(), metaClient, fileSlice,
+          avroSchema, RecordMergeMode.COMMIT_TIME_ORDERING, false);
+      HoodieReaderContext<T> readerContext = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf());
+      Comparable castedOrderingField = readerContext.castValue(100, Schema.Type.STRING);
+      for (Boolean isCompressionEnabled : new boolean[] {true, false}) {
+        try (ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> spillableMap =
+                 new ExternalSpillableMap<>(16L, baseMapPath, new DefaultSizeEstimator(),
+                     new HoodieRecordSizeEstimator(avroSchema), diskMapType, isCompressionEnabled)) {
+          Long position = 0L;
+          for (T record : records) {
+            String recordKey = readerContext.getRecordKey(record, avroSchema);
+            //test key based
+            spillableMap.put(recordKey,
+                Pair.of(
+                    Option.ofNullable(readerContext.seal(record)),
+                    readerContext.generateMetadataForRecord(record, avroSchema)));
+
+            //test position based
+            spillableMap.put(position++,
+                Pair.of(
+                    Option.ofNullable(readerContext.seal(record)),
+                    readerContext.generateMetadataForRecord(recordKey,
+                        dataGen.getPartitionPaths()[0], 100, orderingFieldType)));
+          }
+
+          assertEquals(records.size() * 2, spillableMap.size());
+          //Validate that everything is correct
+          position = 0L;
+          for (T record : records) {
+            String recordKey = readerContext.getRecordKey(record, avroSchema);
+            Pair<Option<T>, Map<String, Object>> keyBased = spillableMap.get(recordKey);
+            assertNotNull(keyBased);
+            Pair<Option<T>, Map<String, Object>> positionBased = spillableMap.get(position++);
+            assertNotNull(positionBased);
+            assertRecordsEqual(avroSchema, record, keyBased.getLeft().get());
+            assertRecordsEqual(avroSchema, record, positionBased.getLeft().get());
+            assertEquals(keyBased.getRight().get(INTERNAL_META_RECORD_KEY), recordKey);
+            assertEquals(positionBased.getRight().get(INTERNAL_META_RECORD_KEY), recordKey);
+            assertEquals(avroSchema, readerContext.getSchemaFromMetadata(keyBased.getRight()));
+            assertEquals(dataGen.getPartitionPaths()[0], positionBased.getRight().get(INTERNAL_META_PARTITION_PATH));
+            assertEquals(castedOrderingField, positionBased.getRight().get(INTERNAL_META_ORDERING_FIELD));
+          }
+
+        }
+      }
+    }
+  }
+
  private Map<String, String> getCommonConfigs(RecordMergeMode recordMergeMode) {
     Map<String, String> configMapping = new HashMap<>();
     configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
@@ -185,6 +255,19 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
                                                 RecordMergeMode recordMergeMode) throws Exception {
     HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
     Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    FileSlice fileSlice = getFileSliceToRead(storageConf, tablePath, metaClient, partitionPaths, containsBaseFile, expectedLogFileNum);
+    List<T> actualRecordList = readRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlice, avroSchema, recordMergeMode, false);
+    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice);
+    actualRecordList = readRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlice, avroSchema, recordMergeMode, true);
+    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice, true);
+  }
+
+  private FileSlice getFileSliceToRead(StorageConfiguration<?> storageConf,
+                                       String tablePath,
+                                       HoodieTableMetaClient metaClient,
+                                       String[] partitionPaths,
+                                       boolean containsBaseFile,
+                                       int expectedLogFileNum) {
     HoodieEngineContext engineContext = new HoodieLocalEngineContext(storageConf);
     HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
     FileSystemViewManager viewManager = FileSystemViewManager.createViewManager(
@@ -196,6 +279,17 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
    FileSlice fileSlice = fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
     List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
     assertEquals(expectedLogFileNum, logFilePathList.size());
+    assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
+    return fileSlice;
+  }
+
+  private List<T> readRecordsFromFileGroup(StorageConfiguration<?> storageConf,
+                                           String tablePath,
+                                           HoodieTableMetaClient metaClient,
+                                           FileSlice fileSlice,
+                                           Schema avroSchema,
+                                           RecordMergeMode recordMergeMode,
+                                           boolean isSkipMerge) throws Exception {
 
     List<T> actualRecordList = new ArrayList<>();
     TypedProperties props = new TypedProperties();
@@ -213,7 +307,9 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
      props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
     }
-    assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
+    if (isSkipMerge) {
+      props.setProperty(HoodieReaderConfig.MERGE_TYPE.key(), HoodieReaderConfig.REALTIME_SKIP_MERGE);
+    }
     if (shouldValidatePartialRead(fileSlice, avroSchema)) {
       assertThrows(IllegalArgumentException.class, () -> new HoodieFileGroupReader<>(
           getHoodieReaderContext(tablePath, avroSchema, storageConf),
@@ -249,33 +345,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
       actualRecordList.add(fileGroupReader.next());
     }
     fileGroupReader.close();
-
-    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice);
-
-    //validate skip merge
-    actualRecordList.clear();
-    props.setProperty(HoodieReaderConfig.MERGE_TYPE.key(), HoodieReaderConfig.REALTIME_SKIP_MERGE);
-    fileGroupReader = new HoodieFileGroupReader<>(
-        getHoodieReaderContext(tablePath, avroSchema, storageConf),
-        metaClient.getStorage(),
-        tablePath,
-        metaClient.getActiveTimeline().lastInstant().get().requestedTime(),
-        fileSlice,
-        avroSchema,
-        avroSchema,
-        Option.empty(),
-        metaClient,
-        props,
-        0,
-        fileSlice.getTotalFileSize(),
-        false);
-    fileGroupReader.initRecordIterators();
-    while (fileGroupReader.hasNext()) {
-      actualRecordList.add(fileGroupReader.next());
-    }
-    fileGroupReader.close();
-
-    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice, true);
+    return actualRecordList;
   }
 
  private boolean shouldValidatePartialRead(FileSlice fileSlice, Schema requestedSchema) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroSchemaCache.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroSchemaCache.java
new file mode 100644
index 00000000000..37eaa3802d5
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestAvroSchemaCache.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestAvroSchemaCache {
+
+  @Test
+  public void testBasicCacheUsage() {
+    AvroSchemaCache avroSchemaCache = AvroSchemaCache.getInstance();
+    Integer avroSchemaCacheNum = avroSchemaCache.cacheSchema(HoodieTestDataGenerator.AVRO_SCHEMA);
+    Integer avroTripSchemaCacheNum = avroSchemaCache.cacheSchema(HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+    Integer flatAvroSchemaCacheNum = avroSchemaCache.cacheSchema(HoodieTestDataGenerator.FLATTENED_AVRO_SCHEMA);
+    Integer nestAvroSchemaCacheNum = avroSchemaCache.cacheSchema(HoodieTestDataGenerator.NESTED_AVRO_SCHEMA);
+    Set<Integer> uniqueSet = new HashSet<>(Arrays.asList(avroSchemaCacheNum, avroTripSchemaCacheNum, flatAvroSchemaCacheNum, nestAvroSchemaCacheNum));
+    assertEquals(4, uniqueSet.size());
+    assertTrue(avroSchemaCache.getSchema(avroSchemaCacheNum).isPresent());
+    assertEquals(HoodieTestDataGenerator.AVRO_SCHEMA, avroSchemaCache.getSchema(avroSchemaCacheNum).get());
+    assertTrue(avroSchemaCache.getSchema(avroTripSchemaCacheNum).isPresent());
+    assertEquals(HoodieTestDataGenerator.AVRO_TRIP_SCHEMA, avroSchemaCache.getSchema(avroTripSchemaCacheNum).get());
+    assertTrue(avroSchemaCache.getSchema(flatAvroSchemaCacheNum).isPresent());
+    assertEquals(HoodieTestDataGenerator.FLATTENED_AVRO_SCHEMA, avroSchemaCache.getSchema(flatAvroSchemaCacheNum).get());
+    assertTrue(avroSchemaCache.getSchema(nestAvroSchemaCacheNum).isPresent());
+    assertEquals(HoodieTestDataGenerator.NESTED_AVRO_SCHEMA, avroSchemaCache.getSchema(nestAvroSchemaCacheNum).get());
+  }
+
+  @Test
+  public void testCopiesOfSameSchema() {
+    AvroSchemaCache avroSchemaCache = AvroSchemaCache.getInstance();
+    Schema testSchema1 = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    Schema testSchema2 = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    Integer cachenum = avroSchemaCache.cacheSchema(testSchema1);
+    Integer secondSchemaCacheNum = avroSchemaCache.cacheSchema(testSchema2);
+    assertEquals(cachenum, secondSchemaCacheNum);
+    assertTrue(avroSchemaCache.getSchema(cachenum).isPresent());
+    assertEquals(testSchema1, avroSchemaCache.getSchema(cachenum).get());
+  }
+}
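
Note on the tests above: they exercise the cache directly; the spillable-map
test in TestHoodieFileGroupReaderBase drives the same round trip through the
reader context API, where generateMetadataForRecord stores the schema's ID
under INTERNAL_META_SCHEMA_ID and getSchemaFromMetadata resolves it back. A
short sketch of that flow, assuming a concrete HoodieReaderContext<T> named
readerContext and a record/schema pair in scope:

    // Encode: the metadata map carries an Integer schema ID, not the schema itself.
    Map<String, Object> meta = readerContext.generateMetadataForRecord(record, schema);

    // Decode: resolve the ID back to the interned schema (null if never cached).
    Schema resolved = readerContext.getSchemaFromMetadata(meta);
    assert schema.equals(resolved);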
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index ef40b8624f9..f1586c55858 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -207,7 +207,7 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
     if (!recordOption.isPresent()) {
       return new HoodieEmptyRecord<>(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), HoodieRecord.HoodieRecordType.HIVE);
     }
-    Schema schema = getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
+    Schema schema = getSchemaFromMetadata(metadataMap);
     ArrayWritable writable = recordOption.get();
     return new HoodieHiveRecord(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, objectInspectorCache);
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 2f6f3530213..b9bd6fce50a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.HoodieReaderConfig
 import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord, WriteOperationType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.{HoodieTestUtils, RawTripTestPayload}
 import org.apache.hudi.storage.StorageConfiguration
 import org.apache.hudi.{SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
@@ -32,6 +31,7 @@ import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, HoodieUnsafeUtils, Row, SaveMode, SparkSession}
 import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -85,8 +85,6 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
 
  override def getHoodieReaderContext(tablePath: String, avroSchema: Schema, storageConf: StorageConfiguration[_]): HoodieReaderContext[InternalRow] = {
     val reader = sparkAdapter.createParquetFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration]))
-    val metaClient = HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(tablePath).build
-    val recordKeyField = metaClient.getTableConfig.getRecordKeyFields.get()(0)
     new SparkFileFormatInternalRowReaderContext(reader, Seq.empty, Seq.empty)
   }
 
@@ -125,4 +123,12 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
   }
 
  override def getCustomPayload: String = classOf[CustomPayloadForTesting].getName
+
+  override def assertRecordsEqual(schema: Schema, expected: InternalRow, actual: InternalRow): Unit = {
+    assertEquals(expected.numFields, actual.numFields)
+    val expectedStruct = sparkAdapter.getAvroSchemaConverters.toSqlType(schema)._1.asInstanceOf[StructType]
+    expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).foreach( converted => {
+      assertEquals(converted._1, converted._2)
+    })
+  }
 }

