This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 014168b948a [HUDI-6211] Fix reading of schema-evolved complex columns for Flink (#8711)
014168b948a is described below

commit 014168b948a5a1ca88ea3e8cca213dfd46cef3f4
Author: voonhous <[email protected]>
AuthorDate: Wed May 17 16:45:06 2023 +0800

    [HUDI-6211] Fix reading of schema-evolved complex columns for Flink (#8711)
---
 .../apache/hudi/table/ITTestSchemaEvolution.java   | 202 +++++++++++++--------
 .../org/apache/hudi/utils/TestConfigurations.java  |   8 +
 .../table/format/cow/ParquetSplitReaderUtil.java   |  56 ++++--
 .../cow/vector/reader/EmptyColumnReader.java       |  41 +++++
 .../table/format/cow/ParquetSplitReaderUtil.java   |  56 ++++--
 .../cow/vector/reader/EmptyColumnReader.java       |  41 +++++
 .../table/format/cow/ParquetSplitReaderUtil.java   |  56 ++++--
 .../cow/vector/reader/EmptyColumnReader.java       |  42 +++++
 .../table/format/cow/ParquetSplitReaderUtil.java   |  56 ++++--
 .../cow/vector/reader/EmptyColumnReader.java       |  42 +++++
 .../table/format/cow/ParquetSplitReaderUtil.java   |  56 ++++--
 .../cow/vector/reader/EmptyColumnReader.java       |  42 +++++
 12 files changed, 533 insertions(+), 165 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 1b0fd3a342e..0372570c3cb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -34,7 +34,6 @@ import org.apache.hudi.utils.FlinkMiniCluster;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableResult;
@@ -43,7 +42,6 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
-
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -57,13 +55,17 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.AFTER;
 import static 
org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.BEFORE;
 import static 
org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION_AFTER;
 import static 
org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION_BEFORE;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @SuppressWarnings({"SqlDialectInspection", "SqlNoDataSourceInspection"})
@@ -173,6 +175,7 @@ public class ITTestSchemaEvolution {
         + "  gender char,"
         + "  age int,"
         + "  ts timestamp,"
+        + "  f_struct row<f0 int, f1 string>,"
         + "  `partition` string"
         + ") partitioned by (`partition`) with (" + tableOptions + ")"
     );
@@ -184,17 +187,19 @@ public class ITTestSchemaEvolution {
         + "  cast(gender as char),"
         + "  cast(age as int),"
         + "  cast(ts as timestamp),"
+        + "  cast(f_struct as row<f0 int, f1 string>),"
         + "  cast(`partition` as string) "
         + "from (values "
-        + "  ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', 'par1'),"
-        + "  ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', 'par1'),"
-        + "  ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', 'par2'),"
-        + "  ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', 'par2'),"
-        + "  ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', 'par3'),"
-        + "  ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', 'par3'),"
-        + "  ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', 'par4'),"
-        + "  ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', 'par4')"
-        + ") as A(uuid, name, gender, age, ts, `partition`)"
+        + "  ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as 
row<f0 int, f1 string>), 'par0'),"
+        + "  ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1'), 
'par1'),"
+        + "  ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2'), 
'par1'),"
+        + "  ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3'), 
'par2'),"
+        + "  ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4'), 
'par2'),"
+        + "  ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5'), 
'par3'),"
+        + "  ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 's6'), 
'par3'),"
+        + "  ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7'), 
'par4'),"
+        + "  ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8'), 
'par4')"
+        + ") as A(uuid, name, gender, age, ts, f_struct, `partition`)"
     ).await();
   }
 
@@ -204,6 +209,7 @@ public class ITTestSchemaEvolution {
         Option<String> compactionInstant = 
writeClient.scheduleCompaction(Option.empty());
         writeClient.compact(compactionInstant.get());
       }
+      Schema intType = 
SchemaBuilder.unionOf().nullType().and().intType().endUnion();
       Schema doubleType = 
SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
       Schema stringType = 
SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
       writeClient.addColumn("salary", doubleType, null, "name", AFTER);
