This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 40d373b86 [core] support to compare varchar/char type in aggregate
merge engine (#2412)
40d373b86 is described below
commit 40d373b865e3d09f4c8f875318388b9485747924
Author: Aitozi <[email protected]>
AuthorDate: Wed Nov 29 10:15:30 2023 +0800
[core] support to compare varchar/char type in aggregate merge engine
(#2412)
---
docs/content/concepts/primary-key-table.md | 2 +-
.../org/apache/paimon/utils/InternalRowUtils.java | 6 +-
.../apache/paimon/utils/InternalRowUtilsTest.java | 9 +++
.../apache/paimon/flink/PreAggregationITCase.java | 92 ++++++++++++++--------
4 files changed, 73 insertions(+), 36 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index c6937efed..8c3b69516 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -283,7 +283,7 @@ Field `price` will be aggregated by the `max` function, and
field `sales` will b
Current supported aggregate functions and data types are:
* `sum`: supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT and
DOUBLE.
-* `min`/`max`: support DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT,
DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
+* `min`/`max`: support CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER,
BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP and TIMESTAMP_LTZ.
* `last_value` / `last_non_null_value`: support all data types.
* `listagg`: supports STRING data type.
* `bool_and` / `bool_or`: support BOOLEAN data type.
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java
index 649596f45..e3ac9ddfe 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowUtils.java
@@ -279,8 +279,12 @@ public class InternalRowUtils {
case VARBINARY:
ret = byteArrayCompare((byte[]) x, (byte[]) y);
break;
+ case VARCHAR:
+ case CHAR:
+ ret = ((BinaryString) x).compareTo((BinaryString) y);
+ break;
default:
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("Incomparable type: " +
type);
}
return ret;
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowUtilsTest.java
b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowUtilsTest.java
index 7cf9a492d..eb3a6d116 100644
---
a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowUtilsTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowUtilsTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.utils;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
@@ -134,5 +135,13 @@ public class InternalRowUtilsTest {
// test TIME_WITHOUT_TIME_ZONE data type
assertThat(InternalRowUtils.compare(165, 168,
DataTypeRoot.TIME_WITHOUT_TIME_ZONE))
.isLessThan(0);
+
+ // test VARCHAR type
+ assertThat(
+ InternalRowUtils.compare(
+ BinaryString.fromString("a"),
+ BinaryString.fromString("b"),
+ DataTypeRoot.VARCHAR))
+ .isLessThan(0);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index 195b4ab10..170d5d5fc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -364,6 +364,8 @@ public class PreAggregationITCase {
+ "h DOUBLE,"
+ "i DATE,"
+ "l TIMESTAMP,"
+ + "m CHAR(1),"
+ + "n VARCHAR,"
+ "PRIMARY KEY (j,k) NOT ENFORCED)"
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.a.aggregate-function'='min', "
@@ -374,7 +376,9 @@ public class PreAggregationITCase {
+ "'fields.f.aggregate-function'='min',"
+ "'fields.h.aggregate-function'='min',"
+ "'fields.i.aggregate-function'='min',"
- + "'fields.l.aggregate-function'='min'"
+ + "'fields.l.aggregate-function'='min',"
+ + "'fields.m.aggregate-function'='min',"
+ + "'fields.n.aggregate-function'='min'"
+ ");");
}
@@ -384,13 +388,13 @@ public class PreAggregationITCase {
"INSERT INTO T3 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(-1 AS
TINYINT), CAST(-1 AS SMALLINT), "
+ "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS
DOUBLE), CAST('2020-01-01' AS DATE), "
- + "CAST('2021-01-01 01:01:01' AS TIMESTAMP)),"
+ + "CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a',
'aaa'),"
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), "
+ "CAST(100000 AS BIGINT), -1.11, CAST(1.21 AS
DOUBLE), CAST('2020-01-02' AS DATE), "
- + "CAST('2022-01-01 01:01:01' AS TIMESTAMP)), "
+ + "CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b',
'bbb'), "
+ "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), "
+ "CAST(10000000 AS BIGINT), 0, CAST(-1.11 AS
DOUBLE), CAST('2022-01-02' AS DATE), "
- + "CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ + "CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c',
'ccc')");
List<Row> result = batchSql("SELECT * FROM T3");
assertThat(result)
.containsExactlyInAnyOrder(
@@ -405,7 +409,9 @@ public class PreAggregationITCase {
(float) -1.11,
-1.11,
LocalDate.of(2020, 1, 1),
- LocalDateTime.of(2021, 1, 1, 1, 1, 1)));
+ LocalDateTime.of(2021, 1, 1, 1, 1, 1),
+ "a",
+ "aaa"));
}
@Test
@@ -413,15 +419,15 @@ public class PreAggregationITCase {
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
- + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
- + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
- + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");
List<Row> result = batchSql("SELECT * FROM T3");
assertThat(result)
@@ -437,7 +443,9 @@ public class PreAggregationITCase {
(float) -1.11,
-1.11,
LocalDate.of(2020, 1, 1),
- LocalDateTime.of(2021, 1, 1, 1, 1, 1)));
+ LocalDateTime.of(2021, 1, 1, 1, 1, 1),
+ "a",
+ "aaa"));
}
@Test
@@ -449,29 +457,29 @@ public class PreAggregationITCase {
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
- + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
- + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 2, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
- + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");
// key 1 3
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
- + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
- + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ + "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b', 'bbb')");
batchSql(
"INSERT INTO T3 VALUES "
+ "(1, 3, 3, 10.00, CAST(-1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
- + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ + "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c', 'ccc')");
assertThat(batchSql("SELECT * FROM T3"))
.containsExactlyInAnyOrder(
@@ -486,7 +494,9 @@ public class PreAggregationITCase {
(float) -1.11,
-1.11,
LocalDate.of(2020, 1, 1),
- LocalDateTime.of(2021, 1, 1, 1, 1, 1)),
+ LocalDateTime.of(2021, 1, 1, 1, 1, 1),
+ "a",
+ "aaa"),
Row.of(
1,
3,
@@ -498,7 +508,9 @@ public class PreAggregationITCase {
(float) -1.11,
-1.11,
LocalDate.of(2020, 1, 1),
- LocalDateTime.of(2021, 1, 1, 1, 1, 1)));
+ LocalDateTime.of(2021, 1, 1, 1, 1, 1),
+ "a",
+ "aaa"));
}
@Test
@@ -525,6 +537,8 @@ public class PreAggregationITCase {
+ "h DOUBLE,"
+ "i DATE,"
+ "l TIMESTAMP,"
+ + "m CHAR,"
+ + "n VARCHAR,"
+ "PRIMARY KEY (j,k) NOT ENFORCED)"
+ " WITH ('merge-engine'='aggregation', "
+ "'fields.a.aggregate-function'='max', "
@@ -535,7 +549,9 @@ public class PreAggregationITCase {
+ "'fields.f.aggregate-function'='max',"
+ "'fields.h.aggregate-function'='max',"
+ "'fields.i.aggregate-function'='max',"
- + "'fields.l.aggregate-function'='max'"
+ + "'fields.l.aggregate-function'='max',"
+ + "'fields.m.aggregate-function'='max',"
+ + "'fields.n.aggregate-function'='max'"
+ ");");
}
@@ -545,13 +561,13 @@ public class PreAggregationITCase {
"INSERT INTO T2 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), "
+ "CAST(1000 AS BIGINT), 1.11, CAST(1.11 AS
DOUBLE), CAST('2020-01-01' AS DATE), "
- + "CAST('2021-01-01 01:01:01' AS TIMESTAMP)),"
+ + "CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a',
'aaa'),"
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), "
+ "-1.11, CAST(1.21 AS DOUBLE), CAST('2020-01-02'
AS DATE), "
- + "CAST('2022-01-01 01:01:01' AS TIMESTAMP)), "
+ + "CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b',
'bbb'), "
+ "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), "
+ "0, CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), "
- + "CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ + "CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c',
'ccc')");
List<Row> result = batchSql("SELECT * FROM T2");
assertThat(result)
.containsExactlyInAnyOrder(
@@ -566,7 +582,9 @@ public class PreAggregationITCase {
(float) 1.11,
1.21,
LocalDate.of(2022, 1, 2),
- LocalDateTime.of(2022, 1, 1, 2, 0, 0)));
+ LocalDateTime.of(2022, 1, 1, 2, 0, 0),
+ "c",
+ "ccc"));
}
@Test
@@ -575,17 +593,17 @@ public class PreAggregationITCase {
"INSERT INTO T2 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
+ "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), "
- + "CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ + "CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a',
'aaa')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), -1.11, "
+ "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS
DATE), "
- + "CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ + "CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'b',
'bbb')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), 0, "
+ "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), "
- + "CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ + "CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'c',
'ccc')");
List<Row> result = batchSql("SELECT * FROM T2");
assertThat(result)
@@ -601,7 +619,9 @@ public class PreAggregationITCase {
(float) 1.11,
1.21,
LocalDate.of(2022, 1, 2),
- LocalDateTime.of(2022, 1, 1, 2, 0, 0)));
+ LocalDateTime.of(2022, 1, 1, 2, 0, 0),
+ "c",
+ "ccc"));
}
@Test
@@ -613,29 +633,29 @@ public class PreAggregationITCase {
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
- + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, 2, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), -1.11, "
- + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS
DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS
DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 2, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), 0, "
- + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')");
// key 1 3
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 3, CAST(NULL AS INT), 1.01, CAST(1 AS
TINYINT), CAST(-1 AS SMALLINT), CAST(1000 AS BIGINT), "
- + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP))");
+ + "1.11, CAST(1.11 AS DOUBLE), CAST('2020-01-01'
AS DATE), CAST('2021-01-01 01:01:01' AS TIMESTAMP), 'a', 'aaa')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 3, 6, 1.10, CAST(2 AS TINYINT), CAST(2 AS
SMALLINT), CAST(100000 AS BIGINT), -1.11, "
- + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS
DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP))");
+ + "CAST(1.21 AS DOUBLE), CAST('2020-01-02' AS
DATE), CAST('2022-01-01 01:01:01' AS TIMESTAMP), 'c', 'ccc')");
batchSql(
"INSERT INTO T2 VALUES "
+ "(1, 3, 3, 10.00, CAST(1 AS TINYINT), CAST(1 AS
SMALLINT), CAST(10000000 AS BIGINT), 0, "
- + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP))");
+ + "CAST(-1.11 AS DOUBLE), CAST('2022-01-02' AS
DATE), CAST('2022-01-01 02:00:00' AS TIMESTAMP), 'b', 'bbb')");
assertThat(batchSql("SELECT * FROM T2"))
.containsExactlyInAnyOrder(
@@ -650,7 +670,9 @@ public class PreAggregationITCase {
(float) 1.11,
1.21,
LocalDate.of(2022, 1, 2),
- LocalDateTime.of(2022, 1, 1, 2, 0, 0)),
+ LocalDateTime.of(2022, 1, 1, 2, 0, 0),
+ "c",
+ "ccc"),
Row.of(
1,
3,
@@ -662,7 +684,9 @@ public class PreAggregationITCase {
(float) 1.11,
1.21,
LocalDate.of(2022, 1, 2),
- LocalDateTime.of(2022, 1, 1, 2, 0, 0)));
+ LocalDateTime.of(2022, 1, 1, 2, 0, 0),
+ "c",
+ "ccc"));
}
@Test