This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 662f0822bd4 [HUDI-7838] Remove the option hoodie.schema.cache.enable and always do the cache (#11444)
662f0822bd4 is described below

commit 662f0822bd480d6ec8ee5c5d565a6802decbebc3
Author: Vova Kolmakov <[email protected]>
AuthorDate: Mon Jun 17 15:13:44 2024 +0700

    [HUDI-7838] Remove the option hoodie.schema.cache.enable and always do the cache (#11444)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java     | 19 -------------------
 .../table/log/AbstractHoodieLogRecordReader.java      | 10 ++++------
 .../table/read/HoodieBaseFileGroupRecordBuffer.java   |  3 +--
 .../apache/hudi/common/util/InternalSchemaCache.java  | 10 ++--------
 .../apache/hudi/hadoop/SchemaEvolutionContext.java    | 18 +++++++++---------
 .../scala/org/apache/hudi/IncrementalRelation.scala   |  2 +-
 .../src/main/scala/org/apache/hudi/Iterators.scala    |  2 --
 7 files changed, 17 insertions(+), 47 deletions(-)
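
Note on the change below: after this commit, InternalSchemaCache.searchSchemaAndCache no longer takes a cacheEnable flag; the schema resolved for an instant is always cached. A minimal sketch of the caller-side migration, assuming a resolved HoodieTableMetaClient named metaClient and an instant timestamp versionID (illustrative names, not from this diff):

    // Before: callers passed a cacheEnable flag; false bypassed the cache entirely.
    // InternalSchema schema = InternalSchemaCache.searchSchemaAndCache(versionID, metaClient, false);

    // After: the flag is gone and lookups always go through the cache.
    InternalSchema schema = InternalSchemaCache.searchSchemaAndCache(versionID, metaClient);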

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 11e43b304b3..ac728ec8de6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -241,12 +241,6 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Schema string representing the latest schema of the table. Hudi passes this to "
           + "implementations of evolution of schema");
 
-  public static final ConfigProperty<Boolean> ENABLE_INTERNAL_SCHEMA_CACHE = ConfigProperty
-      .key("hoodie.schema.cache.enable")
-      .defaultValue(false)
-      .markAdvanced()
-      .withDocumentation("cache query internalSchemas in driver/executor side");
-
   public static final ConfigProperty<String> AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty
       .key("hoodie.avro.schema.validate")
       .defaultValue("false")
@@ -1260,18 +1254,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(INTERNAL_SCHEMA_STRING);
   }
 
-  public boolean getInternalSchemaCacheEnable() {
-    return getBoolean(ENABLE_INTERNAL_SCHEMA_CACHE);
-  }
-
   public void setInternalSchemaString(String internalSchemaString) {
     setValue(INTERNAL_SCHEMA_STRING, internalSchemaString);
   }
 
-  public void setInternalSchemaCacheEnable(boolean enable) {
-    setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable));
-  }
-
   public boolean getSchemaEvolutionEnable() {
     return getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE);
   }
@@ -2827,11 +2813,6 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withInternalSchemaCacheEnable(boolean enable) {
-      writeConfig.setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable));
-      return this;
-    }
-
     public Builder withAvroSchemaValidate(boolean enable) {
       writeConfig.setValue(AVRO_SCHEMA_VALIDATE_ENABLE, String.valueOf(enable));
       return this;
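
For write paths that previously opted in through the builder, the removed withInternalSchemaCacheEnable call should simply be dropped; nothing replaces it. A hedged sketch of the adjustment (the base path is a placeholder):

    // Hypothetical builder usage after this commit; the commented line no longer compiles.
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")             // placeholder base path
        // .withInternalSchemaCacheEnable(true)  // removed: caching is now unconditional
        .build();
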
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index d61531ecea7..4c38d11467a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -458,7 +458,7 @@ public abstract class AbstractHoodieLogRecordReader {
           case PARQUET_DATA_BLOCK:
           case DELETE_BLOCK:
             List<HoodieLogBlock> logBlocksList = instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
-            if (logBlocksList.size() == 0) {
+            if (logBlocksList.isEmpty()) {
               // Keep a track of instant Times in the order of arrival.
               orderedInstantsList.add(instantTime);
             }
@@ -473,8 +473,7 @@ public abstract class AbstractHoodieLogRecordReader {
             // Rollback blocks contain information of instants that are failed, collect them in a set..
             if (commandBlock.getType().equals(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK)) {
               totalRollbacks.incrementAndGet();
-              String targetInstantForCommandBlock =
-                  logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+              String targetInstantForCommandBlock = logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
               targetRollbackInstants.add(targetInstantForCommandBlock);
               orderedInstantsList.remove(targetInstantForCommandBlock);
               instantToBlocksMap.remove(targetInstantForCommandBlock);
@@ -507,7 +506,7 @@ public abstract class AbstractHoodieLogRecordReader {
       for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
         String instantTime = orderedInstantsList.get(i);
        List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
-        if (instantsBlocks.size() == 0) {
+        if (instantsBlocks.isEmpty()) {
          throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime);
         }
         HoodieLogBlock firstBlock = instantsBlocks.get(0);
@@ -809,8 +808,7 @@ public abstract class AbstractHoodieLogRecordReader {
     }
 
    long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
-        hoodieTableMetaClient, false);
+    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);
    InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
         true, false).mergeSchema();
    Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, readerSchema.getFullName());
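
The hunk above also illustrates the schema-on-read pattern this reader follows: resolve the schema that was current when the block was committed, merge it with the query schema, then convert to Avro for decoding. Restated as a compact sketch, using the same calls and names as the diff:

    // Schema in effect when the data block was written; lookups are now always cached.
    long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);

    // Merge write-time and query schemas, then convert to Avro to read the block.
    InternalSchema merged = new InternalSchemaMerger(fileSchema, internalSchema, true, false).mergeSchema();
    Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(merged, readerSchema.getFullName());
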
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 9e0763d6116..88ec42673ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -353,8 +353,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     }
 
    long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
-        hoodieTableMetaClient, false);
+    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);
    Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
         true, false, false).mergeSchemaGetRenamed();
    Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(), readerSchema.getFullName());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
index 48763784fca..0865b7cdada 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
@@ -78,15 +78,11 @@ public class InternalSchemaCache {
    * @param metaClient current hoodie metaClient
    * @return internalSchema
    */
-  public static InternalSchema searchSchemaAndCache(long versionID, HoodieTableMetaClient metaClient, boolean cacheEnable) {
+  public static InternalSchema searchSchemaAndCache(long versionID, HoodieTableMetaClient metaClient) {
    Option<InternalSchema> candidateSchema = getSchemaByReadingCommitFile(versionID, metaClient);
     if (candidateSchema.isPresent()) {
       return candidateSchema.get();
     }
-    if (!cacheEnable) {
-      // parse history schema and return directly
-      return InternalSchemaUtils.searchSchema(versionID, getHistoricalSchemas(metaClient));
-    }
     String tablePath = metaClient.getBasePath().toString();
     // use segment lock to reduce competition.
     synchronized (lockList[tablePath.hashCode() & (lockList.length - 1)]) {
@@ -206,9 +202,7 @@ public class InternalSchemaCache {
         }
       } catch (Exception e1) {
         // swallow this exception.
-        LOG.warn(String.format(
-            "Cannot find internal schema from commit file %s. Falling back to parsing historical internal schema",
-            candidateCommitFile.toString()));
+        LOG.warn("Cannot find internal schema from commit file {}. Falling back to parsing historical internal schema", candidateCommitFile);
       }
     }
     // step2:
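
One effect of the first hunk in this file: with the cacheEnable early-return removed, every lookup now goes through the striped-lock path shown above (lockList[tablePath.hashCode() & (lockList.length - 1)]). A minimal sketch of that striped-locking idea, with illustrative names not taken from the class:

    // Hash each table path onto a fixed, power-of-two pool of lock objects so
    // different tables rarely contend while lookups for the same table serialize.
    private static final Object[] LOCKS = new Object[16];
    static {
      for (int i = 0; i < LOCKS.length; i++) {
        LOCKS[i] = new Object();
      }
    }

    static Object lockFor(String tablePath) {
      // The mask keeps only the low bits, so negative hashCodes index safely.
      return LOCKS[tablePath.hashCode() & (LOCKS.length - 1)];
    }
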
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index ab83d738faa..9c00fc4bcd5 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -110,7 +110,7 @@ public class SchemaEvolutionContext {
       internalSchemaOption = Option.empty();
      LOG.warn(String.format("failed to get internal Schema from hudi table:%s", metaClient.getBasePath()), e);
     }
-    LOG.info(String.format("finish init schema evolution for split: %s", split));
+    LOG.info("finish init schema evolution for split: {}", split);
   }
 
  private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
@@ -135,7 +135,7 @@ public class SchemaEvolutionContext {
    */
  public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realtimeRecordReader) throws Exception {
     if (!(split instanceof RealtimeSplit)) {
-      LOG.warn(String.format("expect realtime split for mor table, but find other type split %s", split));
+      LOG.warn("expect realtime split for mor table, but find other type split {}", split);
       return;
     }
     if (internalSchemaOption.isPresent()) {
@@ -159,8 +159,8 @@ public class SchemaEvolutionContext {
       realtimeRecordReader.setHiveSchema(hiveSchema);
       internalSchemaOption = Option.of(prunedInternalSchema);
       RealtimeSplit realtimeSplit = (RealtimeSplit) split;
-      LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
-          realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
+      LOG.info("About to read compacted logs {} for base split {}, projecting cols {}",
+          realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns);
     }
   }
 
@@ -181,7 +181,7 @@ public class SchemaEvolutionContext {
        prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
         InternalSchema querySchema = prunedSchema;
        long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));
-        InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
+        InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient);
        InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
             true).mergeSchema();
         List<Types.Field> fields = mergedInternalSchema.columns();
@@ -269,7 +269,7 @@ public class SchemaEvolutionContext {
       case TIME:
        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
       default:
-        LOG.error(String.format("cannot convert unknown type: %s to Hive", type));
+        LOG.error("cannot convert unknown type: {} to Hive", type);
        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));
     }
   }
@@ -309,8 +309,8 @@ public class SchemaEvolutionContext {
       String filterText = filterExpr.getExprString();
      String serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Pushdown initiated with filterText = " + filterText + ", filterExpr = "
-            + filterExpr + ", serializedFilterExpr = " + serializedFilterExpr);
+        LOG.debug("Pushdown initiated with filterText = {}, filterExpr = {}, serializedFilterExpr = {}",
+            filterText, filterExpr, serializedFilterExpr);
       }
       job.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterText);
       job.set(TableScanDesc.FILTER_EXPR_CONF_STR, serializedFilterExpr);
@@ -327,7 +327,7 @@ public class SchemaEvolutionContext {
     }
     StringBuilder readColumnNames = new StringBuilder();
    List<String> tmpColNameList = Arrays.asList(job.get(serdeConstants.LIST_COLUMNS).split(","));
-    List<String> fullColNamelist = new ArrayList<String>(tmpColNameList);
+    List<String> fullColNamelist = new ArrayList<>(tmpColNameList);
     for (int index = 0; index < fields.size(); index++) {
       String colName = fields.get(index).name();
       if (readColumnNames.length() > 0) {
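
Most of the logging edits in this file convert eager String.format and string concatenation to SLF4J parameterized logging. The practical difference, sketched against the same LOG field the class already uses:

    // Eager: the message is built with String.format even if the level is disabled.
    // LOG.info(String.format("finish init schema evolution for split: %s", split));

    // Parameterized: the {} placeholders are only filled in when the logger
    // actually emits the message, so disabled levels cost almost nothing.
    LOG.info("finish init schema evolution for split: {}", split);
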
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 196e38fb690..38f4889a513 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -116,7 +116,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
    val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
       InternalSchema.getEmptyInternalSchema
     } else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
-      InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
+      InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient)
     } else {
       schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 52e40669999..6cba2868536 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -37,7 +37,6 @@ import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMeta
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.util.CachingIterator
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
@@ -49,7 +48,6 @@ import org.apache.spark.sql.catalyst.expressions.Projection
 import org.apache.spark.sql.types.StructType
 
 import java.io.Closeable
-
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