@@ -212,6 +218,11 @@ public class ITTestSchemaEvolution {
       writeClient.updateColumnType("age", Types.StringType.get());
       writeClient.addColumn("last_name", stringType, "empty allowed", 
"salary", BEFORE);
       writeClient.reOrderColPosition("age", "first_name", BEFORE);
+      // add a field in the middle of the `f_struct` column
+      writeClient.addColumn("f_struct.f2", intType, "add field in middle of 
struct", "f_struct.f0", AFTER);
+      // add a field at the end of `f_struct` column
+      writeClient.addColumn("f_struct.f3", stringType);
+
     }
   }
 
@@ -231,6 +242,7 @@ public class ITTestSchemaEvolution {
         + "  last_name string,"
         + "  salary double,"
         + "  ts timestamp,"
+        + "  f_struct row<f0 int, f2 int, f1 string, f3 string>,"
         + "  `partition` string"
         + ") partitioned by (`partition`) with (" + tableOptions + ")"
     );
@@ -243,12 +255,13 @@ public class ITTestSchemaEvolution {
         + "  cast(last_name as string),"
         + "  cast(salary as double),"
         + "  cast(ts as timestamp),"
+        + "  cast(f_struct as row<f0 int, f2 int, f1 string, f3 string>),"
         + "  cast(`partition` as string) "
         + "from (values "
-        + "  ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', 
'par1'),"
-        + "  ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', 
'par1'),"
-        + "  ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', 
'par2')"
-        + ") as A(uuid, age, first_name, last_name, salary, ts, `partition`)"
+        + "  ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1, 
1, 's1', 't1'), 'par1'),"
+        + "  ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', 
row(9, 9, 's9', 't9'), 'par1'),"
+        + "  ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', 
row(3, 3, 's3', 't3'), 'par2')"
+        + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct, 
`partition`)"
     ).await();
   }
 
@@ -278,7 +291,7 @@ public class ITTestSchemaEvolution {
 
   private void checkAnswerEvolved(String... expectedResult) throws Exception {
     //language=SQL
-    checkAnswer("select first_name, salary, age from t1", expectedResult);
+    checkAnswer("select first_name, salary, age, f_struct from t1", 
expectedResult);
   }
 
   private void checkAnswerCount(String... expectedResult) throws Exception {
@@ -303,22 +316,43 @@ public class ITTestSchemaEvolution {
         + "  last_name string,"
         + "  salary double,"
         + "  ts timestamp,"
+        + "  f_struct row<f0 int, f2 int, f1 string, f3 string>,"
         + "  `partition` string"
         + ") partitioned by (`partition`) with (" + tableOptions + ")"
     );
     //language=SQL
-    checkAnswer("select `_hoodie_record_key`, first_name, salary from t1", 
expectedResult);
+    checkAnswer("select `_hoodie_record_key`, first_name, salary, f_struct 
from t1", expectedResult);
   }
 
-  private void checkAnswer(String query, String... expectedResult) throws 
Exception {
+  private void checkAnswer(String query, String... expectedResult) {
     TableResult actualResult = tEnv.executeSql(query);
     Set<String> expected = new HashSet<>(Arrays.asList(expectedResult));
-    Set<String> actual = new HashSet<>(expected.size());
-    try (CloseableIterator<Row> iterator = actualResult.collect()) {
-      for (int i = 0; i < expected.size() && iterator.hasNext(); i++) {
-        actual.add(iterator.next().toString());
+    Set<String> actual = new HashSet<>();
+
+    // create a runnable to handle reads (especially useful for streaming 
reads as they are unbounded)
+    Runnable runnable = () -> {
+      try (CloseableIterator<Row> iterator = actualResult.collect()) {
+        while (iterator.hasNext()) {
+          actual.add(iterator.next().toString());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
+    };
+
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future future = executor.submit(runnable);
+    try {
+      // allow result collector to run for a short period of time
+      future.get(5, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      future.cancel(true);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      executor.shutdownNow();
     }
+
     assertEquals(expected, actual);
   }
 
@@ -366,97 +400,105 @@ public class ITTestSchemaEvolution {
 
   private static final ExpectedResult EXPECTED_MERGED_RESULT = new 
ExpectedResult(
       new String[] {
-          "+I[Danny, 10000.1, 23]",
-          "+I[Stephen, null, 33]",
-          "+I[Julian, 30000.3, 53]",
-          "+I[Fabian, null, 31]",
-          "+I[Sophia, null, 18]",
-          "+I[Emma, null, 20]",
-          "+I[Bob, null, 44]",
-          "+I[Han, null, 56]",
-          "+I[Alice, 90000.9, unknown]",
+          "+I[Indica, null, 12, null]",
+          "+I[Danny, 10000.1, 23, +I[1, 1, s1, t1]]",
+          "+I[Stephen, null, 33, +I[2, null, s2, null]]",
+          "+I[Julian, 30000.3, 53, +I[3, 3, s3, t3]]",
+          "+I[Fabian, null, 31, +I[4, null, s4, null]]",
+          "+I[Sophia, null, 18, +I[5, null, s5, null]]",
+          "+I[Emma, null, 20, +I[6, null, s6, null]]",
+          "+I[Bob, null, 44, +I[7, null, s7, null]]",
+          "+I[Han, null, 56, +I[8, null, s8, null]]",
+          "+I[Alice, 90000.9, unknown, +I[9, 9, s9, t9]]",
       },
       new String[] {
-          "+I[uuid:id1, Danny, 10000.1]",
-          "+I[uuid:id2, Stephen, null]",
-          "+I[uuid:id3, Julian, 30000.3]",
-          "+I[uuid:id4, Fabian, null]",
-          "+I[uuid:id5, Sophia, null]",
-          "+I[uuid:id6, Emma, null]",
-          "+I[uuid:id7, Bob, null]",
-          "+I[uuid:id8, Han, null]",
-          "+I[uuid:id9, Alice, 90000.9]",
+          "+I[uuid:id0, Indica, null, null]",
+          "+I[uuid:id1, Danny, 10000.1, +I[1, 1, s1, t1]]",
+          "+I[uuid:id2, Stephen, null, +I[2, null, s2, null]]",
+          "+I[uuid:id3, Julian, 30000.3, +I[3, 3, s3, t3]]",
+          "+I[uuid:id4, Fabian, null, +I[4, null, s4, null]]",
+          "+I[uuid:id5, Sophia, null, +I[5, null, s5, null]]",
+          "+I[uuid:id6, Emma, null, +I[6, null, s6, null]]",
+          "+I[uuid:id7, Bob, null, +I[7, null, s7, null]]",
+          "+I[uuid:id8, Han, null, +I[8, null, s8, null]]",
+          "+I[uuid:id9, Alice, 90000.9, +I[9, 9, s9, t9]]",
       },
       new String[] {
           "+I[1]",
-          "-U[1]",
           "+U[2]",
-          "-U[2]",
           "+U[3]",
-          "-U[3]",
           "+U[4]",
-          "-U[4]",
           "+U[5]",
-          "-U[5]",
           "+U[6]",
-          "-U[6]",
           "+U[7]",
-          "-U[7]",
           "+U[8]",
-          "-U[8]",
           "+U[9]",
+          "+U[10]",
+          "-U[1]",
+          "-U[2]",
+          "-U[3]",
+          "-U[4]",
+          "-U[5]",
+          "-U[6]",
+          "-U[7]",
+          "-U[8]",
+          "-U[9]",
       }
   );
 
   private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new 
ExpectedResult(
       new String[] {
-          "+I[Danny, null, 23]",
-          "+I[Stephen, null, 33]",
-          "+I[Julian, null, 53]",
-          "+I[Fabian, null, 31]",
-          "+I[Sophia, null, 18]",
-          "+I[Emma, null, 20]",
-          "+I[Bob, null, 44]",
-          "+I[Han, null, 56]",
-          "+I[Alice, 90000.9, unknown]",
-          "+I[Danny, 10000.1, 23]",
-          "+I[Julian, 30000.3, 53]",
+          "+I[Indica, null, 12, null]",
+          "+I[Danny, null, 23, +I[1, null, s1, null]]",
+          "+I[Stephen, null, 33, +I[2, null, s2, null]]",
+          "+I[Julian, null, 53, +I[3, null, s3, null]]",
+          "+I[Fabian, null, 31, +I[4, null, s4, null]]",
+          "+I[Sophia, null, 18, +I[5, null, s5, null]]",
+          "+I[Emma, null, 20, +I[6, null, s6, null]]",
+          "+I[Bob, null, 44, +I[7, null, s7, null]]",
+          "+I[Han, null, 56, +I[8, null, s8, null]]",
+          "+I[Alice, 90000.9, unknown, +I[9, 9, s9, t9]]",
+          "+I[Danny, 10000.1, 23, +I[1, 1, s1, t1]]",
+          "+I[Julian, 30000.3, 53, +I[3, 3, s3, t3]]",
       },
       new String[] {
-          "+I[uuid:id1, Danny, null]",
-          "+I[uuid:id2, Stephen, null]",
-          "+I[uuid:id3, Julian, null]",
-          "+I[uuid:id4, Fabian, null]",
-          "+I[uuid:id5, Sophia, null]",
-          "+I[uuid:id6, Emma, null]",
-          "+I[uuid:id7, Bob, null]",
-          "+I[uuid:id8, Han, null]",
-          "+I[uuid:id9, Alice, 90000.9]",
-          "+I[uuid:id1, Danny, 10000.1]",
-          "+I[uuid:id3, Julian, 30000.3]",
+          "+I[uuid:id0, Indica, null, null]",
+          "+I[uuid:id1, Danny, null, +I[1, null, s1, null]]",
+          "+I[uuid:id2, Stephen, null, +I[2, null, s2, null]]",
+          "+I[uuid:id3, Julian, null, +I[3, null, s3, null]]",
+          "+I[uuid:id4, Fabian, null, +I[4, null, s4, null]]",
+          "+I[uuid:id5, Sophia, null, +I[5, null, s5, null]]",
+          "+I[uuid:id6, Emma, null, +I[6, null, s6, null]]",
+          "+I[uuid:id7, Bob, null, +I[7, null, s7, null]]",
+          "+I[uuid:id8, Han, null, +I[8, null, s8, null]]",
+          "+I[uuid:id9, Alice, 90000.9, +I[9, 9, s9, t9]]",
+          "+I[uuid:id1, Danny, 10000.1, +I[1, 1, s1, t1]]",
+          "+I[uuid:id3, Julian, 30000.3, +I[3, 3, s3, t3]]",
       },
       new String[] {
           "+I[1]",
-          "-U[1]",
           "+U[2]",
-          "-U[2]",
           "+U[3]",
-          "-U[3]",
           "+U[4]",
-          "-U[4]",
           "+U[5]",
-          "-U[5]",
           "+U[6]",
-          "-U[6]",
           "+U[7]",
-          "-U[7]",
           "+U[8]",
-          "-U[8]",
           "+U[9]",
-          "-U[9]",
           "+U[10]",
           "-U[10]",
           "+U[11]",
+          "-U[11]",
+          "+U[12]",
+          "-U[1]",
+          "-U[2]",
+          "-U[3]",
+          "-U[4]",
+          "-U[5]",
+          "-U[6]",
+          "-U[7]",
+          "-U[8]",
+          "-U[9]",
       }
   );
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 9e3557a721a..7a63eedac3f 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -90,6 +90,9 @@ public class TestConfigurations {
           DataTypes.FIELD("gender", DataTypes.CHAR(1)), // removed field
           DataTypes.FIELD("age", DataTypes.INT()),
           DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
+          DataTypes.FIELD("f_struct", DataTypes.ROW(
+              DataTypes.FIELD("f0", DataTypes.INT()),
+              DataTypes.FIELD("f1", DataTypes.STRING()))),
           DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
       .notNull();
 
@@ -102,6 +105,11 @@ public class TestConfigurations {
           DataTypes.FIELD("last_name", DataTypes.VARCHAR(10)), // new field
           DataTypes.FIELD("salary", DataTypes.DOUBLE()), // new field
           DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
+          DataTypes.FIELD("f_struct", DataTypes.ROW(
+              DataTypes.FIELD("f0", DataTypes.INT()),
+              DataTypes.FIELD("f2", DataTypes.INT()), // new field added in 
the middle of struct
+              DataTypes.FIELD("f1", DataTypes.STRING()),
+              DataTypes.FIELD("f3", DataTypes.STRING()))), // new field added 
at the end of struct
           DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
       .notNull();
 
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 7c5d7646f8c..52d381a2187 100644
--- a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -25,6 +25,7 @@ import 
org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
 import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
         GroupType groupType = physicalType.asGroupType();
         List<ColumnReader> fieldReaders = new ArrayList<>();
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-          fieldReaders.add(
-              createColumnReader(
-                  utcTimestamp,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  pages,
-                  depth + 1));
+          // schema evolution: read the parquet file with a new extended field 
name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            fieldReaders.add(new EmptyColumnReader());
+          } else {
+            fieldReaders.add(i,
+                createColumnReader(
+                    utcTimestamp,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    pages,
+                    depth + 1));
+          }
         }
         return new RowColumnReader(fieldReaders);
       default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
       case ROW:
         RowType rowType = (RowType) fieldType;
         GroupType groupType = physicalType.asGroupType();
-        WritableColumnVector[] columnVectors =
-            new WritableColumnVector[rowType.getFieldCount()];
+        WritableColumnVector[] columnVectors = new 
WritableColumnVector[rowType.getFieldCount()];
         for (int i = 0; i < columnVectors.length; i++) {
-          columnVectors[i] =
-              createWritableColumnVector(
-                  batchSize,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  depth + 1);
+          // schema evolution: read the file with a new extended field name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            columnVectors[i] = (WritableColumnVector) 
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+          } else {
+            columnVectors[i] =
+                createWritableColumnVector(
+                    batchSize,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    depth + 1);
+          }
         }
         return new HeapRowColumnVector(batchSize, columnVectors);
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
   }
+
+  /**
+   * Returns the field index with given physical row type {@code groupType} 
and field name {@code fieldName}.
+   *
+   * @return The physical field index or -1 if the field does not exist
+   */
+  private static int getFieldIndexInPhysicalType(String fieldName, GroupType 
groupType) {
+    // get index from fileSchema type, else, return -1
+    return groupType.containsField(fieldName) ? 
groupType.getFieldIndex(fieldName) : -1;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..8be29289bba
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * Empty {@link ColumnReader}.
+ * <p>
+ * This reader is to handle parquet files that have not been updated to the 
latest Schema.
+ * When reading a parquet file with the latest schema, parquet file might not 
have the new field.
+ * The EmptyColumnReader is used to handle such scenarios.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+  public EmptyColumnReader() {}
+
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
+    vector.fillWithNulls();
+  }
+}
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 4de234ccda1..1f2a2f453b9 100644
--- a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,6 +24,7 @@ import 
org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
 import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
         GroupType groupType = physicalType.asGroupType();
         List<ColumnReader> fieldReaders = new ArrayList<>();
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-          fieldReaders.add(
-              createColumnReader(
-                  utcTimestamp,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  pages,
-                  depth + 1));
+          // schema evolution: read the parquet file with a new extended field 
name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            fieldReaders.add(new EmptyColumnReader());
+          } else {
+            fieldReaders.add(i,
+                createColumnReader(
+                    utcTimestamp,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    pages,
+                    depth + 1));
+          }
         }
         return new RowColumnReader(fieldReaders);
       default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
       case ROW:
         RowType rowType = (RowType) fieldType;
         GroupType groupType = physicalType.asGroupType();
-        WritableColumnVector[] columnVectors =
-            new WritableColumnVector[rowType.getFieldCount()];
+        WritableColumnVector[] columnVectors = new 
WritableColumnVector[rowType.getFieldCount()];
         for (int i = 0; i < columnVectors.length; i++) {
-          columnVectors[i] =
-              createWritableColumnVector(
-                  batchSize,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  depth + 1);
+          // schema evolution: read the file with a new extended field name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            columnVectors[i] = (WritableColumnVector) 
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+          } else {
+            columnVectors[i] =
+                createWritableColumnVector(
+                    batchSize,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    depth + 1);
+          }
         }
         return new HeapRowColumnVector(batchSize, columnVectors);
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
   }
+
+  /**
+   * Returns the field index with given physical row type {@code groupType} 
and field name {@code fieldName}.
+   *
+   * @return The physical field index or -1 if the field does not exist
+   */
+  private static int getFieldIndexInPhysicalType(String fieldName, GroupType 
groupType) {
+    // get index from fileSchema type, else, return -1
+    return groupType.containsField(fieldName) ? 
groupType.getFieldIndex(fieldName) : -1;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..8be29289bba
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * Empty {@link ColumnReader}.
+ * <p>
+ * This reader is to handle parquet files that have not been updated to the 
latest Schema.
+ * When reading a parquet file with the latest schema, parquet file might not 
have the new field.
+ * The EmptyColumnReader is used to handle such scenarios.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+  public EmptyColumnReader() {}
+
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
+    vector.fillWithNulls();
+  }
+}
\ No newline at end of file
diff --git a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e1f6f9ce72f..d38af61420d 100644
--- a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,6 +24,7 @@ import 
org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
 import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
         GroupType groupType = physicalType.asGroupType();
         List<ColumnReader> fieldReaders = new ArrayList<>();
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-          fieldReaders.add(
-              createColumnReader(
-                  utcTimestamp,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  pages,
-                  depth + 1));
+          // schema evolution: read the parquet file with a new extended field 
name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            fieldReaders.add(new EmptyColumnReader());
+          } else {
+            fieldReaders.add(i,
+                createColumnReader(
+                    utcTimestamp,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    pages,
+                    depth + 1));
+          }
         }
         return new RowColumnReader(fieldReaders);
       default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
       case ROW:
         RowType rowType = (RowType) fieldType;
         GroupType groupType = physicalType.asGroupType();
