This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aa91ca24e2 [HUDI-7173] Fix hudi-on-flink read issues involving schema 
evolution and decimal types (#10247)
2aa91ca24e2 is described below

commit 2aa91ca24e2d3b52c403441215c45b3830d722ff
Author: voonhous <[email protected]>
AuthorDate: Thu Dec 7 12:01:08 2023 +0800

    [HUDI-7173] Fix hudi-on-flink read issues involving schema evolution and 
decimal types (#10247)
---
 .../apache/hudi/table/ITTestSchemaEvolution.java   | 96 +++++++++++-----------
 .../org/apache/hudi/utils/TestConfigurations.java  |  4 +-
 .../table/format/cow/ParquetSplitReaderUtil.java   | 28 +++----
 .../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
 .../table/format/cow/ParquetSplitReaderUtil.java   | 28 +++----
 .../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
 .../table/format/cow/ParquetSplitReaderUtil.java   | 28 +++----
 .../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
 .../table/format/cow/ParquetSplitReaderUtil.java   | 28 +++----
 .../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
 .../table/format/cow/ParquetSplitReaderUtil.java   | 28 +++----
 .../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
 12 files changed, 317 insertions(+), 118 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 1555a8215dc..0417285815a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -250,6 +250,10 @@ public class ITTestSchemaEvolution {
       writeClient.addColumn("new_row_col", structType);
       writeClient.addColumn("new_array_col", arrayType);
       writeClient.addColumn("new_map_col", mapType);
+
+      // perform comprehensive evolution on a struct column by reordering 
field positions
+      writeClient.updateColumnType("f_struct.f0", Types.DecimalType.get(20, 
0));
+      writeClient.reOrderColPosition("f_struct.f0", "f_struct.drop_add", 
AFTER);
     }
   }
 
@@ -269,7 +273,7 @@ public class ITTestSchemaEvolution {
         + "  last_name string,"
         + "  salary double,"
         + "  ts timestamp,"
-        + "  f_struct row<f0 int, f2 int, f1 string, renamed_change_type 
bigint, f3 string, drop_add string>,"
+        + "  f_struct row<f2 int, f1 string, renamed_change_type bigint, f3 
string, drop_add string, f0 decimal(20, 0)>,"
         + "  f_map map<string, double>,"
         + "  f_array array<double>,"
         + "  new_row_col row<f0 bigint, f1 string>,"
@@ -287,7 +291,7 @@ public class ITTestSchemaEvolution {
         + "  cast(last_name as string),"
         + "  cast(salary as double),"
         + "  cast(ts as timestamp),"
-        + "  cast(f_struct as row<f0 int, f2 int, f1 string, 
renamed_change_type bigint, f3 string, drop_add string>),"
+        + "  cast(f_struct as row<f2 int, f1 string, renamed_change_type 
bigint, f3 string, drop_add string, f0 decimal(20, 0)>),"
         + "  cast(f_map as map<string, double>),"
         + "  cast(f_array as array<double>),"
         + "  cast(new_row_col as row<f0 bigint, f1 string>),"
@@ -295,11 +299,11 @@ public class ITTestSchemaEvolution {
         + "  cast(new_map_col as map<string, string>),"
         + "  cast(`partition` as string) "
         + "from (values "
-        + "  ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1, 
1, 's1', 11, 't1', 'drop_add1'), cast(map['Danny', 2323.23] as map<string, 
double>), array[23, 23, 23], "
+        + "  ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1, 
's1', 11, 't1', 'drop_add1', 1), cast(map['Danny', 2323.23] as map<string, 
double>), array[23, 23, 23], "
         + "  row(1, '1'), array['1'], Map['k1','v1'], 'par1'),"
-        + "  ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', 
row(9, 9, 's9', 99, 't9', 'drop_add9'), cast(map['Alice', 9999.99] as 
map<string, double>), array[9999, 9999], "
+        + "  ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09', 
row(9, 's9', 99, 't9', 'drop_add9', 9), cast(map['Alice', 9999.99] as 
map<string, double>), array[9999, 9999], "
         + "  row(9, '9'), array['9'], Map['k9','v9'], 'par1'),"
