This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 60973dcf56fff61172f76cf5e8bc6d95970ea698
Author: Alexander Trushev <[email protected]>
AuthorDate: Wed Nov 30 15:37:53 2022 +0700

    [HUDI-3981] Flink engine support for comprehensive schema evolution (#5830)
---
 .../org/apache/hudi/table/HoodieFlinkTable.java    |  13 +
 .../apache/hudi/configuration/OptionsResolver.java |   8 +
 .../org/apache/hudi/table/HoodieTableSource.java   |  16 +-
 .../java/org/apache/hudi/table/format/CastMap.java | 220 ++++++++++
 .../org/apache/hudi/table/format/FormatUtils.java  |   8 +-
 .../format/HoodieParquetEvolvedSplitReader.java    |  53 +++
 .../hudi/table/format/HoodieParquetReader.java     |  99 +++++
 .../table/format/HoodieParquetSplitReader.java     |  51 +++
 .../hudi/table/format/InternalSchemaManager.java   | 170 ++++++++
 .../table/format/cow/CopyOnWriteInputFormat.java   |  14 +-
 .../table/format/mor/MergeOnReadInputFormat.java   | 180 +++++---
 .../apache/hudi/util/RowDataCastProjection.java    |  49 +++
 .../org/apache/hudi/util/RowDataProjection.java    |   9 +-
 .../apache/hudi/table/ITTestSchemaEvolution.java   | 461 +++++++++++++++++++++
 .../org/apache/hudi/table/format/TestCastMap.java  | 120 ++++++
 .../org/apache/hudi/utils/TestConfigurations.java  |  23 +
 16 files changed, 1436 insertions(+), 58 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 4e7dbe36c43..40a13c14d5b 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -26,12 +26,15 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.FlinkHoodieIndexFactory;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -57,6 +60,9 @@ public abstract class HoodieFlinkTable<T extends 
HoodieRecordPayload>
             
.setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
             .setLayoutVersion(Option.of(new 
TimelineLayoutVersion(config.getTimelineLayoutVersion())))
             
.setFileSystemRetryConfig(config.getFileSystemRetryConfig()).build();
+    if (config.getSchemaEvolutionEnable()) {
+      setLatestInternalSchema(config, metaClient);
+    }
     return HoodieFlinkTable.create(config, context, metaClient);
   }
 
@@ -102,4 +108,11 @@ public abstract class HoodieFlinkTable<T extends 
HoodieRecordPayload>
       return Option.empty();
     }
   }
+
+  private static void setLatestInternalSchema(HoodieWriteConfig config, 
HoodieTableMetaClient metaClient) {
+    Option<InternalSchema> internalSchema = new 
TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
+    if (internalSchema.isPresent()) {
+      config.setInternalSchemaString(SerDeHelper.toJson(internalSchema.get()));
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 0dd31ee7538..dd272e17fb0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.configuration;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.StringUtils;
@@ -202,6 +203,13 @@ public class OptionsResolver {
     return !conf.contains(FlinkOptions.READ_START_COMMIT) && 
!conf.contains(FlinkOptions.READ_END_COMMIT);
   }
 
+  /**
+   * Returns whether comprehensive schema evolution enabled.
+   */
+  public static boolean isSchemaEvolutionEnabled(Configuration conf) {
+    return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
false);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 4ea14c413cc..57c8b235a81 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -43,6 +43,7 @@ import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
 import org.apache.hudi.table.format.FilePathUtils;
+import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -123,6 +124,7 @@ public class HoodieTableSource implements
   private final String defaultPartName;
   private final Configuration conf;
   private final FileIndex fileIndex;
+  private final InternalSchemaManager internalSchemaManager;
 
   private int[] requiredPos;
   private long limit;
@@ -135,7 +137,7 @@ public class HoodieTableSource implements
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf) {
-    this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, 
null, null);
+    this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, 
null, null, null);
   }
 
   public HoodieTableSource(
@@ -148,7 +150,8 @@ public class HoodieTableSource implements
       @Nullable List<Map<String, String>> requiredPartitions,
       @Nullable int[] requiredPos,
       @Nullable Long limit,
-      @Nullable HoodieTableMetaClient metaClient) {
+      @Nullable HoodieTableMetaClient metaClient,
+      @Nullable InternalSchemaManager internalSchemaManager) {
     this.schema = schema;
     this.tableRowType = (RowType) 
schema.toPhysicalRowDataType().notNull().getLogicalType();
     this.path = path;
@@ -166,6 +169,9 @@ public class HoodieTableSource implements
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
     this.metaClient = metaClient == null ? 
StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient;
     this.maxCompactionMemoryInBytes = 
StreamerUtil.getMaxCompactionMemoryInBytes(conf);
+    this.internalSchemaManager = internalSchemaManager == null
+        ? InternalSchemaManager.get(this.conf, this.metaClient)
+        : internalSchemaManager;
   }
 
   @Override
@@ -215,7 +221,7 @@ public class HoodieTableSource implements
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient);
+        conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient, 
internalSchemaManager);
   }
 
   @Override
@@ -439,6 +445,7 @@ public class HoodieTableSource implements
         .defaultPartName(conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME))
         .limit(this.limit)
         .emitDelete(emitDelete)
+        .internalSchemaManager(internalSchemaManager)
         .build();
   }
 
@@ -462,7 +469,8 @@ public class HoodieTableSource implements
         this.conf.getString(FlinkOptions.PARTITION_DEFAULT_NAME),
         this.limit == NO_LIMIT_CONSTANT ? Long.MAX_VALUE : this.limit, // 
ParquetInputFormat always uses the limit value
         getParquetConf(this.conf, this.hadoopConf),