-        WritableColumnVector[] columnVectors =
-            new WritableColumnVector[rowType.getFieldCount()];
+        WritableColumnVector[] columnVectors = new 
WritableColumnVector[rowType.getFieldCount()];
         for (int i = 0; i < columnVectors.length; i++) {
-          columnVectors[i] =
-              createWritableColumnVector(
-                  batchSize,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  depth + 1);
+          // schema evolution: read the file with a new extended field name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            columnVectors[i] = (WritableColumnVector) 
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+          } else {
+            columnVectors[i] =
+                createWritableColumnVector(
+                    batchSize,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    depth + 1);
+          }
         }
         return new HeapRowColumnVector(batchSize, columnVectors);
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
   }
+
+  /**
+   * Returns the index of {@code fieldName} in the physical type {@code groupType}, or -1 if absent.
+   */
+  private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
+    if (!groupType.containsField(fieldName)) {
+      return -1;
+    }
+    return groupType.getFieldIndex(fieldName);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..6ea610bf2af
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * A {@link ColumnReader} that has no parquet column to read from.
+ *
+ * <p>Parquet files written before a schema change may lack fields that the
+ * latest schema declares; this reader stands in for such a missing field.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+  /**
+   * Calls {@code fillWithNulls()} on {@code vector}; {@code readNumber} is unused.
+   */
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+    vector.fillWithNulls();
+  }
+}
+
diff --git 
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e1f6f9ce72f..d38af61420d 100644
--- 
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,6 +24,7 @@ import 
org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
 import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
         GroupType groupType = physicalType.asGroupType();
         List<ColumnReader> fieldReaders = new ArrayList<>();
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-          fieldReaders.add(
-              createColumnReader(
-                  utcTimestamp,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  pages,
-                  depth + 1));
+          // schema evolution: read the parquet file with a new extended field 
name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            fieldReaders.add(new EmptyColumnReader());
+          } else {
+            fieldReaders.add(i,
+                createColumnReader(
+                    utcTimestamp,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    pages,
+                    depth + 1));
+          }
         }
         return new RowColumnReader(fieldReaders);
       default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
       case ROW:
         RowType rowType = (RowType) fieldType;
         GroupType groupType = physicalType.asGroupType();