-        + "  ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', 
row(3, 3, 's3', 33, 't3', 'drop_add3'), cast(map['Julian', 5353.53] as 
map<string, double>), array[53], "
+        + "  ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03', 
row(3, 's3', 33, 't3', 'drop_add3', 3), cast(map['Julian', 5353.53] as 
map<string, double>), array[53], "
         + "  row(3, '3'), array['3'], Map['k3','v3'], 'par2')"
         + ") as A(uuid, age, first_name, last_name, salary, ts, f_struct, 
f_map, f_array, new_row_col, new_array_col, new_map_col, `partition`)"
     ).await();
@@ -367,7 +371,7 @@ public class ITTestSchemaEvolution {
         + "  last_name string,"
         + "  salary double,"
         + "  ts timestamp,"
-        + "  f_struct row<f0 int, f2 int, f1 string, renamed_change_type 
bigint, f3 string, drop_add string>,"
+        + "  f_struct row<f2 int, f1 string, renamed_change_type bigint, f3 
string, drop_add string, f0 decimal(20, 0)>,"
         + "  f_map map<string, double>,"
         + "  f_array array<double>,"
         + "  new_row_col row<f0 bigint, f1 string>,"
@@ -469,27 +473,27 @@ public class ITTestSchemaEvolution {
   private static final ExpectedResult EXPECTED_MERGED_RESULT = new 
ExpectedResult(
       new String[] {
           "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, 
null]",
-          "+I[Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-          "+I[Stephen, null, 33, +I[2, null, s2, 2, null, null], 
{Stephen=3333.0}, [33.0], null, null, null]",
-          "+I[Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
-          "+I[Fabian, null, 31, +I[4, null, s4, 4, null, null], 
{Fabian=3131.0}, [31.0], null, null, null]",
-          "+I[Sophia, null, 18, +I[5, null, s5, 5, null, null], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-          "+I[Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, 
[20.0], null, null, null]",
-          "+I[Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, 
[44.0, 44.0], null, null, null]",
-          "+I[Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, 
[56.0, 56.0, 56.0], null, null, null]",
-          "+I[Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+          "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
+          "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], null, null, null]",
+          "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+          "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], null, null, null]",
+          "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
+          "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, 
[20.0], null, null, null]",
+          "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, 
[44.0, 44.0], null, null, null]",
+          "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, 
[56.0, 56.0, 56.0], null, null, null]",
+          "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
       },
       new String[] {
           "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, 
null, null]",
-          "+I[id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-          "+I[id2, Stephen, null, 33, +I[2, null, s2, 2, null, null], 
{Stephen=3333.0}, [33.0], null, null, null]",
-          "+I[id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
-          "+I[id4, Fabian, null, 31, +I[4, null, s4, 4, null, null], 
{Fabian=3131.0}, [31.0], null, null, null]",
-          "+I[id5, Sophia, null, 18, +I[5, null, s5, 5, null, null], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-          "+I[id6, Emma, null, 20, +I[6, null, s6, 6, null, null], 
{Emma=2020.0}, [20.0], null, null, null]",
-          "+I[id7, Bob, null, 44, +I[7, null, s7, 7, null, null], 
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
-          "+I[id8, Han, null, 56, +I[8, null, s8, 8, null, null], 
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
-          "+I[id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+          "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
+          "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], null, null, null]",
+          "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+          "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], null, null, null]",
+          "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
+          "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], 
{Emma=2020.0}, [20.0], null, null, null]",
+          "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], 
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
+          "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], 
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
+          "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
       },
       new String[] {
           "+I[1]",
@@ -517,31 +521,31 @@ public class ITTestSchemaEvolution {
   private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new 
ExpectedResult(
       new String[] {
           "+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null, 
null]",
-          "+I[Danny, null, 23, +I[1, null, s1, 1, null, null], {Danny=2323.0}, 
[23.0, 23.0], null, null, null]",
-          "+I[Stephen, null, 33, +I[2, null, s2, 2, null, null], 
{Stephen=3333.0}, [33.0], null, null, null]",
-          "+I[Julian, null, 53, +I[3, null, s3, 3, null, null], 
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
-          "+I[Fabian, null, 31, +I[4, null, s4, 4, null, null], 
{Fabian=3131.0}, [31.0], null, null, null]",
-          "+I[Sophia, null, 18, +I[5, null, s5, 5, null, null], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-          "+I[Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0}, 
[20.0], null, null, null]",
-          "+I[Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0}, 
[44.0, 44.0], null, null, null]",
-          "+I[Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0}, 
[56.0, 56.0, 56.0], null, null, null]",
-          "+I[Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
-          "+I[Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-          "+I[Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+          "+I[Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0}, 
[23.0, 23.0], null, null, null]",
+          "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], null, null, null]",
+          "+I[Julian, null, 53, +I[null, s3, 3, null, null, 3], 
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
+          "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], null, null, null]",
+          "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
+          "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0}, 
[20.0], null, null, null]",
+          "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0}, 
[44.0, 44.0], null, null, null]",
+          "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0}, 
[56.0, 56.0, 56.0], null, null, null]",
+          "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+          "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
+          "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
       },
       new String[] {
           "+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null, 
null, null]",
-          "+I[id1, Danny, null, 23, +I[1, null, s1, 1, null, null], 
{Danny=2323.0}, [23.0, 23.0], null, null, null]",
-          "+I[id2, Stephen, null, 33, +I[2, null, s2, 2, null, null], 
{Stephen=3333.0}, [33.0], null, null, null]",
-          "+I[id3, Julian, null, 53, +I[3, null, s3, 3, null, null], 
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
-          "+I[id4, Fabian, null, 31, +I[4, null, s4, 4, null, null], 
{Fabian=3131.0}, [31.0], null, null, null]",
-          "+I[id5, Sophia, null, 18, +I[5, null, s5, 5, null, null], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
-          "+I[id6, Emma, null, 20, +I[6, null, s6, 6, null, null], 
{Emma=2020.0}, [20.0], null, null, null]",
-          "+I[id7, Bob, null, 44, +I[7, null, s7, 7, null, null], 
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
-          "+I[id8, Han, null, 56, +I[8, null, s8, 8, null, null], 
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
-          "+I[id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
-          "+I[id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
-          "+I[id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+          "+I[id1, Danny, null, 23, +I[null, s1, 1, null, null, 1], 
{Danny=2323.0}, [23.0, 23.0], null, null, null]",
+          "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2], 
{Stephen=3333.0}, [33.0], null, null, null]",
+          "+I[id3, Julian, null, 53, +I[null, s3, 3, null, null, 3], 
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
+          "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4], 
{Fabian=3131.0}, [31.0], null, null, null]",
+          "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5], 
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
+          "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6], 
{Emma=2020.0}, [20.0], null, null, null]",
+          "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7], 
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
+          "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8], 
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
+          "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9], 
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+          "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1], 
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
+          "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3], 
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
       },
       new String[] {
           "+I[1]",
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index b4f769fcc00..71295d93b10 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -110,12 +110,12 @@ public class TestConfigurations {
           DataTypes.FIELD("salary", DataTypes.DOUBLE()), // new field
           DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
           DataTypes.FIELD("f_struct", DataTypes.ROW(
-              DataTypes.FIELD("f0", DataTypes.INT()),
               DataTypes.FIELD("f2", DataTypes.INT()), // new field added in 
the middle of struct
               DataTypes.FIELD("f1", DataTypes.STRING()),
               DataTypes.FIELD("renamed_change_type", DataTypes.BIGINT()),
               DataTypes.FIELD("f3", DataTypes.STRING()),
-              DataTypes.FIELD("drop_add", DataTypes.STRING()))), // new field 
added at the end of struct
+              DataTypes.FIELD("drop_add", DataTypes.STRING()),
+              DataTypes.FIELD("f0", DataTypes.DECIMAL(20, 0)))),
           DataTypes.FIELD("f_map", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.DOUBLE())),
           DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.DOUBLE())),
           DataTypes.FIELD("new_row_col", DataTypes.ROW(
diff --git 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index ed63bae6749..76aa827a84a 100644
--- 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
 
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
         }
         return lv;
       case DECIMAL:
-        DecimalType decimalType = (DecimalType) type;
-        int precision = decimalType.getPrecision();
-        int scale = decimalType.getScale();
-        DecimalData decimal = value == null
-            ? null
-            : 
Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, 
precision, scale));
-        ColumnVector internalVector = createVectorFromConstant(
-            new VarBinaryType(),
-            decimal == null ? null : decimal.toUnscaledBytes(),
-            batchSize);
-        return new ParquetDecimalVector(internalVector);
+        HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+        if (value == null) {
+          decv.fillWithNulls();
+        } else {
+          DecimalType decimalType = (DecimalType) type;
+          int precision = decimalType.getPrecision();
+          int scale = decimalType.getScale();
+          DecimalData decimal = Preconditions.checkNotNull(
+              DecimalData.fromBigDecimal((BigDecimal) value, precision, 
scale));
+          decv.fill(decimal.toUnscaledBytes());
+        }
+        return decv;
       case FLOAT:
         HeapFloatVector fv = new HeapFloatVector(batchSize);
         if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
                 || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
                 && primitiveType.getOriginalType() == OriginalType.DECIMAL,
             "Unexpected type: %s", typeName);
