This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 26db324de [core] support user define delimiter with list agg (#3930)
26db324de is described below
commit 26db324dec911fe6fd72166dcc4326f4414b0dc7
Author: wangwj <[email protected]>
AuthorDate: Sun Aug 11 19:41:40 2024 +0800
[core] support user define delimiter with list agg (#3930)
---
.../primary-key-table/merge-engine/aggregation.md | 1 +
.../main/java/org/apache/paimon/CoreOptions.java | 9 +++
.../compact/aggregate/FieldAggregator.java | 2 +-
.../compact/aggregate/FieldListaggAgg.java | 9 +--
.../mergetree/compact/IntervalPartitionTest.java | 3 +-
.../aggregate/AggregateMergeFunctionTest.java | 83 ++++++++++++++++++++++
.../compact/aggregate/FieldAggregatorTest.java | 23 +++++-
7 files changed, 121 insertions(+), 9 deletions(-)
diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md
b/docs/content/primary-key-table/merge-engine/aggregation.md
index c009d1d7a..7cdd9044a 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -93,6 +93,7 @@ Current supported aggregate functions and data types are:
### listagg
The listagg function concatenates multiple string values into a single
string.
It supports STRING data type.
+ Each field that is not part of the primary key can be given a listagg delimiter,
specified by the fields.<field-name>.list-agg-delimiter table property;
if the property is not set, "," is used as the default.
### bool_and
The bool_and function evaluates whether all values in a boolean set are true.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index aa4bcb685..c48527d2a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -81,6 +81,8 @@ public class CoreOptions implements Serializable {
public static final String DISTINCT = "distinct";
+ public static final String LIST_AGG_DELIMITER = "list-agg-delimiter";
+
public static final String FILE_INDEX = "file-index";
public static final String COLUMNS = "columns";
@@ -1474,6 +1476,13 @@ public class CoreOptions implements Serializable {
.defaultValue(false));
}
+ public String fieldListAggDelimiter(String fieldName) {
+ return options.get(
+ key(FIELDS_PREFIX + "." + fieldName + "." + LIST_AGG_DELIMITER)
+ .stringType()
+ .defaultValue(","));
+ }
+
@Nullable
public String fileCompression() {
return options.get(FILE_COMPRESSION);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index 77eeecd8d..8d0f5b79f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -74,7 +74,7 @@ public abstract class FieldAggregator implements Serializable
{
fieldAggregator = new FieldLastValueAgg(fieldType);
break;
case FieldListaggAgg.NAME:
- fieldAggregator = new FieldListaggAgg(fieldType);
+ fieldAggregator = new FieldListaggAgg(fieldType,
options, field);
break;
case FieldBoolOrAgg.NAME:
fieldAggregator = new FieldBoolOrAgg(fieldType);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
index a4937a929..e0286bbb7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact.aggregate;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.StringUtils;
@@ -27,11 +28,11 @@ public class FieldListaggAgg extends FieldAggregator {
public static final String NAME = "listagg";
- // TODO: make it configurable by with clause
- public static final String DELIMITER = ",";
+ private final String delimiter;
- public FieldListaggAgg(DataType dataType) {
+ public FieldListaggAgg(DataType dataType, CoreOptions options, String
field) {
super(dataType);
+ this.delimiter = options.fieldListAggDelimiter(field);
}
@Override
@@ -54,7 +55,7 @@ public class FieldListaggAgg extends FieldAggregator {
BinaryString inFieldSD = (BinaryString) inputField;
concatenate =
StringUtils.concat(
- mergeFieldSD,
BinaryString.fromString(DELIMITER), inFieldSD);
+ mergeFieldSD,
BinaryString.fromString(delimiter), inFieldSD);
break;
default:
String msg =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index 4d4117c7d..05fce451a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -48,8 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link IntervalPartition}. */
public class IntervalPartitionTest {
- private static final RecordComparator COMPARATOR =
- (RecordComparator) (o1, o2) -> o1.getInt(0) - o2.getInt(0);
+ private static final RecordComparator COMPARATOR = (o1, o2) ->
o1.getInt(0) - o2.getInt(0);
@Test
public void testSameMinKey() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
index 4a5f5f8df..70cdbe310 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.mergetree.compact.aggregate;
import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.options.Options;
@@ -35,6 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test for aggregate merge function. */
class AggregateMergeFunctionTest {
+
@Test
void testDefaultAggFunc() {
Options options = new Options();
@@ -62,8 +64,89 @@ class AggregateMergeFunctionTest {
assertThat(aggregateFunction.getResult().value()).isEqualTo(GenericRow.of(1, 2,
13, 1, 1));
}
+ @Test
+ void tesListAggFunc() {
+ Options options = new Options();
+ options.set("fields.a.aggregate-function", "listagg");
+ options.set("fields.b.aggregate-function", "listagg");
+ options.set("fields.b.list-agg-delimiter", "-");
+ options.set("fields.d.aggregate-function", "listagg");
+ options.set("fields.d.list-agg-delimiter", "/");
+
+ MergeFunction<KeyValue> aggregateFunction =
+ AggregateMergeFunction.factory(
+ options,
+ Arrays.asList("k", "a", "b", "c", "d"),
+ Arrays.asList(
+ DataTypes.INT(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.STRING()),
+ Collections.singletonList("k"))
+ .create();
+ aggregateFunction.reset();
+
+ aggregateFunction.add(
+ value(
+ 1,
+ BinaryString.fromString("1"),
+ BinaryString.fromString("1"),
+ 1,
+ BinaryString.fromString("1")));
+ aggregateFunction.add(
+ value(
+ 1,
+ BinaryString.fromString("2"),
+ BinaryString.fromString("2"),
+ 2,
+ BinaryString.fromString("2")));
+ aggregateFunction.add(
+ value(
+ 1,
+ BinaryString.fromString("3"),
+ BinaryString.fromString("3"),
+ 3,
+ BinaryString.fromString("3")));
+ aggregateFunction.add(
+ value(
+ 1,
+ BinaryString.fromString("4"),
+ BinaryString.fromString("4"),
+ 4,
+ BinaryString.fromString("4")));
+ aggregateFunction.add(
+ value(
+ 1,
+ BinaryString.fromString("5"),
+ BinaryString.fromString("5"),
+ 5,
+ BinaryString.fromString("5")));
+ assertThat(aggregateFunction.getResult().value())
+ .isEqualTo(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("1,2,3,4,5"),
+ BinaryString.fromString("1-2-3-4-5"),
+ 5,
+ BinaryString.fromString("1/2/3/4/5")));
+ }
+
private KeyValue value(Integer... values) {
return new KeyValue()
.replace(GenericRow.of(values[0]), RowKind.INSERT,
GenericRow.of(values));
}
+
+ private KeyValue value(
+ Integer value1,
+ BinaryString value2,
+ BinaryString value3,
+ Integer value4,
+ BinaryString value5) {
+ return new KeyValue()
+ .replace(
+ GenericRow.of(value1),
+ RowKind.INSERT,
+ GenericRow.of(value1, value2, value3, value4, value5));
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 3140ba5f1..7fae50622 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.mergetree.compact.aggregate;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
@@ -43,6 +44,8 @@ import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.RoaringBitmap32;
import org.apache.paimon.utils.RoaringBitmap64;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -123,14 +126,30 @@ public class FieldAggregatorTest {
}
@Test
- public void testFieldListaggAgg() {
- FieldListaggAgg fieldListaggAgg = new FieldListaggAgg(new
VarCharType());
+ public void testFieldListAggWithDefaultDelimiter() {
+ FieldListaggAgg fieldListaggAgg =
+ new FieldListaggAgg(
+ new VarCharType(), new CoreOptions(new HashMap<>()),
"fieldName");
BinaryString accumulator = BinaryString.fromString("user1");
BinaryString inputField = BinaryString.fromString("user2");
assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
.isEqualTo("user1,user2");
}
+ @Test
+ public void testFieldListAggWithCustomDelimiter() {
+ FieldListaggAgg fieldListaggAgg =
+ new FieldListaggAgg(
+ new VarCharType(),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.list-agg-delimiter", "-")),
+ "fieldName");
+ BinaryString accumulator = BinaryString.fromString("user1");
+ BinaryString inputField = BinaryString.fromString("user2");
+ assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
+ .isEqualTo("user1-user2");
+ }
+
@Test
public void testFieldMaxAgg() {
FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType());