This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 26db324de [core] support user define delimiter with list agg (#3930)
26db324de is described below

commit 26db324dec911fe6fd72166dcc4326f4414b0dc7
Author: wangwj <[email protected]>
AuthorDate: Sun Aug 11 19:41:40 2024 +0800

    [core] support user define delimiter with list agg (#3930)
---
 .../primary-key-table/merge-engine/aggregation.md  |  1 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  9 +++
 .../compact/aggregate/FieldAggregator.java         |  2 +-
 .../compact/aggregate/FieldListaggAgg.java         |  9 +--
 .../mergetree/compact/IntervalPartitionTest.java   |  3 +-
 .../aggregate/AggregateMergeFunctionTest.java      | 83 ++++++++++++++++++++++
 .../compact/aggregate/FieldAggregatorTest.java     | 23 +++++-
 7 files changed, 121 insertions(+), 9 deletions(-)

diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md 
b/docs/content/primary-key-table/merge-engine/aggregation.md
index c009d1d7a..7cdd9044a 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -93,6 +93,7 @@ Current supported aggregate functions and data types are:
 ### listagg
   The listagg function concatenates multiple string values into a single 
string.
   It supports STRING data type.
+  Each field that is not part of the primary keys can be given a listagg delimiter, 
specified by the `fields.<field-name>.list-agg-delimiter` table property; 
otherwise, "," is used as the default delimiter.
 
 ### bool_and
   The bool_and function evaluates whether all values in a boolean set are true.
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index aa4bcb685..c48527d2a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -81,6 +81,8 @@ public class CoreOptions implements Serializable {
 
     public static final String DISTINCT = "distinct";
 
+    public static final String LIST_AGG_DELIMITER = "list-agg-delimiter";
+
     public static final String FILE_INDEX = "file-index";
 
     public static final String COLUMNS = "columns";
@@ -1474,6 +1476,13 @@ public class CoreOptions implements Serializable {
                         .defaultValue(false));
     }
 
+    public String fieldListAggDelimiter(String fieldName) {
+        return options.get(
+                key(FIELDS_PREFIX + "." + fieldName + "." + LIST_AGG_DELIMITER)
+                        .stringType()
+                        .defaultValue(","));
+    }
+
     @Nullable
     public String fileCompression() {
         return options.get(FILE_COMPRESSION);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index 77eeecd8d..8d0f5b79f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -74,7 +74,7 @@ public abstract class FieldAggregator implements Serializable 
{
                         fieldAggregator = new FieldLastValueAgg(fieldType);
                         break;
                     case FieldListaggAgg.NAME:
-                        fieldAggregator = new FieldListaggAgg(fieldType);
+                        fieldAggregator = new FieldListaggAgg(fieldType, 
options, field);
                         break;
                     case FieldBoolOrAgg.NAME:
                         fieldAggregator = new FieldBoolOrAgg(fieldType);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
index a4937a929..e0286bbb7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree.compact.aggregate;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.StringUtils;
@@ -27,11 +28,11 @@ public class FieldListaggAgg extends FieldAggregator {
 
     public static final String NAME = "listagg";
 
-    // TODO: make it configurable by with clause
-    public static final String DELIMITER = ",";
+    private final String delimiter;
 
-    public FieldListaggAgg(DataType dataType) {
+    public FieldListaggAgg(DataType dataType, CoreOptions options, String 
field) {
         super(dataType);
+        this.delimiter = options.fieldListAggDelimiter(field);
     }
 
     @Override
@@ -54,7 +55,7 @@ public class FieldListaggAgg extends FieldAggregator {
                     BinaryString inFieldSD = (BinaryString) inputField;
                     concatenate =
                             StringUtils.concat(
-                                    mergeFieldSD, 
BinaryString.fromString(DELIMITER), inFieldSD);
+                                    mergeFieldSD, 
BinaryString.fromString(delimiter), inFieldSD);
                     break;
                 default:
                     String msg =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
index 4d4117c7d..05fce451a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java
@@ -48,8 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link IntervalPartition}. */
 public class IntervalPartitionTest {
 
-    private static final RecordComparator COMPARATOR =
-            (RecordComparator) (o1, o2) -> o1.getInt(0) - o2.getInt(0);
+    private static final RecordComparator COMPARATOR = (o1, o2) -> 
o1.getInt(0) - o2.getInt(0);
 
     @Test
     public void testSameMinKey() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
index 4a5f5f8df..70cdbe310 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunctionTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.mergetree.compact.aggregate;
 
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.options.Options;
@@ -35,6 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for aggregate merge function. */
 class AggregateMergeFunctionTest {
+
     @Test
     void testDefaultAggFunc() {
         Options options = new Options();
@@ -62,8 +64,89 @@ class AggregateMergeFunctionTest {
         
assertThat(aggregateFunction.getResult().value()).isEqualTo(GenericRow.of(1, 2, 
13, 1, 1));
     }
 
+    @Test
+    void tesListAggFunc() {
+        Options options = new Options();
+        options.set("fields.a.aggregate-function", "listagg");
+        options.set("fields.b.aggregate-function", "listagg");
+        options.set("fields.b.list-agg-delimiter", "-");
+        options.set("fields.d.aggregate-function", "listagg");
+        options.set("fields.d.list-agg-delimiter", "/");
+
+        MergeFunction<KeyValue> aggregateFunction =
+                AggregateMergeFunction.factory(
+                                options,
+                                Arrays.asList("k", "a", "b", "c", "d"),
+                                Arrays.asList(
+                                        DataTypes.INT(),
+                                        DataTypes.STRING(),
+                                        DataTypes.STRING(),
+                                        DataTypes.INT(),
+                                        DataTypes.STRING()),
+                                Collections.singletonList("k"))
+                        .create();
+        aggregateFunction.reset();
+
+        aggregateFunction.add(
+                value(
+                        1,
+                        BinaryString.fromString("1"),
+                        BinaryString.fromString("1"),
+                        1,
+                        BinaryString.fromString("1")));
+        aggregateFunction.add(
+                value(
+                        1,
+                        BinaryString.fromString("2"),
+                        BinaryString.fromString("2"),
+                        2,
+                        BinaryString.fromString("2")));
+        aggregateFunction.add(
+                value(
+                        1,
+                        BinaryString.fromString("3"),
+                        BinaryString.fromString("3"),
+                        3,
+                        BinaryString.fromString("3")));
+        aggregateFunction.add(
+                value(
+                        1,
+                        BinaryString.fromString("4"),
+                        BinaryString.fromString("4"),
+                        4,
+                        BinaryString.fromString("4")));
+        aggregateFunction.add(
+                value(
+                        1,
+                        BinaryString.fromString("5"),
+                        BinaryString.fromString("5"),
+                        5,
+                        BinaryString.fromString("5")));
+        assertThat(aggregateFunction.getResult().value())
+                .isEqualTo(
+                        GenericRow.of(
+                                1,
+                                BinaryString.fromString("1,2,3,4,5"),
+                                BinaryString.fromString("1-2-3-4-5"),
+                                5,
+                                BinaryString.fromString("1/2/3/4/5")));
+    }
+
     private KeyValue value(Integer... values) {
         return new KeyValue()
                 .replace(GenericRow.of(values[0]), RowKind.INSERT, 
GenericRow.of(values));
     }
+
+    private KeyValue value(
+            Integer value1,
+            BinaryString value2,
+            BinaryString value3,
+            Integer value4,
+            BinaryString value5) {
+        return new KeyValue()
+                .replace(
+                        GenericRow.of(value1),
+                        RowKind.INSERT,
+                        GenericRow.of(value1, value2, value3, value4, value5));
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 3140ba5f1..7fae50622 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.mergetree.compact.aggregate;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
@@ -43,6 +44,8 @@ import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.RoaringBitmap32;
 import org.apache.paimon.utils.RoaringBitmap64;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -123,14 +126,30 @@ public class FieldAggregatorTest {
     }
 
     @Test
-    public void testFieldListaggAgg() {
-        FieldListaggAgg fieldListaggAgg = new FieldListaggAgg(new 
VarCharType());
+    public void testFieldListAggWithDefaultDelimiter() {
+        FieldListaggAgg fieldListaggAgg =
+                new FieldListaggAgg(
+                        new VarCharType(), new CoreOptions(new HashMap<>()), 
"fieldName");
         BinaryString accumulator = BinaryString.fromString("user1");
         BinaryString inputField = BinaryString.fromString("user2");
         assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
                 .isEqualTo("user1,user2");
     }
 
+    @Test
+    public void testFieldListAggWithCustomDelimiter() {
+        FieldListaggAgg fieldListaggAgg =
+                new FieldListaggAgg(
+                        new VarCharType(),
+                        CoreOptions.fromMap(
+                                
ImmutableMap.of("fields.fieldName.list-agg-delimiter", "-")),
+                        "fieldName");
+        BinaryString accumulator = BinaryString.fromString("user1");
+        BinaryString inputField = BinaryString.fromString("user2");
+        assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
+                .isEqualTo("user1-user2");
+    }
+
     @Test
     public void testFieldMaxAgg() {
         FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType());

Reply via email to