-        return new HeapBytesVector(batchSize);
+        return new HeapDecimalVector(batchSize);
       case ARRAY:
         ArrayType arrayType = (ArrayType) fieldType;
         return new HeapArrayVector(
diff --git 
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..06cf200a841
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements 
DecimalColumnVector {
+
+  public HeapDecimalVector(int len) {
+    super(len);
+  }
+
+  @Override
+  public DecimalData getDecimal(int i, int precision, int scale) {
+    return DecimalData.fromUnscaledBytes(
+        this.getBytes(i).getBytes(), precision, scale);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 9bf5390ee26..1b636c63b2f 100644
--- 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
 
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
         }
         return lv;
       case DECIMAL:
-        DecimalType decimalType = (DecimalType) type;
-        int precision = decimalType.getPrecision();
-        int scale = decimalType.getScale();
-        DecimalData decimal = value == null
-            ? null
-            : 
Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, 
precision, scale));
-        ColumnVector internalVector = createVectorFromConstant(
-            new VarBinaryType(),
-            decimal == null ? null : decimal.toUnscaledBytes(),
-            batchSize);
-        return new ParquetDecimalVector(internalVector);
+        HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+        if (value == null) {
+          decv.fillWithNulls();
+        } else {
+          DecimalType decimalType = (DecimalType) type;
+          int precision = decimalType.getPrecision();
+          int scale = decimalType.getScale();
+          DecimalData decimal = Preconditions.checkNotNull(
+              DecimalData.fromBigDecimal((BigDecimal) value, precision, 
scale));
+          decv.fill(decimal.toUnscaledBytes());
+        }
+        return decv;
       case FLOAT:
         HeapFloatVector fv = new HeapFloatVector(batchSize);
         if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
                 || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
                 && primitiveType.getOriginalType() == OriginalType.DECIMAL,
             "Unexpected type: %s", typeName);
