This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2aa91ca24e2 [HUDI-7173] Fix hudi-on-flink read issues involving schema
evolution and decimal types (#10247)
2aa91ca24e2 is described below
commit 2aa91ca24e2d3b52c403441215c45b3830d722ff
Author: voonhous <[email protected]>
AuthorDate: Thu Dec 7 12:01:08 2023 +0800
[HUDI-7173] Fix hudi-on-flink read issues involving schema evolution and
decimal types (#10247)
---
.../apache/hudi/table/ITTestSchemaEvolution.java | 96 +++++++++++-----------
.../org/apache/hudi/utils/TestConfigurations.java | 4 +-
.../table/format/cow/ParquetSplitReaderUtil.java | 28 +++----
.../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
.../table/format/cow/ParquetSplitReaderUtil.java | 28 +++----
.../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
.../table/format/cow/ParquetSplitReaderUtil.java | 28 +++----
.../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
.../table/format/cow/ParquetSplitReaderUtil.java | 28 +++----
.../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
.../table/format/cow/ParquetSplitReaderUtil.java | 28 +++----
.../table/format/cow/vector/HeapDecimalVector.java | 39 +++++++++
12 files changed, 317 insertions(+), 118 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index 1555a8215dc..0417285815a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -250,6 +250,10 @@ public class ITTestSchemaEvolution {
writeClient.addColumn("new_row_col", structType);
writeClient.addColumn("new_array_col", arrayType);
writeClient.addColumn("new_map_col", mapType);
+
+ // perform comprehensive evolution on a struct column by reordering
field positions
+ writeClient.updateColumnType("f_struct.f0", Types.DecimalType.get(20,
0));
+ writeClient.reOrderColPosition("f_struct.f0", "f_struct.drop_add",
AFTER);
}
}
@@ -269,7 +273,7 @@ public class ITTestSchemaEvolution {
+ " last_name string,"
+ " salary double,"
+ " ts timestamp,"
- + " f_struct row<f0 int, f2 int, f1 string, renamed_change_type
bigint, f3 string, drop_add string>,"
+ + " f_struct row<f2 int, f1 string, renamed_change_type bigint, f3
string, drop_add string, f0 decimal(20, 0)>,"
+ " f_map map<string, double>,"
+ " f_array array<double>,"
+ " new_row_col row<f0 bigint, f1 string>,"
@@ -287,7 +291,7 @@ public class ITTestSchemaEvolution {
+ " cast(last_name as string),"
+ " cast(salary as double),"
+ " cast(ts as timestamp),"
- + " cast(f_struct as row<f0 int, f2 int, f1 string,
renamed_change_type bigint, f3 string, drop_add string>),"
+ + " cast(f_struct as row<f2 int, f1 string, renamed_change_type
bigint, f3 string, drop_add string, f0 decimal(20, 0)>),"
+ " cast(f_map as map<string, double>),"
+ " cast(f_array as array<double>),"
+ " cast(new_row_col as row<f0 bigint, f1 string>),"
@@ -295,11 +299,11 @@ public class ITTestSchemaEvolution {
+ " cast(new_map_col as map<string, string>),"
+ " cast(`partition` as string) "
+ "from (values "
- + " ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1,
1, 's1', 11, 't1', 'drop_add1'), cast(map['Danny', 2323.23] as map<string,
double>), array[23, 23, 23], "
+ + " ('id1', '23', 'Danny', '', 10000.1, '2000-01-01 00:00:01', row(1,
's1', 11, 't1', 'drop_add1', 1), cast(map['Danny', 2323.23] as map<string,
double>), array[23, 23, 23], "
+ " row(1, '1'), array['1'], Map['k1','v1'], 'par1'),"
- + " ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09',
row(9, 9, 's9', 99, 't9', 'drop_add9'), cast(map['Alice', 9999.99] as
map<string, double>), array[9999, 9999], "
+ + " ('id9', 'unknown', 'Alice', '', 90000.9, '2000-01-01 00:00:09',
row(9, 's9', 99, 't9', 'drop_add9', 9), cast(map['Alice', 9999.99] as
map<string, double>), array[9999, 9999], "
+ " row(9, '9'), array['9'], Map['k9','v9'], 'par1'),"
- + " ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03',
row(3, 3, 's3', 33, 't3', 'drop_add3'), cast(map['Julian', 5353.53] as
map<string, double>), array[53], "
+ + " ('id3', '53', 'Julian', '', 30000.3, '2000-01-01 00:00:03',
row(3, 's3', 33, 't3', 'drop_add3', 3), cast(map['Julian', 5353.53] as
map<string, double>), array[53], "
+ " row(3, '3'), array['3'], Map['k3','v3'], 'par2')"
+ ") as A(uuid, age, first_name, last_name, salary, ts, f_struct,
f_map, f_array, new_row_col, new_array_col, new_map_col, `partition`)"
).await();
@@ -367,7 +371,7 @@ public class ITTestSchemaEvolution {
+ " last_name string,"
+ " salary double,"
+ " ts timestamp,"
- + " f_struct row<f0 int, f2 int, f1 string, renamed_change_type
bigint, f3 string, drop_add string>,"
+ + " f_struct row<f2 int, f1 string, renamed_change_type bigint, f3
string, drop_add string, f0 decimal(20, 0)>,"
+ " f_map map<string, double>,"
+ " f_array array<double>,"
+ " new_row_col row<f0 bigint, f1 string>,"
@@ -469,27 +473,27 @@ public class ITTestSchemaEvolution {
private static final ExpectedResult EXPECTED_MERGED_RESULT = new
ExpectedResult(
new String[] {
"+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null,
null]",
- "+I[Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1],
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
- "+I[Stephen, null, 33, +I[2, null, s2, 2, null, null],
{Stephen=3333.0}, [33.0], null, null, null]",
- "+I[Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3],
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
- "+I[Fabian, null, 31, +I[4, null, s4, 4, null, null],
{Fabian=3131.0}, [31.0], null, null, null]",
- "+I[Sophia, null, 18, +I[5, null, s5, 5, null, null],
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
- "+I[Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0},
[20.0], null, null, null]",
- "+I[Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0},
[44.0, 44.0], null, null, null]",
- "+I[Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0},
[56.0, 56.0, 56.0], null, null, null]",
- "+I[Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9],
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+ "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1],
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
+ "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2],
{Stephen=3333.0}, [33.0], null, null, null]",
+ "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3],
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+ "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4],
{Fabian=3131.0}, [31.0], null, null, null]",
+ "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5],
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
+ "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0},
[20.0], null, null, null]",
+ "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0},
[44.0, 44.0], null, null, null]",
+ "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0},
[56.0, 56.0, 56.0], null, null, null]",
+ "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9],
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
},
new String[] {
"+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null,
null, null]",
- "+I[id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1],
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
- "+I[id2, Stephen, null, 33, +I[2, null, s2, 2, null, null],
{Stephen=3333.0}, [33.0], null, null, null]",
- "+I[id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3],
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
- "+I[id4, Fabian, null, 31, +I[4, null, s4, 4, null, null],
{Fabian=3131.0}, [31.0], null, null, null]",
- "+I[id5, Sophia, null, 18, +I[5, null, s5, 5, null, null],
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
- "+I[id6, Emma, null, 20, +I[6, null, s6, 6, null, null],
{Emma=2020.0}, [20.0], null, null, null]",
- "+I[id7, Bob, null, 44, +I[7, null, s7, 7, null, null],
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
- "+I[id8, Han, null, 56, +I[8, null, s8, 8, null, null],
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
- "+I[id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9],
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+ "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1],
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
+ "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2],
{Stephen=3333.0}, [33.0], null, null, null]",
+ "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3],
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+ "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4],
{Fabian=3131.0}, [31.0], null, null, null]",
+ "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5],
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
+ "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6],
{Emma=2020.0}, [20.0], null, null, null]",
+ "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7],
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
+ "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8],
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
+ "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9],
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
},
new String[] {
"+I[1]",
@@ -517,31 +521,31 @@ public class ITTestSchemaEvolution {
private static final ExpectedResult EXPECTED_UNMERGED_RESULT = new
ExpectedResult(
new String[] {
"+I[Indica, null, 12, null, {Indica=1212.0}, [12.0], null, null,
null]",
- "+I[Danny, null, 23, +I[1, null, s1, 1, null, null], {Danny=2323.0},
[23.0, 23.0], null, null, null]",
- "+I[Stephen, null, 33, +I[2, null, s2, 2, null, null],
{Stephen=3333.0}, [33.0], null, null, null]",
- "+I[Julian, null, 53, +I[3, null, s3, 3, null, null],
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
- "+I[Fabian, null, 31, +I[4, null, s4, 4, null, null],
{Fabian=3131.0}, [31.0], null, null, null]",
- "+I[Sophia, null, 18, +I[5, null, s5, 5, null, null],
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
- "+I[Emma, null, 20, +I[6, null, s6, 6, null, null], {Emma=2020.0},
[20.0], null, null, null]",
- "+I[Bob, null, 44, +I[7, null, s7, 7, null, null], {Bob=4444.0},
[44.0, 44.0], null, null, null]",
- "+I[Han, null, 56, +I[8, null, s8, 8, null, null], {Han=5656.0},
[56.0, 56.0, 56.0], null, null, null]",
- "+I[Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9],
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
- "+I[Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1],
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
- "+I[Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3],
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+ "+I[Danny, null, 23, +I[null, s1, 1, null, null, 1], {Danny=2323.0},
[23.0, 23.0], null, null, null]",
+ "+I[Stephen, null, 33, +I[null, s2, 2, null, null, 2],
{Stephen=3333.0}, [33.0], null, null, null]",
+ "+I[Julian, null, 53, +I[null, s3, 3, null, null, 3],
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
+ "+I[Fabian, null, 31, +I[null, s4, 4, null, null, 4],
{Fabian=3131.0}, [31.0], null, null, null]",
+ "+I[Sophia, null, 18, +I[null, s5, 5, null, null, 5],
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
+ "+I[Emma, null, 20, +I[null, s6, 6, null, null, 6], {Emma=2020.0},
[20.0], null, null, null]",
+ "+I[Bob, null, 44, +I[null, s7, 7, null, null, 7], {Bob=4444.0},
[44.0, 44.0], null, null, null]",
+ "+I[Han, null, 56, +I[null, s8, 8, null, null, 8], {Han=5656.0},
[56.0, 56.0, 56.0], null, null, null]",
+ "+I[Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9],
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+ "+I[Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1],
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
+ "+I[Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3],
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
},
new String[] {
"+I[id0, Indica, null, 12, null, {Indica=1212.0}, [12.0], null,
null, null]",
- "+I[id1, Danny, null, 23, +I[1, null, s1, 1, null, null],
{Danny=2323.0}, [23.0, 23.0], null, null, null]",
- "+I[id2, Stephen, null, 33, +I[2, null, s2, 2, null, null],
{Stephen=3333.0}, [33.0], null, null, null]",
- "+I[id3, Julian, null, 53, +I[3, null, s3, 3, null, null],
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
- "+I[id4, Fabian, null, 31, +I[4, null, s4, 4, null, null],
{Fabian=3131.0}, [31.0], null, null, null]",
- "+I[id5, Sophia, null, 18, +I[5, null, s5, 5, null, null],
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
- "+I[id6, Emma, null, 20, +I[6, null, s6, 6, null, null],
{Emma=2020.0}, [20.0], null, null, null]",
- "+I[id7, Bob, null, 44, +I[7, null, s7, 7, null, null],
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
- "+I[id8, Han, null, 56, +I[8, null, s8, 8, null, null],
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
- "+I[id9, Alice, 90000.9, unknown, +I[9, 9, s9, 99, t9, drop_add9],
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
- "+I[id1, Danny, 10000.1, 23, +I[1, 1, s1, 11, t1, drop_add1],
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
- "+I[id3, Julian, 30000.3, 53, +I[3, 3, s3, 33, t3, drop_add3],
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
+ "+I[id1, Danny, null, 23, +I[null, s1, 1, null, null, 1],
{Danny=2323.0}, [23.0, 23.0], null, null, null]",
+ "+I[id2, Stephen, null, 33, +I[null, s2, 2, null, null, 2],
{Stephen=3333.0}, [33.0], null, null, null]",
+ "+I[id3, Julian, null, 53, +I[null, s3, 3, null, null, 3],
{Julian=5353.0}, [53.0, 53.0], null, null, null]",
+ "+I[id4, Fabian, null, 31, +I[null, s4, 4, null, null, 4],
{Fabian=3131.0}, [31.0], null, null, null]",
+ "+I[id5, Sophia, null, 18, +I[null, s5, 5, null, null, 5],
{Sophia=1818.0}, [18.0, 18.0], null, null, null]",
+ "+I[id6, Emma, null, 20, +I[null, s6, 6, null, null, 6],
{Emma=2020.0}, [20.0], null, null, null]",
+ "+I[id7, Bob, null, 44, +I[null, s7, 7, null, null, 7],
{Bob=4444.0}, [44.0, 44.0], null, null, null]",
+ "+I[id8, Han, null, 56, +I[null, s8, 8, null, null, 8],
{Han=5656.0}, [56.0, 56.0, 56.0], null, null, null]",
+ "+I[id9, Alice, 90000.9, unknown, +I[9, s9, 99, t9, drop_add9, 9],
{Alice=9999.99}, [9999.0, 9999.0], +I[9, 9], [9], {k9=v9}]",
+ "+I[id1, Danny, 10000.1, 23, +I[1, s1, 11, t1, drop_add1, 1],
{Danny=2323.23}, [23.0, 23.0, 23.0], +I[1, 1], [1], {k1=v1}]",
+ "+I[id3, Julian, 30000.3, 53, +I[3, s3, 33, t3, drop_add3, 3],
{Julian=5353.53}, [53.0], +I[3, 3], [3], {k3=v3}]",
},
new String[] {
"+I[1]",
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index b4f769fcc00..71295d93b10 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -110,12 +110,12 @@ public class TestConfigurations {
DataTypes.FIELD("salary", DataTypes.DOUBLE()), // new field
DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6)),
DataTypes.FIELD("f_struct", DataTypes.ROW(
- DataTypes.FIELD("f0", DataTypes.INT()),
DataTypes.FIELD("f2", DataTypes.INT()), // new field added in
the middle of struct
DataTypes.FIELD("f1", DataTypes.STRING()),
DataTypes.FIELD("renamed_change_type", DataTypes.BIGINT()),
DataTypes.FIELD("f3", DataTypes.STRING()),
- DataTypes.FIELD("drop_add", DataTypes.STRING()))), // new field
added at the end of struct
+ DataTypes.FIELD("drop_add", DataTypes.STRING()),
+ DataTypes.FIELD("f0", DataTypes.DECIMAL(20, 0)))),
DataTypes.FIELD("f_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.DOUBLE())),
DataTypes.FIELD("f_array", DataTypes.ARRAY(DataTypes.DOUBLE())),
DataTypes.FIELD("new_row_col", DataTypes.ROW(
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index ed63bae6749..76aa827a84a 100644
---
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
}
return lv;
case DECIMAL:
- DecimalType decimalType = (DecimalType) type;
- int precision = decimalType.getPrecision();
- int scale = decimalType.getScale();
- DecimalData decimal = value == null
- ? null
- :
Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value,
precision, scale));
- ColumnVector internalVector = createVectorFromConstant(
- new VarBinaryType(),
- decimal == null ? null : decimal.toUnscaledBytes(),
- batchSize);
- return new ParquetDecimalVector(internalVector);
+ HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+ if (value == null) {
+ decv.fillWithNulls();
+ } else {
+ DecimalType decimalType = (DecimalType) type;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ DecimalData decimal = Preconditions.checkNotNull(
+ DecimalData.fromBigDecimal((BigDecimal) value, precision,
scale));
+ decv.fill(decimal.toUnscaledBytes());
+ }
+ return decv;
case FLOAT:
HeapFloatVector fv = new HeapFloatVector(batchSize);
if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
- return new HeapBytesVector(batchSize);
+ return new HeapDecimalVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
return new HeapArrayVector(
diff --git
a/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..06cf200a841
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements
DecimalColumnVector {
+
+ public HeapDecimalVector(int len) {
+ super(len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromUnscaledBytes(
+ this.getBytes(i).getBytes(), precision, scale);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 9bf5390ee26..1b636c63b2f 100644
---
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
}
return lv;
case DECIMAL:
- DecimalType decimalType = (DecimalType) type;
- int precision = decimalType.getPrecision();
- int scale = decimalType.getScale();
- DecimalData decimal = value == null
- ? null
- :
Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value,
precision, scale));
- ColumnVector internalVector = createVectorFromConstant(
- new VarBinaryType(),
- decimal == null ? null : decimal.toUnscaledBytes(),
- batchSize);
- return new ParquetDecimalVector(internalVector);
+ HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+ if (value == null) {
+ decv.fillWithNulls();
+ } else {
+ DecimalType decimalType = (DecimalType) type;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ DecimalData decimal = Preconditions.checkNotNull(
+ DecimalData.fromBigDecimal((BigDecimal) value, precision,
scale));
+ decv.fill(decimal.toUnscaledBytes());
+ }
+ return decv;
case FLOAT:
HeapFloatVector fv = new HeapFloatVector(batchSize);
if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
- return new HeapBytesVector(batchSize);
+ return new HeapDecimalVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
return new HeapArrayVector(
diff --git
a/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..fdc55ac18fc
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.15.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements
DecimalColumnVector {
+
+ public HeapDecimalVector(int len) {
+ super(len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromUnscaledBytes(
+ this.getBytes(i).getBytes(), precision, scale);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 9bf5390ee26..1b636c63b2f 100644
---
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import
org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
}
return lv;
case DECIMAL:
- DecimalType decimalType = (DecimalType) type;
- int precision = decimalType.getPrecision();
- int scale = decimalType.getScale();
- DecimalData decimal = value == null
- ? null
- :
Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value,
precision, scale));
- ColumnVector internalVector = createVectorFromConstant(
- new VarBinaryType(),
- decimal == null ? null : decimal.toUnscaledBytes(),
- batchSize);
- return new ParquetDecimalVector(internalVector);
+ HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+ if (value == null) {
+ decv.fillWithNulls();
+ } else {
+ DecimalType decimalType = (DecimalType) type;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ DecimalData decimal = Preconditions.checkNotNull(
+ DecimalData.fromBigDecimal((BigDecimal) value, precision,
scale));
+ decv.fill(decimal.toUnscaledBytes());
+ }
+ return decv;
case FLOAT:
HeapFloatVector fv = new HeapFloatVector(batchSize);
if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
- return new HeapBytesVector(batchSize);
+ return new HeapDecimalVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
return new HeapArrayVector(
diff --git
a/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..fdc55ac18fc
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink1.16.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements
DecimalColumnVector {
+
+ public HeapDecimalVector(int len) {
+ super(len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromUnscaledBytes(
+ this.getBytes(i).getBytes(), precision, scale);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 9bf5390ee26..1b636c63b2f 100644
---
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
}
return lv;
case DECIMAL:
- DecimalType decimalType = (DecimalType) type;
- int precision = decimalType.getPrecision();
- int scale = decimalType.getScale();
- DecimalData decimal = value == null
- ? null
- : Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
- ColumnVector internalVector = createVectorFromConstant(
- new VarBinaryType(),
- decimal == null ? null : decimal.toUnscaledBytes(),
- batchSize);
- return new ParquetDecimalVector(internalVector);
+ HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+ if (value == null) {
+ decv.fillWithNulls();
+ } else {
+ DecimalType decimalType = (DecimalType) type;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ DecimalData decimal = Preconditions.checkNotNull(
+ DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
+ decv.fill(decimal.toUnscaledBytes());
+ }
+ return decv;
case FLOAT:
HeapFloatVector fv = new HeapFloatVector(batchSize);
if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
- return new HeapBytesVector(batchSize);
+ return new HeapDecimalVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
return new HeapArrayVector(
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..fdc55ac18fc
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap map decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements DecimalColumnVector {
+
+ public HeapDecimalVector(int len) {
+ super(len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromUnscaledBytes(
+ this.getBytes(i).getBytes(), precision, scale);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 9bf5390ee26..1b636c63b2f 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -20,9 +20,9 @@ package org.apache.hudi.table.format.cow;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
@@ -65,7 +65,6 @@ import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
-import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetRuntimeException;
@@ -234,17 +233,18 @@ public class ParquetSplitReaderUtil {
}
return lv;
case DECIMAL:
- DecimalType decimalType = (DecimalType) type;
- int precision = decimalType.getPrecision();
- int scale = decimalType.getScale();
- DecimalData decimal = value == null
- ? null
- : Preconditions.checkNotNull(DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
- ColumnVector internalVector = createVectorFromConstant(
- new VarBinaryType(),
- decimal == null ? null : decimal.toUnscaledBytes(),
- batchSize);
- return new ParquetDecimalVector(internalVector);
+ HeapDecimalVector decv = new HeapDecimalVector(batchSize);
+ if (value == null) {
+ decv.fillWithNulls();
+ } else {
+ DecimalType decimalType = (DecimalType) type;
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ DecimalData decimal = Preconditions.checkNotNull(
+ DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
+ decv.fill(decimal.toUnscaledBytes());
+ }
+ return decv;
case FLOAT:
HeapFloatVector fv = new HeapFloatVector(batchSize);
if (value == null) {
@@ -513,7 +513,7 @@ public class ParquetSplitReaderUtil {
|| typeName == PrimitiveType.PrimitiveTypeName.BINARY)
&& primitiveType.getOriginalType() == OriginalType.DECIMAL,
"Unexpected type: %s", typeName);
- return new HeapBytesVector(batchSize);
+ return new HeapDecimalVector(batchSize);
case ARRAY:
ArrayType arrayType = (ArrayType) fieldType;
return new HeapArrayVector(
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
new file mode 100644
index 00000000000..fdc55ac18fc
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapDecimalVector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.columnar.vector.DecimalColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+
+/**
+ * This class represents a nullable heap map decimal vector.
+ */
+public class HeapDecimalVector extends HeapBytesVector implements DecimalColumnVector {
+
+ public HeapDecimalVector(int len) {
+ super(len);
+ }
+
+ @Override
+ public DecimalData getDecimal(int i, int precision, int scale) {
+ return DecimalData.fromUnscaledBytes(
+ this.getBytes(i).getBytes(), precision, scale);
+ }
+}