-        this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE)
+        this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
+        this.internalSchemaManager
     );
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
new file mode 100644
index 00000000000..5f29e85adc2
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.util.RowDataCastProjection;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
+import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
+
+/**
+ * CastMap is responsible for conversion of flink types when full schema 
evolution enabled.
+ * Supported cast conversions:
+ * Integer => Long, Float, Double, Decimal, String
+ * Long => Float, Double, Decimal, String
+ * Float => Double, Decimal, String
+ * Double => Decimal, String
+ * Decimal => Decimal, String
+ * String => Decimal, Date
+ * Date => String
+ */
+public final class CastMap implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // Maps position to corresponding cast
+  private final Map<Integer, Cast> castMap = new HashMap<>();
+
+  private DataType[] fileFieldTypes;
+
+  public Option<RowDataProjection> toRowDataProjection(int[] selectedFields) {
+    if (castMap.isEmpty()) {
+      return Option.empty();
+    }
+    LogicalType[] requiredType = new LogicalType[selectedFields.length];
+    for (int i = 0; i < selectedFields.length; i++) {
+      requiredType[i] = fileFieldTypes[selectedFields[i]].getLogicalType();
+    }
+    return Option.of(new RowDataCastProjection(requiredType, this));
+  }
+
+  public Object castIfNeeded(int pos, Object val) {
+    Cast cast = castMap.get(pos);
+    if (cast == null) {
+      return val;
+    }
+    return cast.convert(val);
+  }
+
+  public DataType[] getFileFieldTypes() {
+    return fileFieldTypes;
+  }
+
+  public void setFileFieldTypes(DataType[] fileFieldTypes) {
+    this.fileFieldTypes = fileFieldTypes;
+  }
+
+  @VisibleForTesting
+  void add(int pos, LogicalType fromType, LogicalType toType) {
+    Function<Object, Object> conversion = getConversion(fromType, toType);
+    if (conversion == null) {
+      throw new IllegalArgumentException(String.format("Cannot create cast %s 
=> %s at pos %s", fromType, toType, pos));
+    }
+    add(pos, new Cast(fromType, toType, conversion));
+  }
+
+  private @Nullable Function<Object, Object> getConversion(LogicalType 
fromType, LogicalType toType) {
+    LogicalTypeRoot from = fromType.getTypeRoot();
+    LogicalTypeRoot to = toType.getTypeRoot();
+    switch (to) {
+      case BIGINT: {
+        if (from == INTEGER) {
+          return val -> ((Number) val).longValue();
+        }
+        break;
+      }
+      case FLOAT: {
+        if (from == INTEGER || from == BIGINT) {
+          return val -> ((Number) val).floatValue();
+        }
+        break;
+      }
+      case DOUBLE: {
+        if (from == INTEGER || from == BIGINT) {
+          return val -> ((Number) val).doubleValue();
+        }
+        if (from == FLOAT) {
+          return val -> Double.parseDouble(val.toString());
+        }
+        break;
+      }
+      case DECIMAL: {
+        if (from == INTEGER || from == BIGINT || from == DOUBLE) {
+          return val -> toDecimalData((Number) val, toType);
+        }
+        if (from == FLOAT) {
+          return val -> toDecimalData(Double.parseDouble(val.toString()), 
toType);
+        }
+        if (from == VARCHAR) {
+          return val -> toDecimalData(Double.parseDouble(val.toString()), 
toType);
+        }
+        if (from == DECIMAL) {
+          return val -> toDecimalData(((DecimalData) val).toBigDecimal(), 
toType);
+        }
+        break;
+      }
+      case VARCHAR: {
+        if (from == INTEGER
+            || from == BIGINT
+            || from == FLOAT
+            || from == DOUBLE
+            || from == DECIMAL) {
+          return val -> new BinaryStringData(String.valueOf(val));
+        }
+        if (from == DATE) {
+          return val -> new BinaryStringData(LocalDate.ofEpochDay(((Integer) 
val).longValue()).toString());
+        }
+        break;
+      }
+      case DATE: {
+        if (from == VARCHAR) {
+          return val -> (int) LocalDate.parse(val.toString()).toEpochDay();
+        }
+        break;
+      }
+      default:
+    }
+    return null;
+  }
+
+  private void add(int pos, Cast cast) {
+    castMap.put(pos, cast);
+  }
+
+  private DecimalData toDecimalData(Number val, LogicalType decimalType) {
+    BigDecimal valAsDecimal = BigDecimal.valueOf(val.doubleValue());
+    return toDecimalData(valAsDecimal, decimalType);
+  }
+
+  private DecimalData toDecimalData(BigDecimal valAsDecimal, LogicalType 
decimalType) {
+    return DecimalData.fromBigDecimal(
+        valAsDecimal,
+        ((DecimalType) decimalType).getPrecision(),
+        ((DecimalType) decimalType).getScale());
+  }
+
+  /**
+   * Fields {@link Cast#from} and {@link Cast#to} are redundant due to {@link 
Cast#convert(Object)} determines conversion.
+   * However, it is convenient to debug {@link CastMap} when {@link 
Cast#toString()} prints types.
+   */
+  private static final class Cast implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final LogicalType from;
+    private final LogicalType to;
+    private final Function<Object, Object> conversion;
+
+    Cast(LogicalType from, LogicalType to, Function<Object, Object> 
conversion) {
+      this.from = from;
+      this.to = to;
+      this.conversion = conversion;
+    }
+
+    Object convert(Object val) {
+      return conversion.apply(val);
+    }
+
+    @Override
+    public String toString() {
+      return from + " => " + to;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return castMap.entrySet().stream()
+        .map(e -> e.getKey() + ": " + e.getValue())
+        .collect(Collectors.joining(", ", "{", "}"));
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 6357b898d49..672acf2b430 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -32,6 +32,7 @@ import 
org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
@@ -122,6 +123,7 @@ public class FormatUtils {
   public static HoodieMergedLogRecordScanner logScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
+      InternalSchema internalSchema,
       org.apache.flink.configuration.Configuration flinkConf,
       Configuration hadoopConf) {
     HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(flinkConf);
@@ -131,6 +133,7 @@ public class FormatUtils {
         .withBasePath(split.getTablePath())
         .withLogFilePaths(split.getLogPaths().get())
         .withReaderSchema(logSchema)
+        .withInternalSchema(internalSchema)
         .withLatestInstantTime(split.getLatestCommit())
         .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
         .withReverseReader(false)
@@ -147,6 +150,7 @@ public class FormatUtils {
   private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
       MergeOnReadInputSplit split,
       Schema logSchema,
+      InternalSchema internalSchema,
       org.apache.flink.configuration.Configuration flinkConf,
       Configuration hadoopConf,
       HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
@@ -156,6 +160,7 @@ public class FormatUtils {
         .withBasePath(split.getTablePath())
         .withLogFilePaths(split.getLogPaths().get())
         .withReaderSchema(logSchema)
+        .withInternalSchema(internalSchema)
         .withLatestInstantTime(split.getLatestCommit())
         .withReadBlocksLazily(
             string2Boolean(
@@ -186,6 +191,7 @@ public class FormatUtils {
     public BoundedMemoryRecords(
         MergeOnReadInputSplit split,
         Schema logSchema,
+        InternalSchema internalSchema,
         Configuration hadoopConf,
         org.apache.flink.configuration.Configuration flinkConf) {
       this.executor = new BoundedInMemoryExecutor<>(
@@ -197,7 +203,7 @@ public class FormatUtils {
           Functions.noop());
       // Consumer of this record reader
       this.iterator = this.executor.getQueue().iterator();
-      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, 
flinkConf, hadoopConf,
+      this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, 
internalSchema, flinkConf, hadoopConf,
           record -> executor.getQueue().insertRecord(record));
       // Start reading and buffering
       this.executor.startProducers();
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
new file mode 100644
index 00000000000..037a3776359
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+
+/**
+ * Decorates origin hoodie parquet reader with cast projection.
+ */
+public final class HoodieParquetEvolvedSplitReader implements 
HoodieParquetReader {
+  private final HoodieParquetReader originReader;
+  private final RowDataProjection castProjection;
+
+  public HoodieParquetEvolvedSplitReader(HoodieParquetReader originReader, 
RowDataProjection castProjection) {
+    this.originReader = originReader;
+    this.castProjection = castProjection;
+  }
+
+  @Override
+  public boolean reachedEnd() throws IOException {
+    return originReader.reachedEnd();
+  }
+
+  @Override
+  public RowData nextRecord() {
+    return castProjection.project(originReader.nextRecord());
+  }
+
+  @Override
+  public void close() throws IOException {
+    originReader.close();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
new file mode 100644
index 00000000000..e762f03e983
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
+import org.apache.hudi.util.RowDataProjection;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base interface for hoodie parquet readers.
+ */
+public interface HoodieParquetReader extends Closeable {
+
+  boolean reachedEnd() throws IOException;
+
+  RowData nextRecord();
+
+  static HoodieParquetReader getReader(
+      InternalSchemaManager internalSchemaManager,
+      boolean utcTimestamp,
+      boolean caseSensitive,
+      Configuration conf,
+      String[] fieldNames,
+      DataType[] fieldTypes,
+      Map<String, Object> partitionSpec,
+      int[] selectedFields,
+      int batchSize,
+      Path path,
+      long splitStart,
+      long splitLength) throws IOException {
+    Option<RowDataProjection> castProjection;
+    InternalSchema fileSchema = 
internalSchemaManager.getFileSchema(path.getName());
+    if (fileSchema.isEmptySchema()) {
+      return new HoodieParquetSplitReader(
+          ParquetSplitReaderUtil.genPartColumnarRowReader(
+              utcTimestamp,
+              caseSensitive,
+              conf,
+              fieldNames,
+              fieldTypes,
+              partitionSpec,
+              selectedFields,
+              batchSize,
+              path,
+              splitStart,
+              splitLength));
+    } else {
+      CastMap castMap = internalSchemaManager.getCastMap(fileSchema, 
fieldNames, fieldTypes, selectedFields);
+      castProjection = castMap.toRowDataProjection(selectedFields);
+      fieldNames = internalSchemaManager.getFileFieldNames(fileSchema, 
fieldNames);
+      fieldTypes = castMap.getFileFieldTypes();
+    }
+    HoodieParquetReader reader = new HoodieParquetSplitReader(
+        ParquetSplitReaderUtil.genPartColumnarRowReader(
+          utcTimestamp,
+          caseSensitive,
+          conf,
+          fieldNames,
+          fieldTypes,
+          partitionSpec,
+          selectedFields,
+          batchSize,
+          path,
+          splitStart,
+          splitLength));
+    if (castProjection.isPresent()) {
+      return new HoodieParquetEvolvedSplitReader(reader, castProjection.get());
+    } else {
+      return reader;
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
new file mode 100644
index 00000000000..d13c6c7c21a
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import 
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+
+/**
+ * Hoodie wrapper for flink parquet reader.
+ */
+public final class HoodieParquetSplitReader implements HoodieParquetReader {
+  private final ParquetColumnarRowSplitReader reader;
+
+  public HoodieParquetSplitReader(ParquetColumnarRowSplitReader reader) {
+    this.reader = reader;
+  }
+
+  @Override
+  public boolean reachedEnd() throws IOException {
+    return reader.reachedEnd();
+  }
+
+  @Override
+  public RowData nextRecord() {
+    return reader.nextRecord();
+  }
+
+  @Override
+  public void close() throws IOException {
+    reader.close();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
new file mode 100644
index 00000000000..abd405469d8
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.InternalSchemaCache;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.util.AvroSchemaConverter;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * This class is responsible for calculating names and types of fields that 
are actual at a certain point in time.
+ * If field is renamed in queried schema, its old name will be returned, which 
is relevant at the provided time.
+ * If type of field is changed, its old type will be returned, and projection 
will be created that will convert the old type to the queried one.
+ */
/**
 * This class is responsible for calculating names and types of fields that are actual at a certain point in time.
 * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time.
 * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one.
 */
public class InternalSchemaManager implements Serializable {

  private static final long serialVersionUID = 1L;

  // Sentinel used when schema evolution is disabled or the table has no committed
  // internal schema; its query schema is empty, which short-circuits all lookups.
  public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null);

  private final Configuration conf;
  // The schema the query is issued against (the latest schema read from commit metadata).
  private final InternalSchema querySchema;
  // Comma-separated instant file names of all completed commits/compactions; passed to
  // InternalSchemaCache so it can resolve historical schema versions.
  private final String validCommits;
  private final String tablePath;
  // Lazily built from the Flink conf; transient because hadoop Configuration is not Serializable.
  private transient org.apache.hadoop.conf.Configuration hadoopConf;

  /**
   * Creates a manager for the given table, or returns {@link #DISABLED} when schema
   * evolution is turned off in the options or no internal schema has been committed.
   *
   * @param conf       Flink job configuration
   * @param metaClient meta client of the table being read
   */
  public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) {
    if (!OptionsResolver.isSchemaEvolutionEnabled(conf)) {
      return DISABLED;
    }
    Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
    if (!internalSchema.isPresent() || internalSchema.get().isEmptySchema()) {
      return DISABLED;
    }
    String validCommits = metaClient
        .getCommitsAndCompactionTimeline()
        .filterCompletedInstants()
        .getInstantsAsStream()
        .map(HoodieInstant::getFileName)
        .collect(Collectors.joining(","));
    return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePathV2().toString());
  }

  public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath) {
    this.conf = conf;
    this.querySchema = querySchema;
    this.validCommits = validCommits;
    this.tablePath = tablePath;
  }

  public InternalSchema getQuerySchema() {
    return querySchema;
  }

  /**
   * Resolves the schema the given base file was written with, merged against the
   * query schema (keeping old names and old types for renamed/retyped columns).
   *
   * <p>Returns the empty schema as a marker when the file schema already equals the
   * query schema, i.e. no name/type conversion is needed for this file.
   */
  InternalSchema getFileSchema(String fileName) {
    if (querySchema.isEmptySchema()) {
      return querySchema;
    }
    // The commit time encoded in the file name identifies the schema version of the file.
    long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName));
    InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId(
        commitInstantTime, tablePath, getHadoopConf(), validCommits);
    if (querySchema.equals(fileSchemaUnmerged)) {
      return InternalSchema.getEmptyInternalSchema();
    }
    return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema();
  }

  /**
   * Builds a {@link CastMap} describing, for each selected field whose type changed
   * between the file schema and the query schema, the conversion from the file's
   * (old) type to the queried type. Also records the per-field file types so the
   * parquet reader can be opened with the types actually present in the file.
   */
  CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) {
    Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty");
    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty");

    CastMap castMap = new CastMap();
    Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames);
    if (posProxy.isEmpty()) {
      // No type changes: the file field types are exactly the query field types.
      castMap.setFileFieldTypes(queryFieldTypes);
      return castMap;
    }
    List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList());
    // "tableName" is only a placeholder record name for the avro conversion; the
    // resulting children are the file schema's fields as Flink DataTypes.
    List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType(
        AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren();
    DataType[] fileFieldTypes = new DataType[queryFieldTypes.length];
    for (int i = 0; i < queryFieldTypes.length; i++) {
      Integer posOfChangedType = posProxy.get(i);
      if (posOfChangedType == null) {
        fileFieldTypes[i] = queryFieldTypes[i];
      } else {
        DataType fileType = fileSchemaAsDataTypes.get(posOfChangedType);
        fileFieldTypes[i] = fileType;
        // The cast map is keyed by the field's position within the projection
        // (selectedFields), not by its absolute position in the row.
        int selectedPos = selectedFieldList.indexOf(i);
        if (selectedPos != -1) {
          castMap.add(selectedPos, fileType.getLogicalType(), queryFieldTypes[i].getLogicalType());
        }
      }
    }
    castMap.setFileFieldTypes(fileFieldTypes);
    return castMap;
  }

  /**
   * Translates query field names to the names used inside the given file,
   * undoing any renames so parquet columns can be located by their old names.
   */
  String[] getFileFieldNames(InternalSchema fileSchema, String[] queryFieldNames) {
    Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty");
    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty");

    Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(fileSchema, querySchema);
    if (renamedCols.isEmpty()) {
      return queryFieldNames;
    }
    return Arrays.stream(queryFieldNames).map(name -> renamedCols.getOrDefault(name, name)).toArray(String[]::new);
  }

  /**
   * Maps the position of each type-changed column within {@code queryFieldNames}
   * to its position within the query schema.
   *
   * NOTE(review): if a type-changed column does not appear in queryFieldNames,
   * indexOf returns -1 and a -1 key is stored; this looks harmless downstream
   * because lookups only use non-negative loop indices — confirm with callers.
   */
  private Map<Integer, Integer> getPosProxy(InternalSchema fileSchema, String[] queryFieldNames) {
    Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, fileSchema);
    HashMap<Integer, Integer> posProxy = new HashMap<>(changedCols.size());
    List<String> fieldNameList = Arrays.asList(queryFieldNames);
    List<Types.Field> columns = querySchema.columns();
    changedCols.forEach((posInSchema, typePair) -> {
      String name = columns.get(posInSchema).name();
      int posInType = fieldNameList.indexOf(name);
      posProxy.put(posInType, posInSchema);
    });
    return Collections.unmodifiableMap(posProxy);
  }

  // Lazy accessor: the hadoop conf is transient and must be rebuilt after deserialization.
  private org.apache.hadoop.conf.Configuration getHadoopConf() {
    if (hadoopConf == null) {
      hadoopConf = HadoopConfigurations.getHadoopConf(conf);
    }
    return hadoopConf;
  }
}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index c5ea3d4ab98..453d0fee232 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -20,7 +20,8 @@ package org.apache.hudi.table.format.cow;
 
 import java.util.Comparator;
 import org.apache.hudi.common.fs.FSUtils;
-import 
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
+import org.apache.hudi.table.format.HoodieParquetReader;
+import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.util.DataTypeUtils;
 
 import org.apache.flink.api.common.io.FileInputFormat;
@@ -74,7 +75,7 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
   private final SerializableConfiguration conf;
   private final long limit;
 
-  private transient ParquetColumnarRowSplitReader reader;
+  private transient HoodieParquetReader reader;
   private transient long currentReadCount;
 
   /**
@@ -82,6 +83,8 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
    */
   private FilePathFilter localFilesFilter = new GlobFilePathFilter();
 
+  private final InternalSchemaManager internalSchemaManager;
+
   public CopyOnWriteInputFormat(
       Path[] paths,
       String[] fullFieldNames,
@@ -90,7 +93,8 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
       String partDefaultName,
       long limit,
       Configuration conf,
-      boolean utcTimestamp) {
+      boolean utcTimestamp,
+      InternalSchemaManager internalSchemaManager) {
     super.setFilePaths(paths);
     this.limit = limit;
     this.partDefaultName = partDefaultName;
@@ -99,6 +103,7 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
     this.selectedFields = selectedFields;
     this.conf = new SerializableConfiguration(conf);
     this.utcTimestamp = utcTimestamp;
+    this.internalSchemaManager = internalSchemaManager;
   }
 
   @Override
@@ -123,7 +128,8 @@ public class CopyOnWriteInputFormat extends 
FileInputFormat<RowData> {
       }
     });
 
-    this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
+    this.reader = HoodieParquetReader.getReader(
+        internalSchemaManager,
         utcTimestamp,
         true,
         conf.conf(),
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index c9b6561bdef..3103e27e857 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -29,11 +29,12 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
-import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
-import 
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
+import org.apache.hudi.table.format.HoodieParquetReader;
+import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.RowDataProjection;
@@ -66,6 +67,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.IntStream;
 
 import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
@@ -137,13 +139,16 @@ public class MergeOnReadInputFormat
    */
   private boolean closed = true;
 
-  private MergeOnReadInputFormat(
+  private final InternalSchemaManager internalSchemaManager;
+
+  protected MergeOnReadInputFormat(
       Configuration conf,
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
       long limit,
-      boolean emitDelete) {
+      boolean emitDelete,
+      InternalSchemaManager internalSchemaManager) {
     this.conf = conf;
     this.tableState = tableState;
     this.fieldNames = tableState.getRowType().getFieldNames();
@@ -154,6 +159,7 @@ public class MergeOnReadInputFormat
     this.requiredPos = tableState.getRequiredPositions();
     this.limit = limit;
     this.emitDelete = emitDelete;
+    this.internalSchemaManager = internalSchemaManager;
   }
 
   /**
@@ -199,6 +205,7 @@ public class MergeOnReadInputFormat
           this.tableState.getRequiredRowType(),
           new Schema.Parser().parse(this.tableState.getAvroSchema()),
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
+          internalSchemaManager.getQuerySchema(),
           this.requiredPos,
           this.emitDelete,
           this.tableState.getOperationPos(),
@@ -284,15 +291,19 @@ public class MergeOnReadInputFormat
     }
   }
 
-  private ParquetColumnarRowSplitReader getFullSchemaReader(String path) 
throws IOException {
-    return getReader(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
+  protected HoodieParquetReader getFullSchemaReader(String path) {
+    try {
+      return getReader(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
+    } catch (IOException e) {
+      throw new HoodieException("Get reader error for path: " + path);
+    }
   }
 
-  private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) 
throws IOException {
  /**
   * Opens a parquet reader projected to the required (queried) positions only.
   *
   * @param path the parquet file path
   * @throws IOException if the underlying reader cannot be created
   */
  protected HoodieParquetReader getRequiredSchemaReader(String path) throws IOException {
    return getReader(path, this.requiredPos);
  }
 
-  private ParquetColumnarRowSplitReader getReader(String path, int[] 
requiredPos) throws IOException {
+  private HoodieParquetReader getReader(String path, int[] requiredPos) throws 
IOException {
     // generate partition specs.
     LinkedHashMap<String, String> partSpec = 
FilePathUtils.extractPartitionKeyValues(
         new org.apache.hadoop.fs.Path(path).getParent(),
@@ -314,7 +325,8 @@ public class MergeOnReadInputFormat
       }
     });
 
-    return ParquetSplitReaderUtil.genPartColumnarRowReader(
+    return HoodieParquetReader.getReader(
+        internalSchemaManager,
         this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
         true,
         HadoopConfigurations.getParquetConf(this.conf, hadoopConf),
@@ -334,7 +346,7 @@ public class MergeOnReadInputFormat
     final GenericRecordBuilder recordBuilder = new 
GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter =
         
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, 
tableSchema, conf, hadoopConf);
+    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, 
tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf);
     final Iterator<String> logRecordsKeyIterator = 
scanner.getRecords().keySet().iterator();
     final int[] pkOffset = tableState.getPkOffsetsInRequired();
     // flag saying whether the pk semantics has been dropped by user specified
@@ -414,7 +426,7 @@ public class MergeOnReadInputFormat
     final GenericRecordBuilder recordBuilder = new 
GenericRecordBuilder(requiredSchema);
     final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter =
         
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
-    final FormatUtils.BoundedMemoryRecords records = new 
FormatUtils.BoundedMemoryRecords(split, tableSchema, hadoopConf, conf);
+    final FormatUtils.BoundedMemoryRecords records = new 
FormatUtils.BoundedMemoryRecords(split, tableSchema, 
internalSchemaManager.getQuerySchema(), hadoopConf, conf);
     final Iterator<HoodieRecord<?>> recordsIterator = 
records.getRecordsIterator();
 
     return new ClosableIterator<RowData>() {
@@ -457,6 +469,59 @@ public class MergeOnReadInputFormat
     };
   }
 
  /**
   * Returns an iterator over the merged log records of the split, emitting rows in
   * the FULL table schema (no projection to required positions). DELETE records are
   * skipped. The log scanner is opened with an empty internal schema, i.e. without
   * schema-evolution conversion — presumably callers use this only when the file and
   * query schemas already agree; TODO confirm with subclasses.
   *
   * @param split the merge-on-read input split whose log files are scanned
   */
  protected ClosableIterator<RowData> getFullLogFileIterator(MergeOnReadInputSplit split) {
    final Schema tableSchema = new Schema.Parser().parse(tableState.getAvroSchema());
    final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
        AvroToRowDataConverters.createRowConverter(tableState.getRowType());
    final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, InternalSchema.getEmptyInternalSchema(), conf, hadoopConf);
    final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();

    return new ClosableIterator<RowData>() {
      // Record buffered by hasNext() and handed out by next().
      private RowData currentRecord;

      @Override
      public boolean hasNext() {
        while (logRecordsKeyIterator.hasNext()) {
          String curAvroKey = logRecordsKeyIterator.next();
          Option<IndexedRecord> curAvroRecord = null;
          final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord) scanner.getRecords().get(curAvroKey);
          try {
            curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
          } catch (IOException e) {
            throw new HoodieException("Get avro insert value error for key: " + curAvroKey, e);
          }
          if (curAvroRecord.isPresent()) {
            final IndexedRecord avroRecord = curAvroRecord.get();
            final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord, tableState.getOperationPos());
            if (rowKind == RowKind.DELETE) {
              // skip the delete record
              continue;
            }
            currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord);
            currentRecord.setRowKind(rowKind);
            return true;
          }
          // else: insert value absent means the record is a DELETE — skip it
          // and keep scanning the remaining keys.

        }
        return false;
      }

      @Override
      public RowData next() {
        return currentRecord;
      }

      @Override
      public void close() {
        scanner.close();
      }
    };
  }
+
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
@@ -470,9 +535,9 @@ public class MergeOnReadInputFormat
 
   static class BaseFileOnlyIterator implements RecordIterator {
     // base file reader
-    private final ParquetColumnarRowSplitReader reader;
+    private final HoodieParquetReader reader;
 
-    BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
+    public BaseFileOnlyIterator(HoodieParquetReader reader) {
       this.reader = reader;
     }
 
@@ -499,7 +564,7 @@ public class MergeOnReadInputFormat
    */
   static class BaseFileOnlyFilteringIterator implements RecordIterator {
     // base file reader
-    private final ParquetColumnarRowSplitReader reader;
+    private final HoodieParquetReader reader;
     private final InstantRange instantRange;
     private final RowDataProjection projection;
 
@@ -508,7 +573,7 @@ public class MergeOnReadInputFormat
     BaseFileOnlyFilteringIterator(
         Option<InstantRange> instantRange,
         RowType requiredRowType,
-        ParquetColumnarRowSplitReader reader) {
+        HoodieParquetReader reader) {
       this.reader = reader;
       this.instantRange = instantRange.orElse(null);
       int[] positions = IntStream.range(1, 1 + 
requiredRowType.getFieldCount()).toArray();
@@ -573,7 +638,7 @@ public class MergeOnReadInputFormat
 
   static class SkipMergeIterator implements RecordIterator {
     // base file reader
-    private final ParquetColumnarRowSplitReader reader;
+    private final HoodieParquetReader reader;
     // iterator for log files
     private final ClosableIterator<RowData> iterator;
 
@@ -584,7 +649,7 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    SkipMergeIterator(ParquetColumnarRowSplitReader reader, 
ClosableIterator<RowData> iterator) {
+    SkipMergeIterator(HoodieParquetReader reader, ClosableIterator<RowData> 
iterator) {
       this.reader = reader;
       this.iterator = iterator;
     }
@@ -621,22 +686,20 @@ public class MergeOnReadInputFormat
 
   static class MergeIterator implements RecordIterator {
     // base file reader
-    private final ParquetColumnarRowSplitReader reader;
+    private final HoodieParquetReader reader;
     // log keys used for merging
     private final Iterator<String> logKeysIterator;
     // scanner
     private final HoodieMergedLogRecordScanner scanner;
 
     private final Schema tableSchema;
-    private final Schema requiredSchema;
-    private final int[] requiredPos;
     private final boolean emitDelete;
     private final int operationPos;
     private final RowDataToAvroConverters.RowDataToAvroConverter 
rowDataToAvroConverter;
     private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
-    private final GenericRecordBuilder recordBuilder;
 
-    private final RowDataProjection projection;
+    private final Option<RowDataProjection> projection;
+    private final Option<Function<IndexedRecord, GenericRecord>> 
avroProjection;
 
     private final InstantRange instantRange;
 
@@ -651,7 +714,7 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    MergeIterator(
+    public MergeIterator(
         Configuration flinkConf,
         org.apache.hadoop.conf.Configuration hadoopConf,
         MergeOnReadInputSplit split,
@@ -659,23 +722,42 @@ public class MergeOnReadInputFormat
         RowType requiredRowType,
         Schema tableSchema,
         Schema requiredSchema,
+        InternalSchema querySchema,
         int[] requiredPos,
         boolean emitDelete,
         int operationPos,
-        ParquetColumnarRowSplitReader reader) { // the reader should be with 
full schema
+        HoodieParquetReader reader) { // the reader should be with full schema
+      this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, 
tableSchema,
+          querySchema,
+          Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
+          Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, 
requiredPos, new GenericRecordBuilder(requiredSchema))),
+          emitDelete, operationPos, reader);
+    }
+
+    public MergeIterator(
+        Configuration flinkConf,
+        org.apache.hadoop.conf.Configuration hadoopConf,
+        MergeOnReadInputSplit split,
+        RowType tableRowType,
+        RowType requiredRowType,
+        Schema tableSchema,
+        InternalSchema querySchema,
+        Option<RowDataProjection> projection,
+        Option<Function<IndexedRecord, GenericRecord>> avroProjection,
+        boolean emitDelete,
+        int operationPos,
+        HoodieParquetReader reader) { // the reader should be with full schema
       this.tableSchema = tableSchema;
       this.reader = reader;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, 
hadoopConf);
+      this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema, 
flinkConf, hadoopConf);
       this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
-      this.requiredSchema = requiredSchema;
-      this.requiredPos = requiredPos;
       this.emitDelete = emitDelete;
       this.operationPos = operationPos;
-      this.recordBuilder = new GenericRecordBuilder(requiredSchema);
+      this.avroProjection = avroProjection;
       this.rowDataToAvroConverter = 
RowDataToAvroConverters.createConverter(tableRowType);
       this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(requiredRowType);
-      this.projection = RowDataProjection.instance(requiredRowType, 
requiredPos);
+      this.projection = projection;
       this.instantRange = split.getInstantRange().orElse(null);
     }
 
@@ -703,18 +785,18 @@ public class MergeOnReadInputFormat
               // deleted
               continue;
             }
-            GenericRecord avroRecord = buildAvroRecordBySchema(
-                mergedAvroRecord.get(),
-                requiredSchema,
-                requiredPos,
-                recordBuilder);
+            IndexedRecord avroRecord = avroProjection.isPresent()
+                    ? avroProjection.get().apply(mergedAvroRecord.get())
+                    : mergedAvroRecord.get();
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             this.currentRecord.setRowKind(rowKind);
             return false;
           }
         }
         // project the full record in base with required positions
-        currentRecord = projection.project(currentRecord);
+        if (projection.isPresent()) {
+          currentRecord = projection.get().project(currentRecord);
+        }
         return false;
       }
       // read the logs
@@ -725,11 +807,9 @@ public class MergeOnReadInputFormat
           Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
-            GenericRecord avroRecord = buildAvroRecordBySchema(
-                insertAvroRecord.get(),
-                requiredSchema,
-                requiredPos,
-                recordBuilder);
+            IndexedRecord avroRecord = avroProjection.isPresent()
+                    ? avroProjection.get().apply(insertAvroRecord.get())
+                    : insertAvroRecord.get();
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), 
this.operationPos);
             return false;
