This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 014168b948a [HUDI-6211] Fix reading of schema-evolved complex columns
for Flink (#8711)
014168b948a is described below
commit 014168b948a5a1ca88ea3e8cca213dfd46cef3f4
Author: voonhous <[email protected]>
AuthorDate: Wed May 17 16:45:06 2023 +0800
[HUDI-6211] Fix reading of schema-evolved complex columns for Flink (#8711)
---
.../apache/hudi/table/ITTestSchemaEvolution.java | 202 +++++++++++++--------
.../org/apache/hudi/utils/TestConfigurations.java | 8 +
.../table/format/cow/ParquetSplitReaderUtil.java | 56 ++++--
.../cow/vector/reader/EmptyColumnReader.java | 41 +++++
.../table/format/cow/ParquetSplitReaderUtil.java | 56 ++++--
.../cow/vector/reader/EmptyColumnReader.java | 41 +++++
.../table/format/cow/ParquetSplitReaderUtil.java | 56 ++++--
.../cow/vector/reader/EmptyColumnReader.java | 42 +++++
.../table/format/cow/ParquetSplitReaderUtil.java | 56 ++++--
.../cow/vector/reader/EmptyColumnReader.java | 42 +++++
.../table/format/cow/ParquetSplitReaderUtil.java | 56 ++++--
.../cow/vector/reader/EmptyColumnReader.java | 42 +++++
12 files changed, 533 insertions(+), 165 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 1b0fd3a342e..0372570c3cb 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -34,7 +34,6 @@ import org.apache.hudi.utils.FlinkMiniCluster;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
-
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
@@ -43,7 +42,6 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
-
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -57,13 +55,17 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static
org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.AFTER;
import static
org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType.BEFORE;
import static
org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION_AFTER;
import static
org.apache.hudi.utils.TestConfigurations.ROW_TYPE_EVOLUTION_BEFORE;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
@SuppressWarnings({"SqlDialectInspection", "SqlNoDataSourceInspection"})
@@ -173,6 +175,7 @@ public class ITTestSchemaEvolution {
+ " gender char,"
+ " age int,"
+ " ts timestamp,"
+ + " f_struct row<f0 int, f1 string>,"
+ " `partition` string"
+ ") partitioned by (`partition`) with (" + tableOptions + ")"
);
@@ -184,17 +187,19 @@ public class ITTestSchemaEvolution {
+ " cast(gender as char),"
+ " cast(age as int),"
+ " cast(ts as timestamp),"
+ + " cast(f_struct as row<f0 int, f1 string>),"
+ " cast(`partition` as string) "
+ "from (values "
- + " ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', 'par1'),"
- + " ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', 'par1'),"
- + " ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', 'par2'),"
- + " ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', 'par2'),"
- + " ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', 'par3'),"
- + " ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', 'par3'),"
- + " ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', 'par4'),"
- + " ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', 'par4')"
- + ") as A(uuid, name, gender, age, ts, `partition`)"
+ + " ('id0', 'Indica', 'F', 12, '2000-01-01 00:00:00', cast(null as
row<f0 int, f1 string>), 'par0'),"
+ + " ('id1', 'Danny', 'M', 23, '2000-01-01 00:00:01', row(1, 's1'),
'par1'),"
+ + " ('id2', 'Stephen', 'M', 33, '2000-01-01 00:00:02', row(2, 's2'),
'par1'),"
+ + " ('id3', 'Julian', 'M', 53, '2000-01-01 00:00:03', row(3, 's3'),
'par2'),"
+ + " ('id4', 'Fabian', 'M', 31, '2000-01-01 00:00:04', row(4, 's4'),
'par2'),"
+ + " ('id5', 'Sophia', 'F', 18, '2000-01-01 00:00:05', row(5, 's5'),
'par3'),"
+ + " ('id6', 'Emma', 'F', 20, '2000-01-01 00:00:06', row(6, 's6'),
'par3'),"
+ + " ('id7', 'Bob', 'M', 44, '2000-01-01 00:00:07', row(7, 's7'),
'par4'),"
+ + " ('id8', 'Han', 'M', 56, '2000-01-01 00:00:08', row(8, 's8'),
'par4')"
+ + ") as A(uuid, name, gender, age, ts, f_struct, `partition`)"
).await();
}
@@ -204,6 +209,7 @@ public class ITTestSchemaEvolution {
Option<String> compactionInstant =
writeClient.scheduleCompaction(Option.empty());
writeClient.compact(compactionInstant.get());
}
+ Schema intType =
SchemaBuilder.unionOf().nullType().and().intType().endUnion();
Schema doubleType =
SchemaBuilder.unionOf().nullType().and().doubleType().endUnion();
Schema stringType =
SchemaBuilder.unionOf().nullType().and().stringType().endUnion();
writeClient.addColumn("salary", doubleType, null, "name", AFTER);
@@ -212,6 +218,11 @@ public class ITTestSchemaEvolution {
writeClient.updateColumnType("age", Types.StringType.get());
writeClient.addColumn("last_name", stringType, "empty allowed",
"salary", BEFORE);
writeClient.reOrderColPosition("age", "first_name", BEFORE);
+ // add a field in the middle of the `f_struct` column
+ writeClient.addColumn("f_struct.f2", intType, "add field in middle of
struct", "f_struct.f0", AFTER);
+ // add a field at the end of `f_struct` column
+ writeClient.addColumn("f_struct.f3", stringType);
+
}
}
@@ -231,6 +242,7 @@ public class ITTestSchemaEvolution {
+ " last_name string,"
+ " salary double,"
+ " ts timestamp,"
+ + " f_struct row<f0 int, f2 int, f1 string, f3 string>,"
+ " `partition` string"
+ ") partitioned by (`partition`) with (" + tableOptions + ")"
);
@@ -243,12 +255,13 @@ public class ITTestSchemaEvolution {
+ " cast(last_name as string),"
+ " cast(salary as double),"
+ " cast(ts as timestamp),"
+ + " cast(f_struct as row<f0 int, f2 int, f1 string, f3 string>),"
+ " cast(`partition` as string) "
+ "from (values "
- + " ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01',
'par1'),"
- + " ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09',
'par1'),"
- + " ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03',
'par2')"
- + ") as A(uuid, age, first_name, last_name, salary, ts, `partition`)"
+ + " ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1,
1, 's1', 't1'), 'par1'),"
+ + " ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09',
row(9, 9, 's9', 't9'), 'par1'),"
+ + " ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03',
row(3, 3, 's3', 't3'), 'par2')"
+ + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct,
`partition`)"
).await();
}
@@ -278,7 +291,7 @@ public class ITTestSchemaEvolution {
private void checkAnswerEvolved(String... expectedResult) throws Exception {
//language=SQL
- checkAnswer("select first_name, salary, age from t1", expectedResult);
+ checkAnswer("select first_name, salary, age, f_struct from t1",
expectedResult);
}
private void checkAnswerCount(String... expectedResult) throws Exception {
@@ -303,22 +316,43 @@ public class ITTestSchemaEvolution {
+ " last_name string,"
+ " salary double,"
+ " ts timestamp,"
+ + " f_struct row<f0 int, f2 int, f1 string, f3 string>,"
+ " `partition` string"
+ ") partitioned by (`partition`) with (" + tableOptions + ")"
);
//language=SQL
- checkAnswer("select `_hoodie_record_key`, first_name, salary from t1",
expectedResult);
+ checkAnswer("select `_hoodie_record_key`, first_name, salary, f_struct
from t1", expectedResult);
}
- private void checkAnswer(String query, String... expectedResult) throws
Exception {
+ private void checkAnswer(String query, String... expectedResult) {
TableResult actualResult = tEnv.executeSql(query);
Set<String> expected = new HashSet<>(Arrays.asList(expectedResult));
- Set<String> actual = new HashSet<>(expected.size());
- try (CloseableIterator<Row> iterator = actualResult.collect()) {
- for (int i = 0; i < expected.size() && iterator.hasNext(); i++) {
- actual.add(iterator.next().toString());
+ Set<String> actual = new HashSet<>();
+
+ // create a runnable to handle reads (especially useful for streaming
reads as they are unbounded)
+ Runnable runnable = () -> {
+ try (CloseableIterator<Row> iterator = actualResult.collect()) {
+ while (iterator.hasNext()) {
+ actual.add(iterator.next().toString());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
+ };
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future future = executor.submit(runnable);
+ try {
+ // allow result collector to run for a short period of time
+ future.get(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ executor.shutdownNow();
}
+
assertEquals(expected, actual);
}
@@ -366,97 +400,105 @@ public class ITTestSchemaEvolution {
private static final ExpectedResult EXPECTED_MERGED_RESULT = new
ExpectedResult(
new String[] {
- "+I[Danny, 10000.1, 23]",
- "+I[Stephen, null, 33]",
- "+I[Julian, 30000.3, 53]",
- "+I[Fabian, null, 31]",
- "+I[Sophia, null, 18]",
- "+I[Emma, null, 20]",
- "+I[Bob, null, 44]",
- "+I[Han, null, 56]",
- "+I[Alice, 90000.9, unknown]",
+ "+I[Indica, null, 12, null]",
+ "+I[Danny, 10000.1, 23, +I[1, 1, s1, t1]]",
+ "+I[Stephen, null, 33, +I[2, null, s2, null]]",
+ "+I[Julian, 30000.3, 53, +I[3, 3, s3, t3]]",
+ "+I[Fabian, null, 31, +I[4, null, s4, null]]",
+ "+I[Sophia, null, 18, +I[5, null, s5, null]]",
+ "+I[Emma, null, 20, +I[6, null, s6, null]]",
+ "+I[Bob, null, 44, +I[7, null, s7, null]]",
+ "+I[Han, null, 56, +I[8, null, s8, null]]",
+ "+I[Alice, 90000.9, unknown, +I[9, 9, s9, t9]]",
},
new String[] {
- "+I[uuid:id1, Danny, 10000.1]",
- "+I[uuid:id2, Stephen, null]",
- "+I[uuid:id3, Julian, 30000.3]",
- "+I[uuid:id4, Fabian, null]",
- "+I[uuid:id5, Sophia, null]",
- "+I[uuid:id6, Emma, null]",
- "+I[uuid:id7, Bob, null]",
- "+I[uuid:id8, Han, null]",
- "+I[uuid:id9, Alice, 90000.9]",
+ "+I[uuid:id0, Indica, null, null]",
+ "+I[uuid:id1, Danny, 10000.1, +I[1, 1, s1, t1]]",
+ "+I[uuid:id2, Stephen, null, +I[2, null, s2, null]]",
+ "+I[uuid:id3, Julian, 30000.3, +I[3, 3, s3, t3]]",
+ "+I[uuid:id4, Fabian, null, +I[4, null, s4, null]]",
+ "+I[uuid:id5, Sophia, null, +I[5, null, s5, null]]",
+ "+I[uuid:id6, Emma, null, +I[6, null, s6, null]]",
+ "+I[uuid:id7, Bob, null, +I[7, null, s7, null]]",
+ "+I[uuid:id8, Han, null, +I[8, null, s8, null]]",
+ "+I[uuid:id9, Alice, 90000.9, +I[9, 9, s9, t9]]",
},
new String[] {
"+I[1]",
- "-U[1]",
"+U[2]",
- "-U[2]",
"+U[3]",
- "-U[3]",
"+U[4]",
- "-U[4]",
"+U[5]",
- "-U[5]",
"+U[6]",
- "-U[6]",
"+U[7]",
- "-U[7]",
"+U[8]",
- "-U[8]",
"+U[9]",
+ "+U[10]",
+ "-U[1]",
+ "-U[2]",
+ "-U[3]",
+ "-U[4]",
+ "-U[5]",
+ "-U[6]",
+ "-U[7]",
+ "-U[8]",
+ "-U[9]",
}
);
private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new
ExpectedResult(
new String[] {
- "+I[Danny, null, 23]",
- "+I[Stephen, null, 33]",
- "+I[Julian, null, 53]",
- "+I[Fabian, null, 31]",
- "+I[Sophia, null, 18]",
- "+I[Emma, null, 20]",
- "+I[Bob, null, 44]",
- "+I[Han, null, 56]",
- "+I[Alice, 90000.9, unknown]",
- "+I[Danny, 10000.1, 23]",
- "+I[Julian, 30000.3, 53]",
+ "+I[Indica, null, 12, null]",
+ "+I[Danny, null, 23, +I[1, null, s1, null]]",
+ "+I[Stephen, null, 33, +I[2, null, s2, null]]",
+ "+I[Julian, null, 53, +I[3, null, s3, null]]",
+ "+I[Fabian, null, 31, +I[4, null, s4, null]]",
+ "+I[Sophia, null, 18, +I[5, null, s5, null]]",
+ "+I[Emma, null, 20, +I[6, null, s6, null]]",
+ "+I[Bob, null, 44, +I[7, null, s7, null]]",
+ "+I[Han, null, 56, +I[8, null, s8, null]]",
+ "+I[Alice, 90000.9, unknown, +I[9, 9, s9, t9]]",
+ "+I[Danny, 10000.1, 23, +I[1, 1, s1, t1]]",
+ "+I[Julian, 30000.3, 53, +I[3, 3, s3, t3]]",
},
new String[] {
- "+I[uuid:id1, Danny, null]",
- "+I[uuid:id2, Stephen, null]",
- "+I[uuid:id3, Julian, null]",
- "+I[uuid:id4, Fabian, null]",
- "+I[uuid:id5, Sophia, null]",
- "+I[uuid:id6, Emma, null]",
- "+I[uuid:id7, Bob, null]",
- "+I[uuid:id8, Han, null]",
- "+I[uuid:id9, Alice, 90000.9]",
- "+I[uuid:id1, Danny, 10000.1]",
- "+I[uuid:id3, Julian, 30000.3]",
+ "+I[uuid:id0, Indica, null, null]",
+ "+I[uuid:id1, Danny, null, +I[1, null, s1, null]]",
+ "+I[uuid:id2, Stephen, null, +I[2, null, s2, null]]",
+ "+I[uuid:id3, Julian, null, +I[3, null, s3, null]]",
+ "+I[uuid:id4, Fabian, null, +I[4, null, s4, null]]",
+ "+I[uuid:id5, Sophia, null, +I[5, null, s5, null]]",
+ "+I[uuid:id6, Emma, null, +I[6, null, s6, null]]",
+ "+I[uuid:id7, Bob, null, +I[7, null, s7, null]]",
+ "+I[uuid:id8, Han, null, +I[8, null, s8, null]]",
+ "+I[uuid:id9, Alice, 90000.9, +I[9, 9, s9, t9]]",
+ "+I[uuid:id1, Danny, 10000.1, +I[1, 1, s1, t1]]",
+ "+I[uuid:id3, Julian, 30000.3, +I[3, 3, s3, t3]]",
},
new String[] {
"+I[1]",
- "-U[1]",
"+U[2]",
- "-U[2]",
"+U[3]",
- "-U[3]",
"+U[4]",
- "-U[4]",
"+U[5]",
- "-U[5]",
"+U[6]",
- "-U[6]",
"+U[7]",
- "-U[7]",
"+U[8]",
- "-U[8]",
"+U[9]",
- "-U[9]",
"+U[10]",
"-U[10]",
"+U[11]",
+ "-U[11]",
+ "+U[12]",
+ "-U[1]",
+ "-U[2]",
+ "-U[3]",
+ "-U[4]",
+ "-U[5]",
+ "-U[6]",
+ "-U[7]",
+ "-U[8]",
+ "-U[9]",
}
);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 9e3557a721a..7a63eedac3f 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -90,6 +90,9 @@ public class TestConfigurations {
DataTypes.FIELD("gender", DataTypes.CHAR(1)), // removed field
DataTypes.FIELD("age", DataTypes.INT()),
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
+ DataTypes.FIELD("f_struct", DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f1", DataTypes.STRING()))),
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
.notNull();
@@ -102,6 +105,11 @@ public class TestConfigurations {
DataTypes.FIELD("last_name", DataTypes.VARCHAR(10)), // new field
DataTypes.FIELD("salary", DataTypes.DOUBLE()), // new field
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
+ DataTypes.FIELD("f_struct", DataTypes.ROW(
+ DataTypes.FIELD("f0", DataTypes.INT()),
+ DataTypes.FIELD("f2", DataTypes.INT()), // new field added in
the middle of struct
+ DataTypes.FIELD("f1", DataTypes.STRING()),
+ DataTypes.FIELD("f3", DataTypes.STRING()))), // new field added
at the end of struct
DataTypes.FIELD("partition", DataTypes.VARCHAR(10)))
.notNull();
diff --git
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 7c5d7646f8c..52d381a2187 100644
---
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -25,6 +25,7 @@ import
org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
GroupType groupType = physicalType.asGroupType();
List<ColumnReader> fieldReaders = new ArrayList<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- pages,
- depth + 1));
+ // schema evolution: read the parquet file with a new extended field
name.
+ int fieldIndex =
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ fieldReaders.add(new EmptyColumnReader());
+ } else {
+ fieldReaders.add(i,
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ pages,
+ depth + 1));
+ }
}
return new RowColumnReader(fieldReaders);
default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
- WritableColumnVector[] columnVectors =
- new WritableColumnVector[rowType.getFieldCount()];
+ WritableColumnVector[] columnVectors = new
WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- depth + 1);
+ // schema evolution: read the file with a new extended field name.
+ int fieldIndex =
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ columnVectors[i] = (WritableColumnVector)
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ } else {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
+ }
}
return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported
now.");
}
}
+
+ /**
+ * Returns the field index with given physical row type {@code groupType}
and field name {@code fieldName}.
+ *
+ * @return The physical field index or -1 if the field does not exist
+ */
+ private static int getFieldIndexInPhysicalType(String fieldName, GroupType
groupType) {
+ // get index from fileSchema type, else, return -1
+ return groupType.containsField(fieldName) ?
groupType.getFieldIndex(fieldName) : -1;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..8be29289bba
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * Empty {@link ColumnReader}.
+ * <p>
+ * This reader handles parquet files that have not been updated to the
latest schema.
+ * When reading a parquet file with the latest schema, the parquet file might
not contain the new field.
+ * The EmptyColumnReader is used to handle such scenarios.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+ public EmptyColumnReader() {}
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws
IOException {
+ vector.fillWithNulls();
+ }
+}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 4de234ccda1..1f2a2f453b9 100644
---
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,6 +24,7 @@ import
org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
GroupType groupType = physicalType.asGroupType();
List<ColumnReader> fieldReaders = new ArrayList<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- pages,
- depth + 1));
+ // schema evolution: read the parquet file with a new extended field
name.
+ int fieldIndex =
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ fieldReaders.add(new EmptyColumnReader());
+ } else {
+ fieldReaders.add(i,
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ pages,
+ depth + 1));
+ }
}
return new RowColumnReader(fieldReaders);
default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
- WritableColumnVector[] columnVectors =
- new WritableColumnVector[rowType.getFieldCount()];
+ WritableColumnVector[] columnVectors = new
WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- depth + 1);
+ // schema evolution: read the file with a new extended field name.
+ int fieldIndex =
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ columnVectors[i] = (WritableColumnVector)
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ } else {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
+ }
}
return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported
now.");
}
}
+
+ /**
+ * Returns the field index with given physical row type {@code groupType}
and field name {@code fieldName}.
+ *
+ * @return The physical field index or -1 if the field does not exist
+ */
+ private static int getFieldIndexInPhysicalType(String fieldName, GroupType
groupType) {
+ // get index from fileSchema type, else, return -1
+ return groupType.containsField(fieldName) ?
groupType.getFieldIndex(fieldName) : -1;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..8be29289bba
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * Empty {@link ColumnReader}.
+ * <p>
+ * This reader handles parquet files that have not been updated to the
latest schema.
+ * When reading a parquet file with the latest schema, the parquet file might
not contain the new field.
+ * The EmptyColumnReader is used to handle such scenarios.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+ public EmptyColumnReader() {}
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws
IOException {
+ vector.fillWithNulls();
+ }
+}
\ No newline at end of file
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e1f6f9ce72f..d38af61420d 100644
---
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,6 +24,7 @@ import
org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
GroupType groupType = physicalType.asGroupType();
List<ColumnReader> fieldReaders = new ArrayList<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- pages,
- depth + 1));
+ // schema evolution: read the parquet file with a new extended field
name.
+ int fieldIndex =
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ fieldReaders.add(new EmptyColumnReader());
+ } else {
+ fieldReaders.add(i,
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ pages,
+ depth + 1));
+ }
}
return new RowColumnReader(fieldReaders);
default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
- WritableColumnVector[] columnVectors =
- new WritableColumnVector[rowType.getFieldCount()];
+ WritableColumnVector[] columnVectors = new
WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- depth + 1);
+ // schema evolution: read the file with a new extended field name.
+ int fieldIndex =
getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ columnVectors[i] = (WritableColumnVector)
createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ } else {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
+ }
}
return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported
now.");
}
}
+
+ /**
+ * Returns the field index with given physical row type {@code groupType}
and field name {@code fieldName}.
+ *
+ * @return The physical field index or -1 if the field does not exist
+ */
+ private static int getFieldIndexInPhysicalType(String fieldName, GroupType
groupType) {
+ // get index from fileSchema type, else, return -1
+ return groupType.containsField(fieldName) ?
groupType.getFieldIndex(fieldName) : -1;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..6ea610bf2af
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * Empty {@link ColumnReader}.
+ * <p>
+ * This reader handles parquet files that have not been updated to the
latest schema.
+ * When reading a parquet file with the latest schema, the parquet file might
not contain the new field.
+ * The EmptyColumnReader is used to handle such scenarios.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+ public EmptyColumnReader() {}
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws
IOException {
+ vector.fillWithNulls();
+ }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e1f6f9ce72f..d38af61420d 100644
--- a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,6 +24,7 @@ import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
GroupType groupType = physicalType.asGroupType();
List<ColumnReader> fieldReaders = new ArrayList<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- pages,
- depth + 1));
+ // schema evolution: read the parquet file with a new extended field name.
+ int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ fieldReaders.add(new EmptyColumnReader());
+ } else {
+ fieldReaders.add(i,
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ pages,
+ depth + 1));
+ }
}
return new RowColumnReader(fieldReaders);
default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
- WritableColumnVector[] columnVectors =
- new WritableColumnVector[rowType.getFieldCount()];
+ WritableColumnVector[] columnVectors = new WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- depth + 1);
+ // schema evolution: read the file with a new extended field name.
+ int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ } else {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
+ }
}
return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
}
+
+ /**
+ * Returns the field index with given physical row type {@code groupType} and field name {@code fieldName}.
+ *
+ * @return The physical field index or -1 if the field does not exist
+ */
+ private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
+ // get index from fileSchema type, else, return -1
+ return groupType.containsField(fieldName) ? groupType.getFieldIndex(fieldName) : -1;
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..6ea610bf2af
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * Empty {@link ColumnReader}.
+ * <p>
+ * This reader is to handle parquet files that have not been updated to the latest Schema.
+ * When reading a parquet file with the latest schema, parquet file might not have the new field.
+ * The EmptyColumnReader is used to handle such scenarios.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+ public EmptyColumnReader() {}
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ vector.fillWithNulls();
+ }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index e1f6f9ce72f..d38af61420d 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -24,6 +24,7 @@ import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
@@ -387,14 +388,20 @@ public class ParquetSplitReaderUtil {
GroupType groupType = physicalType.asGroupType();
List<ColumnReader> fieldReaders = new ArrayList<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
- fieldReaders.add(
- createColumnReader(
- utcTimestamp,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- pages,
- depth + 1));
+ // schema evolution: read the parquet file with a new extended field name.
+ int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ fieldReaders.add(new EmptyColumnReader());
+ } else {
+ fieldReaders.add(i,
+ createColumnReader(
+ utcTimestamp,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ pages,
+ depth + 1));
+ }
}
return new RowColumnReader(fieldReaders);
default:
@@ -508,20 +515,35 @@ public class ParquetSplitReaderUtil {
case ROW:
RowType rowType = (RowType) fieldType;
GroupType groupType = physicalType.asGroupType();
- WritableColumnVector[] columnVectors =
- new WritableColumnVector[rowType.getFieldCount()];
+ WritableColumnVector[] columnVectors = new WritableColumnVector[rowType.getFieldCount()];
for (int i = 0; i < columnVectors.length; i++) {
- columnVectors[i] =
- createWritableColumnVector(
- batchSize,
- rowType.getTypeAt(i),
- groupType.getType(i),
- descriptors,
- depth + 1);
+ // schema evolution: read the file with a new extended field name.
+ int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
+ if (fieldIndex < 0) {
+ columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+ } else {
+ columnVectors[i] =
+ createWritableColumnVector(
+ batchSize,
+ rowType.getTypeAt(i),
+ groupType.getType(fieldIndex),
+ descriptors,
+ depth + 1);
+ }
}
return new HeapRowColumnVector(batchSize, columnVectors);
default:
throw new UnsupportedOperationException(fieldType + " is not supported now.");
}
}
+
+ /**
+ * Returns the field index with given physical row type {@code groupType} and field name {@code fieldName}.
+ *
+ * @return The physical field index or -1 if the field does not exist
+ */
+ private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
+ // get index from fileSchema type, else, return -1
+ return groupType.containsField(fieldName) ? groupType.getFieldIndex(fieldName) : -1;
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
new file mode 100644
index 00000000000..6ea610bf2af
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/EmptyColumnReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+import java.io.IOException;
+
+/**
+ * Empty {@link ColumnReader}.
+ * <p>
+ * This reader is to handle parquet files that have not been updated to the latest Schema.
+ * When reading a parquet file with the latest schema, parquet file might not have the new field.
+ * The EmptyColumnReader is used to handle such scenarios.
+ */
+public class EmptyColumnReader implements ColumnReader<WritableColumnVector> {
+
+ public EmptyColumnReader() {}
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ vector.fillWithNulls();
+ }
+}
+