-        return new HeapBytesVector(batchSize);
+        return new HeapDecimalVector(batchSize);
       case ARRAY:
         ArrayType arrayType = (ArrayType) fieldType;
         return new HeapArrayVector(
diff --git 
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..fdc55ac18fc
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements 
DecimalColumnVector {
+
+  public HeapDecimalVector(int len) {
+    super(len);
+  }
+
+  @Override
+  public DecimalData getDecimal(int i, int precision, int scale) {
+    return DecimalData.fromUnscaledBytes(
+        this.getBytes(i).getBytes(), precision, scale);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 9bf5390ee26..1b636c63b2f 100644
--- 
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
 
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import 
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
         }
         return lv;
       case DECIMAL:
-        DecimalType decimalType = (DecimalType) type;
-        int precision = decimalType.getPrecision();
-        int scale = decimalType.getScale();
-        DecimalData decimal = value == null
-            ? null
-            : 
Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, 
precision, scale));
-        ColumnVector internalVector = createVectorFromConstant(
-            new VarBinaryType(),
-            decimal == null ? null : decimal.toUnscaledBytes(),
-            batchSize);
-        return new ParquetDecimalVector(internalVector);
+        HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+        if (value == null) {
+          decv.fillWithNulls();
+        } else {
+          DecimalType decimalType = (DecimalType) type;
+          int precision = decimalType.getPrecision();
+          int scale = decimalType.getScale();
+          DecimalData decimal = Preconditions.checkNotNull(
+              DecimalData.fromBigDecimal((BigDecimal) value, precision, 
scale));
+          decv.fill(decimal.toUnscaledBytes());
+        }
+        return decv;
       case FLOAT:
         HeapFloatVector fv = new HeapFloatVector(batchSize);
         if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
                 || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
                 && primitiveType.getOriginalType() == OriginalType.DECIMAL,
             "Unexpected type: %s", typeName);