-        WritableColumnVector[] columnVectors =
-            new WritableColumnVector[rowType.getFieldCount()];
+        WritableColumnVector[] columnVectors = new 
WritableColumnVector[rowType.getFieldCount()];
         for (int i = 0; i < columnVectors.length; i++) {
-          columnVectors[i] =
-              createWritableColumnVector(
-                  batchSize,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  depth + 1);
+          // schema evolution: read the file with a new extended field name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            columnVectors[i] = (WritableColumnVector) 
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+          } else {
+            columnVectors[i] =
+                createWritableColumnVector(
+                    batchSize,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    depth + 1);
+          }
         }
         return new HeapRowColumnVector(batchSize, columnVectors);
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
   }
+
+  /**
+   * Returns the index of {@code fieldName} in the physical type {@code groupType}, or -1 if absent.
+   */
+  private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
+    if (!groupType.containsField(fieldName)) {
+      return -1;
+    }
+    return groupType.getFieldIndex(fieldName);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..6ea610bf2af
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * A {@link ColumnReader} that has no parquet column to read from.
+ *
+ * <p>Parquet files written before a schema change may lack fields that the
+ * latest schema declares; this reader stands in for such a missing field.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+  /**
+   * Calls {@code fillWithNulls()} on {@code vector}; {@code readNumber} is unused.
+   */
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+    vector.fillWithNulls();
+  }
+}
+
diff --git 
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e1f6f9ce72f..d38af61420d 100644
--- 
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,6 +24,7 @@ import 
org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
 import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
         GroupType groupType = physicalType.asGroupType();
         List<ColumnReader> fieldReaders = new ArrayList<>();
         for (int i = 0; i < rowType.getFieldCount(); i++) {
-          fieldReaders.add(
-              createColumnReader(
-                  utcTimestamp,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  pages,
-                  depth + 1));
+          // schema evolution: read the parquet file with a new extended field 
name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            fieldReaders.add(new EmptyColumnReader());
+          } else {
+            fieldReaders.add(i,
+                createColumnReader(
+                    utcTimestamp,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    pages,
+                    depth + 1));
+          }
         }
         return new RowColumnReader(fieldReaders);
       default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
       case ROW:
         RowType rowType = (RowType) fieldType;
         GroupType groupType = physicalType.asGroupType();