@@ -775,12 +855,13 @@ public class MergeOnReadInputFormat
    * Builder for {@link MergeOnReadInputFormat}.
    */
   public static class Builder {
-    private Configuration conf;
-    private MergeOnReadTableState tableState;
-    private List<DataType> fieldTypes;
-    private String defaultPartName;
-    private long limit = -1;
-    private boolean emitDelete = false;
+    protected Configuration conf;
+    protected MergeOnReadTableState tableState;
+    protected List<DataType> fieldTypes;
+    protected String defaultPartName;
+    protected long limit = -1;
+    protected boolean emitDelete = false;
+    protected InternalSchemaManager internalSchemaManager = 
InternalSchemaManager.DISABLED;
 
     public Builder config(Configuration conf) {
       this.conf = conf;
@@ -812,9 +893,14 @@ public class MergeOnReadInputFormat
       return this;
     }
 
+    public Builder internalSchemaManager(InternalSchemaManager 
internalSchemaManager) {
+      this.internalSchemaManager = internalSchemaManager;
+      return this;
+    }
+
     public MergeOnReadInputFormat build() {
       return new MergeOnReadInputFormat(conf, tableState, fieldTypes,
-          defaultPartName, limit, emitDelete);
+          defaultPartName, limit, emitDelete, internalSchemaManager);
     }
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java
new file mode 100644
index 00000000000..55e85aa1f60
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.table.format.CastMap;
+
+import org.apache.flink.table.types.logical.LogicalType;
+
+import javax.annotation.Nullable;
+import java.util.stream.IntStream;
+
+/**
+ * This class is responsible to project row as well as {@link 
RowDataProjection}.
+ * In addition, fields are converted according to the CastMap.
+ */
+public final class RowDataCastProjection extends RowDataProjection {
+  private static final long serialVersionUID = 1L;
+
+  private final CastMap castMap;
+
+  public RowDataCastProjection(LogicalType[] types, CastMap castMap) {
+    super(types, IntStream.range(0, types.length).toArray());
+    this.castMap = castMap;
+  }
+
+  @Override
+  protected @Nullable Object getVal(int pos, @Nullable Object val) {
+    if (val == null) {
+      return null;
+    }
+    return castMap.castIfNeeded(pos, val);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
index 8076d982b99..d6604159579 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
@@ -37,7 +38,7 @@ public class RowDataProjection implements Serializable {
 
   private final RowData.FieldGetter[] fieldGetters;
 
-  private RowDataProjection(LogicalType[] types, int[] positions) {
+  protected RowDataProjection(LogicalType[] types, int[] positions) {
     ValidationUtils.checkArgument(types.length == positions.length,
         "types and positions should have the equal number");
     this.fieldGetters = new RowData.FieldGetter[types.length];
@@ -70,7 +71,7 @@ public class RowDataProjection implements Serializable {
     GenericRowData genericRowData = new 
GenericRowData(this.fieldGetters.length);
     for (int i = 0; i < this.fieldGetters.length; i++) {
       final Object val = this.fieldGetters[i].getFieldOrNull(rowData);
-      genericRowData.setField(i, val);
+      genericRowData.setField(i, getVal(i, val));
     }
     return genericRowData;
   }
@@ -86,4 +87,8 @@ public class RowDataProjection implements Serializable {
     }
     return values;
   }
+
  /**
   * Extension hook invoked for every field value during projection; the base
   * implementation is the identity. Subclasses override it to transform values,
   * e.g. to cast between evolved schema types.
   *
   * @param pos position of the field in the projected row
   * @param val field value extracted by the field getter, may be null
   * @return the (possibly transformed) value to place into the output row
   */
  protected @Nullable Object getVal(int pos, @Nullable Object val) {
    return val;
  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
new file mode 100644
index 00000000000..3a668514674
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.utils.FlinkMiniCluster;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.AFTER;
+import static 
org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.BEFORE;
+import static 
org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION_AFTER;
+import static 
org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION_BEFORE;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
 * Integration tests for Flink reads of a Hudi table after comprehensive schema
 * evolution: between two batches of writes a column is added, deleted, renamed,
 * type-changed (int to string) and reordered, and the table is then queried
 * through the evolved schema.
 *
 * <p>The common scenario (see {@link #testSchemaEvolution(TableOptions, boolean, ExpectedResult)})
 * is: write batch 1 with the initial schema, evolve the schema via
 * {@link HoodieFlinkWriteClient}, write batch 2 with the new schema, then verify
 * evolved rows, row counts, and Hudi metadata columns. Individual tests vary the
 * table options (COW/MOR, streaming, changelog, merge type, compaction) to drive
 * the different read paths named in the test methods.
 */
@SuppressWarnings({"SqlDialectInspection", "SqlNoDataSourceInspection"})
@ExtendWith(FlinkMiniCluster.class)
public class ITTestSchemaEvolution {

  // Fresh table base path per test.
  @TempDir File tempFile;
  private StreamTableEnvironment tEnv;

  @BeforeEach
  public void setUp() {
    // Parallelism 1 keeps the file layout and result arrival deterministic.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
    tEnv = StreamTableEnvironment.create(env);
  }

  // Default options: COPY_ON_WRITE, batch read.
  @Test
  public void testCopyOnWriteInputFormat() throws Exception {
    testSchemaEvolution(defaultTableOptions(tempFile.getAbsolutePath()));
  }

  // COW + streaming read from the earliest commit (per the test name, this
  // drives the base-file-only iterator).
  @Test
  public void testMergeOnReadInputFormatBaseFileOnlyIterator() throws Exception {
    TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
        .withOption(FlinkOptions.READ_AS_STREAMING.key(), true)
        .withOption(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST);
    testSchemaEvolution(tableOptions);
  }

  // Streaming read from a fixed start commit (filtering iterator, per the name).
  @Test
  public void testMergeOnReadInputFormatBaseFileOnlyFilteringIterator() throws Exception {
    TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
        .withOption(FlinkOptions.READ_AS_STREAMING.key(), true)
        .withOption(FlinkOptions.READ_START_COMMIT.key(), 1);
    testSchemaEvolution(tableOptions);
  }

  // MOR batch read; log files only, no compaction (merged log-file iterator).
  @Test
  public void testMergeOnReadInputFormatLogFileOnlyIteratorGetLogFileIterator() throws Exception {
    TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
        .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
    testSchemaEvolution(tableOptions);
  }

  // MOR + streaming + changelog: every intermediate record is emitted, hence
  // the unmerged expected result.
  @Test
  public void testMergeOnReadInputFormatLogFileOnlyIteratorGetUnMergedLogFileIterator() throws Exception {
    TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
        .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
        .withOption(FlinkOptions.READ_AS_STREAMING.key(), true)
        .withOption(FlinkOptions.READ_START_COMMIT.key(), FlinkOptions.START_COMMIT_EARLIEST)
        .withOption(FlinkOptions.CHANGELOG_ENABLED.key(), true);
    testSchemaEvolution(tableOptions, false, EXPECTED_UNMERGED_RESULT);
  }

  // MOR with a compaction before the schema changes (shouldCompact=true), so
  // reads must merge base files written under the old schema with new logs.
  @Test
  public void testMergeOnReadInputFormatMergeIterator() throws Exception {
    TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
        .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
        .withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1);
    testSchemaEvolution(tableOptions, true);
  }

  // Same layout as above but with skip-merge read semantics: both old and new
  // versions of updated keys surface, hence the unmerged expectation.
  @Test
  public void testMergeOnReadInputFormatSkipMergeIterator() throws Exception {
    TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
        .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
        .withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1)
        .withOption(FlinkOptions.MERGE_TYPE.key(), FlinkOptions.REALTIME_SKIP_MERGE);
    testSchemaEvolution(tableOptions, true, EXPECTED_UNMERGED_RESULT);
  }

  // Runs the full scenario, then compacts and verifies the compacted base
  // files still produce the evolved rows.
  @Test
  public void testCompaction() throws Exception {
    TableOptions tableOptions = defaultTableOptions(tempFile.getAbsolutePath())
        .withOption(FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_MERGE_ON_READ)
        .withOption(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), 1);
    testSchemaEvolution(tableOptions);
    try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(tableOptions.toConfig())) {
      Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());
      writeClient.compact(compactionInstant.get());
    }
    checkAnswerEvolved(EXPECTED_MERGED_RESULT.evolvedRows);
  }

  private void testSchemaEvolution(TableOptions tableOptions) throws Exception {
    testSchemaEvolution(tableOptions, false);
  }

  private void testSchemaEvolution(TableOptions tableOptions, boolean shouldCompact) throws Exception {
    testSchemaEvolution(tableOptions, shouldCompact, EXPECTED_MERGED_RESULT);
  }

  /**
   * Core scenario: write with the old schema, evolve the schema (optionally
   * compacting first), write with the new schema, then verify the evolved rows,
   * the row-count changelog, and the metadata columns.
   */
  private void testSchemaEvolution(TableOptions tableOptions, boolean shouldCompact, ExpectedResult expectedResult) throws Exception {
    writeTableWithSchema1(tableOptions);
    changeTableSchema(tableOptions, shouldCompact);
    writeTableWithSchema2(tableOptions);
    checkAnswerEvolved(expectedResult.evolvedRows);
    checkAnswerCount(expectedResult.rowCount);
    checkAnswerWithMeta(tableOptions, expectedResult.rowsWithMeta);
  }

  /** Batch 1: eight rows across four partitions, written with the initial schema. */
  private void writeTableWithSchema1(TableOptions tableOptions) throws ExecutionException, InterruptedException {
    //language=SQL
    tEnv.executeSql(""
        + "create table t1 ("
        + "  uuid string,"
        + "  name string,"
        + "  gender char,"
        + "  age int,"
        + "  ts timestamp,"
        + "  `partition` string"
        + ") partitioned by (`partition`) with (" + tableOptions + ")"
    );
    //language=SQL
    tEnv.executeSql(""
        + "insert into t1 select "
        + "  cast(uuid as string),"
        + "  cast(name as string),"
        + "  cast(gender as char),"
        + "  cast(age as int),"
        + "  cast(ts as timestamp),"
        + "  cast(`partition` as string) "
        + "from (values "
        + "  ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', 'par1'),"
        + "  ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', 'par1'),"
        + "  ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', 'par2'),"
        + "  ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', 'par2'),"
        + "  ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', 'par3'),"
        + "  ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', 'par3'),"
        + "  ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', 'par4'),"
        + "  ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', 'par4')"
        + ") as A(uuid, name, gender, age, ts, `partition`)"
    ).await();
  }

  /**
   * Applies the schema changes under test through the write client: add
   * ("salary", "last_name"), delete ("gender"), rename ("name" -> "first_name"),
   * change type ("age" int -> string), and reorder ("age" before "first_name").
   * Optionally compacts first so the changes land on top of base files.
   */
  private void changeTableSchema(TableOptions tableOptions, boolean shouldCompactBeforeSchemaChanges) throws IOException {
    try (HoodieFlinkWriteClient<?> writeClient = FlinkWriteClients.createWriteClient(tableOptions.toConfig())) {
      if (shouldCompactBeforeSchemaChanges) {
        Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());
        writeClient.compact(compactionInstant.get());
      }
      // Nullable unions so the new columns default to null for existing rows.
      Schema doubleType = SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
      Schema stringType = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
      writeClient.addColumn("salary", doubleType, null, "name", AFTER);
      writeClient.deleteColumns("gender");
      writeClient.renameColumn("name", "first_name");
      writeClient.updateColumnType("age", Types.StringType.get());
      writeClient.addColumn("last_name", stringType, "empty allowed", "salary", BEFORE);
      writeClient.reOrderColPosition("age", "first_name", BEFORE);
    }
  }

  /**
   * Batch 2: upserts id1/id3 and inserts id9 using the evolved schema. The
   * source Avro schema option is updated first so the writer serializes with
   * the post-evolution row type.
   */
  private void writeTableWithSchema2(TableOptions tableOptions) throws ExecutionException, InterruptedException {
    tableOptions.withOption(
        FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
        AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_AFTER, "hoodie.t1.t1_record"));

    //language=SQL
    tEnv.executeSql("drop table t1");
    //language=SQL
    tEnv.executeSql(""
        + "create table t1 ("
        + "  uuid string,"
        + "  age string,"
        + "  first_name string,"
        + "  last_name string,"
        + "  salary double,"
        + "  ts timestamp,"
        + "  `partition` string"
        + ") partitioned by (`partition`) with (" + tableOptions + ")"
    );
    //language=SQL
    tEnv.executeSql(""
        + "insert into t1 select "
        + "  cast(uuid as string),"
        + "  cast(age as string),"
        + "  cast(first_name as string),"
        + "  cast(last_name as string),"
        + "  cast(salary as double),"
        + "  cast(ts as timestamp),"
        + "  cast(`partition` as string) "
        + "from (values "
        + "  ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', 'par1'),"
        + "  ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', 'par1'),"
        + "  ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', 'par2')"
        + ") as A(uuid, age, first_name, last_name, salary, ts, `partition`)"
    ).await();
  }

  /** Base options: COW snapshot table, single-task operators, schema evolution on. */
  private TableOptions defaultTableOptions(String tablePath) {
    return new TableOptions(
        FactoryUtil.CONNECTOR.key(), HoodieTableFactory.FACTORY_ID,
        FlinkOptions.PATH.key(), tablePath,
        FlinkOptions.TABLE_TYPE.key(), FlinkOptions.TABLE_TYPE_COPY_ON_WRITE,
        HoodieTableConfig.NAME.key(), "t1",
        FlinkOptions.READ_AS_STREAMING.key(), false,
        FlinkOptions.QUERY_TYPE.key(), FlinkOptions.QUERY_TYPE_SNAPSHOT,
        KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "uuid",
        KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition",
        KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true,
        HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName(),
        FlinkOptions.WRITE_BATCH_SIZE.key(), 0.000001, // each record triggers flush
        FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_BEFORE),
        FlinkOptions.READ_TASKS.key(), 1,
        FlinkOptions.WRITE_TASKS.key(), 1,
        FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), 1,
        FlinkOptions.BUCKET_ASSIGN_TASKS.key(), 1,
        FlinkOptions.COMPACTION_TASKS.key(), 1,
        FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), false,
        HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true);
  }

  // Queries renamed, added, and type-changed columns together.
  private void checkAnswerEvolved(String... expectedResult) throws Exception {
    //language=SQL
    checkAnswer("select first_name, salary, age from t1", expectedResult);
  }

  private void checkAnswerCount(String... expectedResult) throws Exception {
    //language=SQL
    checkAnswer("select count(*) from t1", expectedResult);
  }

  // Re-registers t1 with the Hudi metadata columns exposed, then verifies
  // record keys alongside evolved columns.
  private void checkAnswerWithMeta(TableOptions tableOptions, String... expectedResult) throws Exception {
    //language=SQL
    tEnv.executeSql("drop table t1");
    //language=SQL
    tEnv.executeSql(""
        + "create table t1 ("
        + "  `_hoodie_commit_time` string,"
        + "  `_hoodie_commit_seqno` string,"
        + "  `_hoodie_record_key` string,"
        + "  `_hoodie_partition_path` string,"
        + "  `_hoodie_file_name` string,"
        + "  uuid string,"
        + "  age string,"
        + "  first_name string,"
        + "  last_name string,"
        + "  salary double,"
        + "  ts timestamp,"
        + "  `partition` string"
        + ") partitioned by (`partition`) with (" + tableOptions + ")"
    );
    //language=SQL
    checkAnswer("select `_hoodie_record_key`, first_name, salary from t1", expectedResult);
  }

  /**
   * Runs the query and compares results as sets. The read is bounded to
   * {@code expectedResult.length} rows so streaming queries terminate.
   */
  private void checkAnswer(String query, String... expectedResult) throws Exception {
    TableResult actualResult = tEnv.executeSql(query);
    Set<String> expected = new HashSet<>(Arrays.asList(expectedResult));
    Set<String> actual = new HashSet<>(expected.size());
    try (CloseableIterator<Row> iterator = actualResult.collect()) {
      for (int i = 0; i < expected.size() && iterator.hasNext(); i++) {
        actual.add(iterator.next().toString());
      }
    }
    assertEquals(expected, actual);
  }

  /**
   * Key/value holder for table options: {@link #toString()} renders the body of
   * a SQL {@code WITH (...)} clause, {@link #toConfig()} builds a Flink
   * {@link Configuration} for the write client.
   */
  private static final class TableOptions {
    private final Map<String, String> map = new HashMap<>();

    // Varargs of alternating key/value pairs.
    TableOptions(Object... options) {
      Preconditions.checkArgument(options.length % 2 == 0);
      for (int i = 0; i < options.length; i += 2) {
        withOption(options[i].toString(), options[i + 1]);
      }
    }

    // NOTE(review): a null optionValue would NPE on toString(); callers always
    // pass non-null values here.
    TableOptions withOption(String optionName, Object optionValue) {
      if (StringUtils.isNullOrEmpty(optionName)) {
        throw new IllegalArgumentException("optionName must be presented");
      }
      map.put(optionName, optionValue.toString());
      return this;
    }

    Configuration toConfig() {
      return FlinkOptions.fromMap(map);
    }

    @Override
    public String toString() {
      return map.entrySet().stream()
          .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
          .collect(Collectors.joining(", "));
    }
  }

  /** Immutable bundle of the three expected result sets for one scenario. */
  private static final class ExpectedResult {
    final String[] evolvedRows;   // select first_name, salary, age
    final String[] rowsWithMeta;  // select _hoodie_record_key, first_name, salary
    final String[] rowCount;      // select count(*) changelog

    private ExpectedResult(String[] evolvedRows, String[] rowsWithMeta, String[] rowCount) {
      this.evolvedRows = evolvedRows;
      this.rowsWithMeta = rowsWithMeta;
      this.rowCount = rowCount;
    }
  }

  // Merged view: nine unique keys; id1/id3 carry batch-2 salaries, rows written
  // before evolution show null for the added columns and int ages read as strings.
  private static final ExpectedResult EXPECTED_MERGED_RESULT = new ExpectedResult(
      new String[] {
          "+I[Danny, 10000.1, 23]",
          "+I[Stephen, null, 33]",
          "+I[Julian, 30000.3, 53]",
          "+I[Fabian, null, 31]",
          "+I[Sophia, null, 18]",
          "+I[Emma, null, 20]",
          "+I[Bob, null, 44]",
          "+I[Han, null, 56]",
          "+I[Alice, 90000.9, unknown]",
      },
      new String[] {
          "+I[uuid:id1, Danny, 10000.1]",
          "+I[uuid:id2, Stephen, null]",
          "+I[uuid:id3, Julian, 30000.3]",
          "+I[uuid:id4, Fabian, null]",
          "+I[uuid:id5, Sophia, null]",
          "+I[uuid:id6, Emma, null]",
          "+I[uuid:id7, Bob, null]",
          "+I[uuid:id8, Han, null]",
          "+I[uuid:id9, Alice, 90000.9]",
      },
      // count(*) retract stream climbing to 9 as rows arrive one by one.
      new String[] {
          "+I[1]",
          "-U[1]",
          "+U[2]",
          "-U[2]",
          "+U[3]",
          "-U[3]",
          "+U[4]",
          "-U[4]",
          "+U[5]",
          "-U[5]",
          "+U[6]",
          "-U[6]",
          "+U[7]",
          "-U[7]",
          "+U[8]",
          "-U[8]",
          "+U[9]",
      }
  );

  // Unmerged view (changelog / skip-merge reads): the pre-update versions of
  // id1 and id3 appear in addition to their batch-2 versions, for 11 rows.
  private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new ExpectedResult(
      new String[] {
          "+I[Danny, null, 23]",
          "+I[Stephen, null, 33]",
          "+I[Julian, null, 53]",
          "+I[Fabian, null, 31]",
          "+I[Sophia, null, 18]",
          "+I[Emma, null, 20]",
          "+I[Bob, null, 44]",
          "+I[Han, null, 56]",
          "+I[Alice, 90000.9, unknown]",
          "+I[Danny, 10000.1, 23]",
          "+I[Julian, 30000.3, 53]",
      },
      new String[] {
          "+I[uuid:id1, Danny, null]",
          "+I[uuid:id2, Stephen, null]",
          "+I[uuid:id3, Julian, null]",
          "+I[uuid:id4, Fabian, null]",
          "+I[uuid:id5, Sophia, null]",
          "+I[uuid:id6, Emma, null]",
          "+I[uuid:id7, Bob, null]",
          "+I[uuid:id8, Han, null]",
          "+I[uuid:id9, Alice, 90000.9]",
          "+I[uuid:id1, Danny, 10000.1]",
          "+I[uuid:id3, Julian, 30000.3]",
      },
      new String[] {
          "+I[1]",
          "-U[1]",
          "+U[2]",
          "-U[2]",
          "+U[3]",
          "-U[3]",
          "+U[4]",
          "-U[4]",
          "+U[5]",
          "-U[5]",
          "+U[6]",
          "-U[6]",
          "+U[7]",
          "-U[7]",
          "+U[8]",
          "-U[8]",
          "+U[9]",
          "-U[9]",
          "+U[10]",
          "-U[10]",
          "+U[11]",
      }
  );
}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java
new file mode 100644
index 00000000000..bcefea5b1cf
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestCastMap.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
 * Tests for {@link CastMap}: each case registers source-to-target logical-type
 * conversions per field position and asserts the value produced by
 * {@code castIfNeeded(pos, val)}.
 */