-        return new HeapBytesVector(batchSize);
+        return new HeapDecimalVector(batchSize);
       case ARRAY:
         ArrayType arrayType = (ArrayType) fieldType;
         return new HeapArrayVector(
diff --git 
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..fdc55ac18fc
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements DecimalColumnVector {
+
+  public HeapDecimalVector(int len) {
+    super(len);
+  }
+
+  @Override
+  public DecimalData getDecimal(int i, int precision, int scale) {
+    return DecimalData.fromUnscaledBytes(
+        this.getBytes(i).getBytes(), precision, scale);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 9bf5390ee26..1b636c63b2f 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
 
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
         }
         return lv;
       case DECIMAL:
-        DecimalType decimalType = (DecimalType) type;
-        int precision = decimalType.getPrecision();
-        int scale = decimalType.getScale();
-        DecimalData decimal = value == null
-            ? null
-            : Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
-        ColumnVector internalVector = createVectorFromConstant(
-            new VarBinaryType(),
-            decimal == null ? null : decimal.toUnscaledBytes(),
-            batchSize);
-        return new ParquetDecimalVector(internalVector);
+        HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+        if (value == null) {
+          decv.fillWithNulls();
+        } else {
+          DecimalType decimalType = (DecimalType) type;
+          int precision = decimalType.getPrecision();
+          int scale = decimalType.getScale();
+          DecimalData decimal = Preconditions.checkNotNull(
+              DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
+          decv.fill(decimal.toUnscaledBytes());
+        }
+        return decv;
       case FLOAT:
         HeapFloatVector fv = new HeapFloatVector(batchSize);
         if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
                 || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
                 && primitiveType.getOriginalType() == OriginalType.DECIMAL,
             "Unexpected type: %s", typeName);
-        return new HeapBytesVector(batchSize);
+        return new HeapDecimalVector(batchSize);
       case ARRAY:
         ArrayType arrayType = (ArrayType) fieldType;
         return new HeapArrayVector(
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..fdc55ac18fc
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements DecimalColumnVector {
+
+  public HeapDecimalVector(int len) {
+    super(len);
+  }
+
+  @Override
+  public DecimalData getDecimal(int i, int precision, int scale) {
+    return DecimalData.fromUnscaledBytes(
+        this.getBytes(i).getBytes(), precision, scale);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 9bf5390ee26..1b636c63b2f 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
 
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
 import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
         }
         return lv;
       case DECIMAL:
-        DecimalType decimalType = (DecimalType) type;
-        int precision = decimalType.getPrecision();
-        int scale = decimalType.getScale();
-        DecimalData decimal = value == null
-            ? null
-            : Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
-        ColumnVector internalVector = createVectorFromConstant(
-            new VarBinaryType(),
-            decimal == null ? null : decimal.toUnscaledBytes(),
-            batchSize);
-        return new ParquetDecimalVector(internalVector);
+        HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+        if (value == null) {
+          decv.fillWithNulls();
+        } else {
+          DecimalType decimalType = (DecimalType) type;
+          int precision = decimalType.getPrecision();
+          int scale = decimalType.getScale();
+          DecimalData decimal = Preconditions.checkNotNull(
+              DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
+          decv.fill(decimal.toUnscaledBytes());
+        }
+        return decv;
       case FLOAT:
         HeapFloatVector fv = new HeapFloatVector(batchSize);
         if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
                 || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
                 && primitiveType.getOriginalType() == OriginalType.DECIMAL,
             "Unexpected type: %s", typeName);
-        return new HeapBytesVector(batchSize);
+        return new HeapDecimalVector(batchSize);
       case ARRAY:
         ArrayType arrayType = (ArrayType) fieldType;
         return new HeapArrayVector(
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..fdc55ac18fc
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements DecimalColumnVector {
+
+  public HeapDecimalVector(int len) {
+    super(len);
+  }
+
+  @Override
+  public DecimalData getDecimal(int i, int precision, int scale) {
+    return DecimalData.fromUnscaledBytes(
+        this.getBytes(i).getBytes(), precision, scale);
+  }
+}


Reply via email to