-        WritableColumnVector[] columnVectors =
-            new WritableColumnVector[rowType.getFieldCount()];
+        WritableColumnVector[] columnVectors = new 
WritableColumnVector[rowType.getFieldCount()];
         for (int i = 0; i < columnVectors.length; i++) {
-          columnVectors[i] =
-              createWritableColumnVector(
-                  batchSize,
-                  rowType.getTypeAt(i),
-                  groupType.getType(i),
-                  descriptors,
-                  depth + 1);
+          // schema evolution: read the file with a new extended field name.
+          int fieldIndex = 
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+          if (fieldIndex < 0) {
+            columnVectors[i] = (WritableColumnVector) 
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+          } else {
+            columnVectors[i] =
+                createWritableColumnVector(
+                    batchSize,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    depth + 1);
+          }
         }
         return new HeapRowColumnVector(batchSize, columnVectors);
       default:
         throw new UnsupportedOperationException(fieldType + " is not supported 
now.");
     }
   }
+
+  /**
+   * Returns the index of {@code fieldName} in the physical type {@code groupType}, or -1 if absent.
+   */
+  private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
+    if (!groupType.containsField(fieldName)) {
+      return -1;
+    }
+    return groupType.getFieldIndex(fieldName);
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..6ea610bf2af
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * A {@link ColumnReader} that has no parquet column to read from.
+ *
+ * <p>Parquet files written before a schema change may lack fields that the
+ * latest schema declares; this reader stands in for such a missing field.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+  /**
+   * Calls {@code fillWithNulls()} on {@code vector}; {@code readNumber} is unused.
+   */
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+    vector.fillWithNulls();
+  }
+}
+

Reply via email to