public class TestCastMap {

  // int -> bigint / float / double / decimal / varchar.
  @Test
  public void testCastInt() {
    CastMap castMap = new CastMap();
    castMap.add(0, new IntType(), new BigIntType());
    castMap.add(1, new IntType(), new FloatType());
    castMap.add(2, new IntType(), new DoubleType());
    castMap.add(3, new IntType(), new DecimalType());
    castMap.add(4, new IntType(), new VarCharType());
    int val = 1;
    assertEquals(1L, castMap.castIfNeeded(0, val));
    assertEquals(1.0F, castMap.castIfNeeded(1, val));
    assertEquals(1.0, castMap.castIfNeeded(2, val));
    assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(3, val));
    assertEquals(BinaryStringData.fromString("1"), castMap.castIfNeeded(4, val));
  }

  // bigint -> float / double / decimal / varchar.
  @Test
  public void testCastLong() {
    CastMap castMap = new CastMap();
    castMap.add(0, new BigIntType(), new FloatType());
    castMap.add(1, new BigIntType(), new DoubleType());
    castMap.add(2, new BigIntType(), new DecimalType());
    castMap.add(3, new BigIntType(), new VarCharType());
    long val = 1L;
    assertEquals(1.0F, castMap.castIfNeeded(0, val));
    assertEquals(1.0, castMap.castIfNeeded(1, val));
    assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(2, val));
    assertEquals(BinaryStringData.fromString("1"), castMap.castIfNeeded(3, val));
  }

  // float -> double / decimal / varchar (note string form is "1.0").
  @Test
  public void testCastFloat() {
    CastMap castMap = new CastMap();
    castMap.add(0, new FloatType(), new DoubleType());
    castMap.add(1, new FloatType(), new DecimalType());
    castMap.add(2, new FloatType(), new VarCharType());
    float val = 1F;
    assertEquals(1.0, castMap.castIfNeeded(0, val));
    assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(1, val));
    assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(2, val));
  }

  // double -> decimal / varchar.
  @Test
  public void testCastDouble() {
    CastMap castMap = new CastMap();
    castMap.add(0, new DoubleType(), new DecimalType());
    castMap.add(1, new DoubleType(), new VarCharType());
    double val = 1;
    assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(0, val));
    assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(1, val));
  }

  // decimal precision/scale widening and decimal -> varchar.
  @Test
  public void testCastDecimal() {
    CastMap castMap = new CastMap();
    castMap.add(0, new DecimalType(2, 1), new DecimalType(3, 2));
    castMap.add(1, new DecimalType(), new VarCharType());
    DecimalData val = DecimalData.fromBigDecimal(BigDecimal.ONE, 2, 1);
    assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 3, 2), castMap.castIfNeeded(0, val));
    assertEquals(BinaryStringData.fromString("1.0"), castMap.castIfNeeded(1, val));
  }

  // varchar -> decimal, and varchar -> date (epoch-day int representation).
  @Test
  public void testCastString() {
    CastMap castMap = new CastMap();
    castMap.add(0, new VarCharType(), new DecimalType());
    castMap.add(1, new VarCharType(), new DateType());
    assertEquals(DecimalData.fromBigDecimal(BigDecimal.ONE, 1, 0), castMap.castIfNeeded(0, BinaryStringData.fromString("1.0")));
    assertEquals((int) LocalDate.parse("2022-05-12").toEpochDay(), castMap.castIfNeeded(1, BinaryStringData.fromString("2022-05-12")));
  }

  // date (epoch-day int) -> varchar in ISO-8601 form.
  @Test
  public void testCastDate() {
    CastMap castMap = new CastMap();
    castMap.add(0, new DateType(), new VarCharType());
    assertEquals(BinaryStringData.fromString("2022-05-12"), castMap.castIfNeeded(0, (int) LocalDate.parse("2022-05-12").toEpochDay()));
  }
}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index a5b7e368a88..bff2553df81 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -84,6 +84,29 @@ public class TestConfigurations {
 
   public static final RowType ROW_TYPE_DATE = (RowType) 
ROW_DATA_TYPE_DATE.getLogicalType();
 
  // Row type written before schema evolution (see ITTestSchemaEvolution):
  // "gender" is later dropped, "name" renamed and "age" re-typed/reordered.
  public static final DataType ROW_DATA_TYPE_EVOLUTION_BEFORE = DataTypes.ROW(
          DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),
          DataTypes.FIELD("name", DataTypes.VARCHAR(10)),
          DataTypes.FIELD("gender", DataTypes.CHAR(1)), // removed field
          DataTypes.FIELD("age", DataTypes.INT()),
          DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
          DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
      .notNull();

  public static final RowType ROW_TYPE_EVOLUTION_BEFORE = (RowType) ROW_DATA_TYPE_EVOLUTION_BEFORE.getLogicalType();

  // Row type after evolution: gender removed; name -> first_name; age re-typed
  // to string and moved; last_name and salary added.
  public static final DataType ROW_DATA_TYPE_EVOLUTION_AFTER = DataTypes.ROW(
          DataTypes.FIELD("uuid", DataTypes.VARCHAR(20)),
          DataTypes.FIELD("age", DataTypes.VARCHAR(10)), // changed type, reordered
          DataTypes.FIELD("first_name", DataTypes.VARCHAR(10)), // renamed
          DataTypes.FIELD("last_name", DataTypes.VARCHAR(10)), // new field
          DataTypes.FIELD("salary", DataTypes.DOUBLE()), // new field
          DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
          DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
      .notNull();

  public static final RowType ROW_TYPE_EVOLUTION_AFTER = (RowType) ROW_DATA_TYPE_EVOLUTION_AFTER.getLogicalType();

  /** DDL for a Hudi table with the given options, partitioned by "partition". */
  public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
    return getCreateHoodieTableDDL(tableName, options, true, "partition");
  }

Reply via email to