bvaradar commented on a change in pull request #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r834267350



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java
##########
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.TimestampType$;
+import org.apache.spark.sql.types.UserDefinedType;
+import org.apache.spark.sql.types.VarcharType;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class SparkInternalSchemaConverter {
+  private SparkInternalSchemaConverter() {
+
+  }
+
+  public static final String HOODIE_QUERY_SCHEMA = 
"hoodie.schema.internal.querySchema";
+  public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
+  public static final String HOODIE_VALID_COMMITS_LIST = 
"hoodie.valid.commits.list";
+  /**

Review comment:
       nit: empty line before this

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
##########
@@ -109,6 +115,9 @@
   private final FileSystem fs;
   // Total log files read - for metrics
   private AtomicLong totalLogFiles = new AtomicLong(0);
+  // Internal schema
+  private InternalSchema internalSchema = 
InternalSchema.getEmptyInternalSchema();

Review comment:
       Looks like this is an redundant initialization as internalSchema is 
setup properly in constructor

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Util methods to support evolve old avro schema based on a given schema.
+ */
+public class AvroSchemaEvolutionUtils {
+  private AvroSchemaEvolutionUtils() {

Review comment:
       Can you remove the explicit private constructor. All methods are static. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -237,6 +251,30 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
     finalizeWrite(table, instantTime, stats);
+    // do save internal schema to support Implicitly add columns in write 
process
+    if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && 
table.getConfig().getSchemaEvolutionEnable()) {
+      TableSchemaResolver schemaUtil = new 
TableSchemaResolver(table.getMetaClient());
+      String historySchemaStr = 
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+      FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(table.getMetaClient());
+      if (!historySchemaStr.isEmpty()) {
+        InternalSchema internalSchema = 
InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+            SerDeHelper.parseSchemas(historySchemaStr));
+        Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new 
Schema.Parser().parse(config.getSchema()));
+        InternalSchema evolutionSchema = 
AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, 
internalSchema);
+        if (evolutionSchema.equals(internalSchema)) {
+          metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, 
SerDeHelper.toJson(evolutionSchema));
+          schemasManager.persistHistorySchemaStr(instantTime, 
historySchemaStr);

Review comment:
       @xiarixiaoyao : Agree, this is a valid concern with the current way for 
storing history schemas. Once, we move to storing schemas to the general 
metadata storage, this problem should not be there and we should stop copying 
the history schema file every commit. Can you add a TODO in this line 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class InternalSchemaCache {
+  // Use segment lock to reduce competition.
+  // the lock size should be powers of 2 for better hash.
+  private static Object[] lockList = new Object[16];
+
+  static {
+    for (int i = 0; i < lockList.length; i++) {
+      lockList[i] = new Object();
+    }
+  }
+
+  // historySchemas cache maintain a map about (tablePath, HistorySchemas).
+  // this is a Global cache, all threads in one container/executor share the 
same cache.
+  private static final Cache<String, TreeMap<Long, InternalSchema>>
+      HISTORICAL_SCHEMA_CACHE = 
Caffeine.newBuilder().maximumSize(1000).weakValues().build();
+
+  /**
+   * Search internalSchema based on versionID.
+   * first step: try to get internalSchema from hoodie commit files, we no 
need to add lock.
+   * if we cannot get internalSchema by first step, then we try to get 
internalSchema from cache.
+   *
+   * @param versionID schema version_id need to search
+   * @param metaClient current hoodie metaClient
+   * @return internalSchema
+   */
+  public static InternalSchema searchSchemaAndCache(long versionID, 
HoodieTableMetaClient metaClient, boolean cacheEnable) {
+    Option<InternalSchema> candidateSchema = 
getSchemaByReadingCommitFile(versionID, metaClient);
+    if (candidateSchema.isPresent()) {
+      return candidateSchema.get();
+    }
+    if (!cacheEnable) {
+      // parse history schema and return directly
+      return InternalSchemaUtils.searchSchema(versionID, 
getHistoricalSchemas(metaClient));
+    }
+    String tablePath = metaClient.getBasePath();
+    // use segment lock to reduce competition.
+    synchronized (lockList[tablePath.hashCode() & (lockList.length - 1)]) {
+      TreeMap<Long, InternalSchema> historicalSchemas = 
HISTORICAL_SCHEMA_CACHE.getIfPresent(tablePath);
+      if (historicalSchemas == null || 
InternalSchemaUtils.searchSchema(versionID, historicalSchemas) == null) {
+        historicalSchemas = getHistoricalSchemas(metaClient);
+        HISTORICAL_SCHEMA_CACHE.put(tablePath, historicalSchemas);
+      } else {
+        long maxVersionId = 
historicalSchemas.keySet().stream().max(Long::compareTo).get();
+        if (versionID > maxVersionId) {
+          historicalSchemas = getHistoricalSchemas(metaClient);
+          HISTORICAL_SCHEMA_CACHE.put(tablePath, historicalSchemas);
+        }
+      }
+      return InternalSchemaUtils.searchSchema(versionID, historicalSchemas);
+    }
+  }
+
+  private static TreeMap<Long, InternalSchema> 
getHistoricalSchemas(HoodieTableMetaClient metaClient) {
+    TreeMap<Long, InternalSchema> result = new TreeMap<>();
+    FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(metaClient);
+    String historySchemaStr = schemasManager.getHistorySchemaStr();
+    if (!StringUtils.isNullOrEmpty(historySchemaStr)) {
+      result = SerDeHelper.parseSchemas(historySchemaStr);
+    }
+    return result;
+  }
+
+  private static Option<InternalSchema> getSchemaByReadingCommitFile(long 
versionID, HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      List<HoodieInstant> instants = timeline.getInstants().filter(f -> 
f.getTimestamp().equals(String.valueOf(versionID))).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        return Option.empty();
+      }
+      byte[] data = timeline.getInstantDetails(instants.get(0)).get();
+      HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, 
HoodieCommitMetadata.class);
+      String latestInternalSchemaStr = 
metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+      return SerDeHelper.fromJson(latestInternalSchemaStr);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to read schema from commit metadata", 
e);
+    }
+  }
+
+  /**
+   * Get internalSchema and avroSchema for compaction/cluster operation.
+   *
+   * @param metaClient current hoodie metaClient
+   * @param compactionAndClusteringInstant first instant before current 
compaction/cluster instant
+   * @return (internalSchemaStrOpt, avroSchemaStrOpt) a pair of 
InternalSchema/avroSchema
+   */
+  public static Pair<Option<String>, Option<String>> 
getInternalSchemaAndAvroSchemaForClusteringAndCompaction(HoodieTableMetaClient 
metaClient, String compactionAndClusteringInstant) {
+    // try to load internalSchema to support Schema Evolution
+    HoodieTimeline timelineBeforeCurrentCompaction = 
metaClient.getCommitsAndCompactionTimeline().findInstantsBefore(compactionAndClusteringInstant).filterCompletedInstants();
+    Option<HoodieInstant> lastInstantBeforeCurrentCompaction =  
timelineBeforeCurrentCompaction.lastInstant();
+    if (lastInstantBeforeCurrentCompaction.isPresent()) {
+      // try to find internalSchema
+      byte[] data = 
timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get();
+      HoodieCommitMetadata metadata;
+      try {
+        metadata = HoodieCommitMetadata.fromBytes(data, 
HoodieCommitMetadata.class);
+      } catch (Exception e) {
+        throw new HoodieException(String.format("cannot read metadata from 
commit: %s", lastInstantBeforeCurrentCompaction.get()), e);
+      }
+      String internalSchemaStr = 
metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+      if (internalSchemaStr != null) {
+        String existingSchemaStr = 
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
+        return Pair.of(Option.of(internalSchemaStr), 
Option.of(existingSchemaStr));
+      }
+    }
+    return Pair.of(Option.empty(), Option.empty());
+  }
+
+  /**
+   * Give a schema versionId return its internalSchema.
+   * This method will be called by spark tasks, we should minimize time cost.
+   * We try our best to not use metaClient, since the initialization of 
metaClient is time cost
+   * step1:
+   * try to parser internalSchema from HoodieInstant directly
+   * step2:
+   * if we cannot parser internalSchema in step1,
+   * try to find internalSchema in historySchema.
+   *
+   * @param versionId the internalSchema version to be search.
+   * @param tablePath table path
+   * @param hadoopConf conf
+   * @param validCommits current validate commits, use to make up the commit 
file path/verify the validity of the history schema files
+   * @return a internalSchema.
+   */
+  public static InternalSchema getInternalSchemaByVersionId(long versionId, 
String tablePath, Configuration hadoopConf, String validCommits) {

Review comment:
       I don't see this method being used. Can we remove it ?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java
##########
@@ -78,12 +90,41 @@ public void runMerge(HoodieTable<T, 
HoodieData<HoodieRecord<T>>, HoodieData<Hood
 
     BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
     HoodieFileReader<GenericRecord> reader = 
HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, 
mergeHandle.getOldFilePath());
+
+    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(table.getConfig().getInternalSchema());
+    boolean needToReWriteRecord = false;
+    // TODO support bootstrap

Review comment:
       Regarding Bootstrap support, what needs to be done in addition to 
support it? Is there a subsequent PR coming ? If we are not supporting it, can 
we explicitly add a check during config checking time to error out if we see a 
bootstrapped table with hoodie.schema.evolution.enable set to true.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Util methods to support evolve old avro schema based on a given schema.

Review comment:
       -> "Utility methods to support evolving old avro schema based on a given 
internal schema."

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -147,17 +159,26 @@ private RecordIterator(Schema readerSchema, Schema 
writerSchema, byte[] content)
       int version = this.dis.readInt();
       HoodieAvroDataBlockVersion logBlockVersion = new 
HoodieAvroDataBlockVersion(version);
 
-      this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+      Schema finalReadSchema = readerSchema;
+      if (!internalSchema.isEmptySchema()) {

Review comment:
       Is there an unit/functional test that would cover this case ?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class InternalSchemaCache {
+  // Use segment lock to reduce competition.
+  // the lock size should be powers of 2 for better hash.
+  private static Object[] lockList = new Object[16];
+
+  static {
+    for (int i = 0; i < lockList.length; i++) {
+      lockList[i] = new Object();
+    }
+  }
+
+  // historySchemas cache maintain a map about (tablePath, HistorySchemas).
+  // this is a Global cache, all threads in one container/executor share the 
same cache.
+  private static final Cache<String, TreeMap<Long, InternalSchema>>
+      HISTORICAL_SCHEMA_CACHE = 
Caffeine.newBuilder().maximumSize(1000).weakValues().build();

Review comment:
       Can you explain why weakValues are used instead of softValues ?

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -107,8 +113,21 @@ class DefaultSource extends RelationProvider
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
              (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-          new BaseFileOnlyRelation(sqlContext, metaClient, parameters, 
userSchema, globPaths)
-
+          val internalSchema = new 
TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata
+          val sparkSchema = SchemaConverters.toSqlType(new 
TableSchemaResolver(metaClient).getTableAvroSchema).dataType.asInstanceOf[StructType]
+          val newParameters = parameters ++ 
Map(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA -> 
SerDeHelper.toJson(internalSchema.orElse(null)),
+            SparkInternalSchemaConverter.HOODIE_TABLE_PATH -> 
metaClient.getBasePath)
+          if (internalSchema.isPresent) {
+            // Use the HoodieFileIndex only if the 'path' is not globbed.
+            // Or else we use the original way to read hoodie table.

Review comment:
       Sounds good.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class InternalSchemaCache {
+  // Use segment lock to reduce competition.
+  // the lock size should be powers of 2 for better hash.
+  private static Object[] lockList = new Object[16];
+
+  static {
+    for (int i = 0; i < lockList.length; i++) {
+      lockList[i] = new Object();
+    }
+  }
+
+  // historySchemas cache maintain a map about (tablePath, HistorySchemas).
+  // this is a Global cache, all threads in one container/executor share the 
same cache.
+  private static final Cache<String, TreeMap<Long, InternalSchema>>
+      HISTORICAL_SCHEMA_CACHE = 
Caffeine.newBuilder().maximumSize(1000).weakValues().build();
+
+  /**
+   * Search internalSchema based on versionID.
+   * first step: try to get internalSchema from hoodie commit files, we no 
need to add lock.
+   * if we cannot get internalSchema by first step, then we try to get 
internalSchema from cache.
+   *
+   * @param versionID schema version_id need to search
+   * @param metaClient current hoodie metaClient
+   * @return internalSchema
+   */
+  public static InternalSchema searchSchemaAndCache(long versionID, 
HoodieTableMetaClient metaClient, boolean cacheEnable) {
+    Option<InternalSchema> candidateSchema = 
getSchemaByReadingCommitFile(versionID, metaClient);
+    if (candidateSchema.isPresent()) {
+      return candidateSchema.get();
+    }
+    if (!cacheEnable) {
+      // parse history schema and return directly
+      return InternalSchemaUtils.searchSchema(versionID, 
getHistoricalSchemas(metaClient));
+    }
+    String tablePath = metaClient.getBasePath();
+    // use segment lock to reduce competition.
+    synchronized (lockList[tablePath.hashCode() & (lockList.length - 1)]) {
+      TreeMap<Long, InternalSchema> historicalSchemas = 
HISTORICAL_SCHEMA_CACHE.getIfPresent(tablePath);
+      if (historicalSchemas == null || 
InternalSchemaUtils.searchSchema(versionID, historicalSchemas) == null) {
+        historicalSchemas = getHistoricalSchemas(metaClient);
+        HISTORICAL_SCHEMA_CACHE.put(tablePath, historicalSchemas);
+      } else {
+        long maxVersionId = 
historicalSchemas.keySet().stream().max(Long::compareTo).get();
+        if (versionID > maxVersionId) {
+          historicalSchemas = getHistoricalSchemas(metaClient);
+          HISTORICAL_SCHEMA_CACHE.put(tablePath, historicalSchemas);
+        }
+      }
+      return InternalSchemaUtils.searchSchema(versionID, historicalSchemas);
+    }
+  }
+
+  private static TreeMap<Long, InternalSchema> 
getHistoricalSchemas(HoodieTableMetaClient metaClient) {
+    TreeMap<Long, InternalSchema> result = new TreeMap<>();
+    FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(metaClient);
+    String historySchemaStr = schemasManager.getHistorySchemaStr();
+    if (!StringUtils.isNullOrEmpty(historySchemaStr)) {
+      result = SerDeHelper.parseSchemas(historySchemaStr);
+    }
+    return result;
+  }
+
+  private static Option<InternalSchema> getSchemaByReadingCommitFile(long 
versionID, HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      List<HoodieInstant> instants = timeline.getInstants().filter(f -> 
f.getTimestamp().equals(String.valueOf(versionID))).collect(Collectors.toList());
+      if (instants.isEmpty()) {
+        return Option.empty();
+      }
+      byte[] data = timeline.getInstantDetails(instants.get(0)).get();
+      HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, 
HoodieCommitMetadata.class);
+      String latestInternalSchemaStr = 
metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+      return SerDeHelper.fromJson(latestInternalSchemaStr);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to read schema from commit metadata", 
e);
+    }
+  }
+
+  /**
+   * Get internalSchema and avroSchema for compaction/cluster operation.
+   *
+   * @param metaClient current hoodie metaClient
+   * @param compactionAndClusteringInstant first instant before current 
compaction/cluster instant
+   * @return (internalSchemaStrOpt, avroSchemaStrOpt) a pair of 
InternalSchema/avroSchema
+   */
+  public static Pair<Option<String>, Option<String>> 
getInternalSchemaAndAvroSchemaForClusteringAndCompaction(HoodieTableMetaClient 
metaClient, String compactionAndClusteringInstant) {
+    // try to load internalSchema to support Schema Evolution
+    HoodieTimeline timelineBeforeCurrentCompaction = 
metaClient.getCommitsAndCompactionTimeline().findInstantsBefore(compactionAndClusteringInstant).filterCompletedInstants();
+    Option<HoodieInstant> lastInstantBeforeCurrentCompaction =  
timelineBeforeCurrentCompaction.lastInstant();
+    if (lastInstantBeforeCurrentCompaction.isPresent()) {
+      // try to find internalSchema
+      byte[] data = 
timelineBeforeCurrentCompaction.getInstantDetails(lastInstantBeforeCurrentCompaction.get()).get();
+      HoodieCommitMetadata metadata;
+      try {
+        metadata = HoodieCommitMetadata.fromBytes(data, 
HoodieCommitMetadata.class);
+      } catch (Exception e) {
+        throw new HoodieException(String.format("cannot read metadata from 
commit: %s", lastInstantBeforeCurrentCompaction.get()), e);
+      }
+      String internalSchemaStr = 
metadata.getMetadata(SerDeHelper.LATEST_SCHEMA);
+      if (internalSchemaStr != null) {
+        String existingSchemaStr = 
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
+        return Pair.of(Option.of(internalSchemaStr), 
Option.of(existingSchemaStr));
+      }
+    }
+    return Pair.of(Option.empty(), Option.empty());
+  }
+
+  /**
+   * Give a schema versionId return its internalSchema.
+   * This method will be called by spark tasks, we should minimize time cost.
+   * We try our best to not use metaClient, since the initialization of 
metaClient is time cost
+   * step1:
+   * try to parser internalSchema from HoodieInstant directly
+   * step2:
+   * if we cannot parser internalSchema in step1,
+   * try to find internalSchema in historySchema.
+   *
+   * @param versionId the internalSchema version to be search.
+   * @param tablePath table path
+   * @param hadoopConf conf
+   * @param validCommits current validate commits, use to make up the commit 
file path/verify the validity of the history schema files
+   * @return a internalSchema.
+   */
+  public static InternalSchema getInternalSchemaByVersionId(long versionId, 
String tablePath, Configuration hadoopConf, String validCommits) {

Review comment:
       This seems to be the only place where MetaClient is not passed in the 
constructor which could cause repeated .hoodie listing

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SerDeHelper {
+  private SerDeHelper() {
+
+  }
+
+  public static final String LATEST_SCHEMA = "latest_Schema";

Review comment:
       latest_Schema should be latest_schema

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -177,6 +192,70 @@ class DefaultSource extends RelationProvider
 
   override def shortName(): String = "hudi_v1"
 
+  private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,

Review comment:
       @xiarixiaoyao : Are we regressing to poor performance with this change ? 
@YannByron mentions that this code has been discarded.  

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * auxiliary class.
+ * help to merge file schema and query schema to produce final read schema for 
avro/parquet file
+ */
+public class InternalSchemaMerger {
+  private final InternalSchema fileSchema;
+  private final InternalSchema querySchema;
+  // now there exist some bugs when we use spark update/merge api,
+  // those operation will change col nullability from optional to required 
which is wrong.
+  // Before that bug is fixed, we need to do adapt.
+  // if mergeRequiredFiledForce is true, we will ignore the col's required 
attribute.
+  private final boolean ignoreRequiredAttribute;
+  // Whether to use column Type from file schema to read files when we find 
some column type has changed.

Review comment:
       Thanks for the explanation. Can you add the comment in the code itself.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/AbstractInternalSchemaStorageManager.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+abstract class AbstractInternalSchemaStorageManager {
+
+  /**
+   * Persist history schema str.
+   */
+  public abstract void persistHistorySchemaStr(String instantTime, String 
historySchemaStr);
+
+  /**
+   * Get latest history schema string.
+   */
+  public abstract String getHistorySchemaStr();
+
+  /**
+   * Get latest history schema string.
+   * Using give validCommits to validate all legal histroy Schema files, and 
return the latest one.
+   */

Review comment:
       Add the line: If the passed valid commits is null or empty, valid 
instants will be fetched from the file-system and used.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to