This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 662f0822bd4 [HUDI-7838] Remove the option hoodie.schema.cache.enable and always do the cache (#11444)
662f0822bd4 is described below
commit 662f0822bd480d6ec8ee5c5d565a6802decbebc3
Author: Vova Kolmakov <[email protected]>
AuthorDate: Mon Jun 17 15:13:44 2024 +0700
[HUDI-7838] Remove the option hoodie.schema.cache.enable and always do the cache (#11444)
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 19 -------------------
.../table/log/AbstractHoodieLogRecordReader.java | 10 ++++------
.../table/read/HoodieBaseFileGroupRecordBuffer.java | 3 +--
.../apache/hudi/common/util/InternalSchemaCache.java | 10 ++--------
.../apache/hudi/hadoop/SchemaEvolutionContext.java | 18 +++++++++---------
.../scala/org/apache/hudi/IncrementalRelation.scala | 2 +-
.../src/main/scala/org/apache/hudi/Iterators.scala | 2 --
7 files changed, 17 insertions(+), 47 deletions(-)
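For context: the net effect for callers is that InternalSchemaCache.searchSchemaAndCache loses its boolean cacheEnable parameter, and the driver/executor-side schema cache is consulted unconditionally. A minimal sketch of the new call site, assuming Hudi on the classpath (the wrapper method and its name are hypothetical; the Hudi classes are the ones touched by the diff below):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.util.InternalSchemaCache;
    import org.apache.hudi.internal.schema.InternalSchema;

    public class SchemaLookupSketch {
      // Resolves the InternalSchema that was current as of the given instant.
      static InternalSchema schemaForInstant(HoodieTableMetaClient metaClient, long instantTime) {
        // Before this commit: searchSchemaAndCache(instantTime, metaClient, cacheEnable),
        // where cacheEnable came from hoodie.schema.cache.enable (default false).
        // After this commit the flag is gone and caching always applies:
        return InternalSchemaCache.searchSchemaAndCache(instantTime, metaClient);
      }
    }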
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 11e43b304b3..ac728ec8de6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -241,12 +241,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Schema string representing the latest schema of the
table. Hudi passes this to "
+ "implementations of evolution of schema");
- public static final ConfigProperty<Boolean> ENABLE_INTERNAL_SCHEMA_CACHE =
ConfigProperty
- .key("hoodie.schema.cache.enable")
- .defaultValue(false)
- .markAdvanced()
- .withDocumentation("cache query internalSchemas in driver/executor
side");
-
public static final ConfigProperty<String> AVRO_SCHEMA_VALIDATE_ENABLE =
ConfigProperty
.key("hoodie.avro.schema.validate")
.defaultValue("false")
@@ -1260,18 +1254,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(INTERNAL_SCHEMA_STRING);
   }
 
-  public boolean getInternalSchemaCacheEnable() {
-    return getBoolean(ENABLE_INTERNAL_SCHEMA_CACHE);
-  }
-
   public void setInternalSchemaString(String internalSchemaString) {
     setValue(INTERNAL_SCHEMA_STRING, internalSchemaString);
   }
 
-  public void setInternalSchemaCacheEnable(boolean enable) {
-    setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable));
-  }
-
   public boolean getSchemaEvolutionEnable() {
     return getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE);
   }
@@ -2827,11 +2813,6 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withInternalSchemaCacheEnable(boolean enable) {
-      writeConfig.setValue(ENABLE_INTERNAL_SCHEMA_CACHE, String.valueOf(enable));
-      return this;
-    }
-
     public Builder withAvroSchemaValidate(boolean enable) {
       writeConfig.setValue(AVRO_SCHEMA_VALIDATE_ENABLE, String.valueOf(enable));
       return this;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index d61531ecea7..4c38d11467a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -458,7 +458,7 @@ public abstract class AbstractHoodieLogRecordReader {
         case PARQUET_DATA_BLOCK:
         case DELETE_BLOCK:
           List<HoodieLogBlock> logBlocksList = instantToBlocksMap.getOrDefault(instantTime, new ArrayList<>());
-          if (logBlocksList.size() == 0) {
+          if (logBlocksList.isEmpty()) {
             // Keep a track of instant Times in the order of arrival.
             orderedInstantsList.add(instantTime);
           }
@@ -473,8 +473,7 @@ public abstract class AbstractHoodieLogRecordReader {
           // Rollback blocks contain information of instants that are failed, collect them in a set..
           if (commandBlock.getType().equals(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK)) {
             totalRollbacks.incrementAndGet();
-            String targetInstantForCommandBlock =
-                logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
+            String targetInstantForCommandBlock = logBlock.getLogBlockHeader().get(TARGET_INSTANT_TIME);
             targetRollbackInstants.add(targetInstantForCommandBlock);
             orderedInstantsList.remove(targetInstantForCommandBlock);
             instantToBlocksMap.remove(targetInstantForCommandBlock);
@@ -507,7 +506,7 @@ public abstract class AbstractHoodieLogRecordReader {
     for (int i = orderedInstantsList.size() - 1; i >= 0; i--) {
       String instantTime = orderedInstantsList.get(i);
       List<HoodieLogBlock> instantsBlocks = instantToBlocksMap.get(instantTime);
-      if (instantsBlocks.size() == 0) {
+      if (instantsBlocks.isEmpty()) {
         throw new HoodieException("Data corrupted while writing. Found zero blocks for an instant " + instantTime);
       }
       HoodieLogBlock firstBlock = instantsBlocks.get(0);
@@ -809,8 +808,7 @@ public abstract class AbstractHoodieLogRecordReader {
     }
     long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
-        hoodieTableMetaClient, false);
+    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);
     InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
         true, false).mergeSchema();
     Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema,
         readerSchema.getFullName());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 9e0763d6116..88ec42673ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -353,8 +353,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     }
     long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime,
-        hoodieTableMetaClient, false);
+    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);
     Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
         true, false, false).mergeSchemaGetRenamed();
     Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(),
         readerSchema.getFullName());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
index 48763784fca..0865b7cdada 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
@@ -78,15 +78,11 @@ public class InternalSchemaCache {
    * @param metaClient current hoodie metaClient
    * @return internalSchema
    */
-  public static InternalSchema searchSchemaAndCache(long versionID, HoodieTableMetaClient metaClient, boolean cacheEnable) {
+  public static InternalSchema searchSchemaAndCache(long versionID, HoodieTableMetaClient metaClient) {
     Option<InternalSchema> candidateSchema = getSchemaByReadingCommitFile(versionID, metaClient);
     if (candidateSchema.isPresent()) {
       return candidateSchema.get();
     }
-    if (!cacheEnable) {
-      // parse history schema and return directly
-      return InternalSchemaUtils.searchSchema(versionID, getHistoricalSchemas(metaClient));
-    }
     String tablePath = metaClient.getBasePath().toString();
     // use segment lock to reduce competition.
     synchronized (lockList[tablePath.hashCode() & (lockList.length - 1)]) {
@@ -206,9 +202,7 @@ public class InternalSchemaCache {
         }
       } catch (Exception e1) {
         // swallow this exception.
-        LOG.warn(String.format(
-            "Cannot find internal schema from commit file %s. Falling back to parsing historical internal schema",
-            candidateCommitFile.toString()));
+        LOG.warn("Cannot find internal schema from commit file {}. Falling back to parsing historical internal schema", candidateCommitFile);
       }
     }
     // step2:
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index ab83d738faa..9c00fc4bcd5 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -110,7 +110,7 @@ public class SchemaEvolutionContext {
       internalSchemaOption = Option.empty();
       LOG.warn(String.format("failed to get internal Schema from hudi table：%s", metaClient.getBasePath()), e);
     }
-    LOG.info(String.format("finish init schema evolution for split: %s", split));
+    LOG.info("finish init schema evolution for split: {}", split);
   }
 
   private HoodieTableMetaClient setUpHoodieTableMetaClient() throws IOException {
@@ -135,7 +135,7 @@ public class SchemaEvolutionContext {
    */
   public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realtimeRecordReader) throws Exception {
     if (!(split instanceof RealtimeSplit)) {
-      LOG.warn(String.format("expect realtime split for mor table, but find other type split %s", split));
+      LOG.warn("expect realtime split for mor table, but find other type split {}", split);
       return;
     }
     if (internalSchemaOption.isPresent()) {
@@ -159,8 +159,8 @@ public class SchemaEvolutionContext {
       realtimeRecordReader.setHiveSchema(hiveSchema);
       internalSchemaOption = Option.of(prunedInternalSchema);
       RealtimeSplit realtimeSplit = (RealtimeSplit) split;
-      LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
-          realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
+      LOG.info("About to read compacted logs {} for base split {}, projecting cols {}",
+          realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns);
     }
   }
@@ -181,7 +181,7 @@ public class SchemaEvolutionContext {
       prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
       InternalSchema querySchema = prunedSchema;
       long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));
-      InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
+      InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient);
       InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
           true).mergeSchema();
       List<Types.Field> fields = mergedInternalSchema.columns();
@@ -269,7 +269,7 @@ public class SchemaEvolutionContext {
       case TIME:
         throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
       default:
-        LOG.error(String.format("cannot convert unknown type: %s to Hive", type));
+        LOG.error("cannot convert unknown type: {} to Hive", type);
         throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));
     }
   }
@@ -309,8 +309,8 @@ public class SchemaEvolutionContext {
     String filterText = filterExpr.getExprString();
     String serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Pushdown initiated with filterText = " + filterText + ", filterExpr = "
-          + filterExpr + ", serializedFilterExpr = " + serializedFilterExpr);
+      LOG.debug("Pushdown initiated with filterText = {}, filterExpr = {}, serializedFilterExpr = {}",
+          filterText, filterExpr, serializedFilterExpr);
     }
     job.set(TableScanDesc.FILTER_TEXT_CONF_STR, filterText);
     job.set(TableScanDesc.FILTER_EXPR_CONF_STR, serializedFilterExpr);
@@ -327,7 +327,7 @@ public class SchemaEvolutionContext {
     }
     StringBuilder readColumnNames = new StringBuilder();
     List<String> tmpColNameList = Arrays.asList(job.get(serdeConstants.LIST_COLUMNS).split(","));
-    List<String> fullColNamelist = new ArrayList<String>(tmpColNameList);
+    List<String> fullColNamelist = new ArrayList<>(tmpColNameList);
     for (int index = 0; index < fields.size(); index++) {
       String colName = fields.get(index).name();
       if (readColumnNames.length() > 0) {
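The logging hunks above also swap eager String.format concatenation for SLF4J parameterized logging, so the message is only rendered when the level is enabled. A minimal standalone sketch of the difference (the class and logger below are illustrative, not from the patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

      static void report(Object split) {
        // Eager: builds the string even when INFO is disabled.
        LOG.info(String.format("finish init schema evolution for split: %s", split));
        // Parameterized: defers formatting until the level check passes.
        LOG.info("finish init schema evolution for split: {}", split);
      }
    }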
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 196e38fb690..38f4889a513 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -116,7 +116,7 @@ class IncrementalRelation(val sqlContext: SQLContext,
     val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
       InternalSchema.getEmptyInternalSchema
     } else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
-      InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
+      InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient)
     } else {
       schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
     }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index 52e40669999..6cba2868536 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -37,7 +37,6 @@ import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMeta
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata}
 import org.apache.hudi.storage.{HoodieStorageUtils, StoragePath}
 import org.apache.hudi.util.CachingIterator
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
@@ -49,7 +48,6 @@ import org.apache.spark.sql.catalyst.expressions.Projection
 import org.apache.spark.sql.types.StructType
 
 import java.io.Closeable
-
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
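Since caching is now unconditional, every schema lookup that misses the commit-file fast path goes through the segment lock shown in the InternalSchemaCache hunk (lockList[tablePath.hashCode() & (lockList.length - 1)]). A minimal standalone sketch of that idiom, assuming a power-of-two lock array; this is not Hudi's actual class:

    import java.util.function.Supplier;

    public class SegmentLockSketch {
      // Power-of-two size so hash & (length - 1) selects a slot without modulo.
      private static final Object[] LOCKS = new Object[16];
      static {
        for (int i = 0; i < LOCKS.length; i++) {
          LOCKS[i] = new Object();
        }
      }

      // Runs the loader under the lock segment picked by the key's hash, so
      // lookups for different tables rarely contend on the same monitor.
      static <T> T withSegmentLock(String key, Supplier<T> loader) {
        synchronized (LOCKS[key.hashCode() & (LOCKS.length - 1)]) {
          return loader.get();
        }
      }
    }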