This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a9cd48b24c [core] Adjust distinct option behavior in FieldListaggAgg 
operator (#6582)
a9cd48b24c is described below

commit a9cd48b24cb0832a371a34ee33bb218ef4f94651
Author: HOKNANG_LO <[email protected]>
AuthorDate: Tue Nov 25 15:14:38 2025 +0800

    [core] Adjust distinct option behavior in FieldListaggAgg operator (#6582)
---
 .../primary-key-table/merge-engine/aggregation.md  |   1 +
 .../org/apache/paimon/utils/BinaryStringUtils.java |  60 +++++++++
 .../compact/aggregate/FieldListaggAgg.java         |  42 +++++-
 .../compact/aggregate/FieldAggregatorTest.java     | 147 +++++++++++++++++++++
 4 files changed, 249 insertions(+), 1 deletion(-)

diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md 
b/docs/content/primary-key-table/merge-engine/aggregation.md
index a575e48738..ed845eebfb 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -100,6 +100,7 @@ Current supported aggregate functions and data types are:
   The listagg function concatenates multiple string values into a single 
string.
   It supports STRING data type.
   Each field not part of the primary keys can be given a list agg delimiter, 
specified by the fields.<field-name>.list-agg-delimiter table property, 
otherwise it will use "," as default.
+  Set `fields.<field-name>.distinct=true` to deduplicate the values; each input is split into values by the delimiter configured via `fields.<field-name>.list-agg-delimiter`.
 
 ### bool_and
   The bool_and function evaluates whether all values in a boolean set are true.
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java
index ae306c3764..74d1c726bf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java
@@ -20,11 +20,13 @@ package org.apache.paimon.utils;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySegmentUtils;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
 
 import java.time.DateTimeException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.TimeZone;
@@ -419,4 +421,62 @@ public class BinaryStringUtils {
         }
         return BinaryString.fromBytes(result);
     }
+
+    public static BinaryString[] splitByWholeSeparatorPreserveAllTokens(
+            BinaryString str, BinaryString delimiter) {
+        int sizeInBytes = str.getSizeInBytes();
+        MemorySegment[] segments = str.getSegments();
+        int offset = str.getOffset();
+
+        if (sizeInBytes == 0) {
+            return EMPTY_STRING_ARRAY;
+        }
+
+        if (delimiter == null || BinaryString.EMPTY_UTF8.equals(delimiter)) {
+            // Null or empty delimiter: fall back to splitting on a single space character.
+            return splitByWholeSeparatorPreserveAllTokens(str, fromString(" 
"));
+        }
+
+        int sepSize = delimiter.getSizeInBytes();
+        MemorySegment[] sepSegs = delimiter.getSegments();
+        int sepOffset = delimiter.getOffset();
+
+        final ArrayList<BinaryString> substrings = new ArrayList<>();
+        int beg = 0;
+        int end = 0;
+        while (end < sizeInBytes) {
+            end =
+                    MemorySegmentUtils.find(
+                                    segments,
+                                    offset + beg,
+                                    sizeInBytes - beg,
+                                    sepSegs,
+                                    sepOffset,
+                                    sepSize)
+                            - offset;
+
+            if (end > -1) {
+                if (end > beg) {
+
+                    // The token is the byte range [beg, end); the byte at 'end' is where the
+                    // delimiter match starts, so it is excluded from the token.
+                    substrings.add(BinaryString.fromAddress(segments, offset + 
beg, end - beg));
+
+                    // The next search must start just past this delimiter, i.e. at
+                    // end + sepSize; 'beg' is advanced to that position right after this
+                    // if/else, so the empty-token branch shares the same update.
+                } else {
+                    // We found a consecutive occurrence of the separator.
+                    substrings.add(BinaryString.EMPTY_UTF8);
+                }
+                beg = end + sepSize;
+            } else {
+                // No more delimiters: the rest of the string from 'beg' is the last token.
+                substrings.add(BinaryString.fromAddress(segments, offset + 
beg, sizeInBytes - beg));
+                end = sizeInBytes;
+            }
+        }
+
+        return substrings.toArray(new BinaryString[0]);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
index 239e40bfac..def24e3ed6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
@@ -23,6 +23,11 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.BinaryStringUtils;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import java.util.Arrays;
+import java.util.List;
+
 /** listagg aggregate a field of a row. */
 public class FieldListaggAgg extends FieldAggregator {
 
@@ -49,10 +54,45 @@ public class FieldListaggAgg extends FieldAggregator {
         BinaryString mergeFieldSD = (BinaryString) accumulator;
         BinaryString inFieldSD = (BinaryString) inputField;
 
-        if (distinct && inFieldSD.getSizeInBytes() > 0 && 
mergeFieldSD.contains(inFieldSD)) {
+        if (inFieldSD.getSizeInBytes() <= 0) {
             return mergeFieldSD;
         }
 
+        if (mergeFieldSD.getSizeInBytes() <= 0) {
+            return inFieldSD;
+        }
+
+        if (distinct) {
+            BinaryString delimiterBinaryString = 
BinaryString.fromString(delimiter);
+            BinaryString[] binaryStrings =
+                    BinaryStringUtils.splitByWholeSeparatorPreserveAllTokens(
+                            inFieldSD, delimiterBinaryString);
+
+            List<BinaryString> concatItems =
+                    Arrays.stream(binaryStrings)
+                            .filter(it -> it.getSizeInBytes() > 0 && 
!mergeFieldSD.contains(it))
+                            .collect(
+                                    () -> Lists.newArrayList(mergeFieldSD),
+                                    (acc, r) -> {
+                                        if (!acc.isEmpty()) {
+                                            acc.add(delimiterBinaryString);
+                                        }
+                                        acc.add(r);
+                                    },
+                                    (l, r) -> {
+                                        if (!l.isEmpty() && !r.isEmpty()) {
+                                            l.add(delimiterBinaryString);
+                                        }
+                                        l.addAll(r);
+                                    });
+
+            if (concatItems.size() == 1) {
+                return concatItems.get(0);
+            }
+
+            return BinaryStringUtils.concat(concatItems);
+        }
+
         return BinaryStringUtils.concat(
                 mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 1c8506c25c..0b838c3609 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -187,6 +187,153 @@ public class FieldAggregatorTest {
         assertEquals("user1,user2,user3", result.toString());
     }
 
+    @Test
+    public void testFieldListAggWithCustomDelimiterAndEmptyStrings() {
+        FieldListaggAgg fieldListaggAgg =
+                new FieldListaggAggFactory()
+                        .create(
+                                new VarCharType(),
+                                CoreOptions.fromMap(
+                                        ImmutableMap.of(
+                                                "fields.fieldName.distinct",
+                                                "true",
+                                                
"fields.fieldName.list-agg-delimiter",
+                                                ";")),
+                                "fieldName");
+
+        BinaryString result =
+                Stream.of(BinaryString.fromString(""), 
BinaryString.fromString(""))
+                        .sequential()
+                        .reduce((l, r) -> (BinaryString) 
fieldListaggAgg.agg(l, r))
+                        .orElse(null);
+
+        assertNotNull(result);
+        assertEquals("", result.toString());
+    }
+
+    @Test
+    public void testFieldListAggWithDefaultDelimiterAndDistinctWithMultiUser() 
{
+        FieldListaggAgg fieldListaggAgg =
+                new FieldListaggAggFactory()
+                        .create(
+                                new VarCharType(),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
+
+        BinaryString result =
+                Stream.of(
+                                BinaryString.fromString("user1"),
+                                BinaryString.fromString("user2"),
+                                BinaryString.fromString("user1,user3"))
+                        .sequential()
+                        .reduce((l, r) -> (BinaryString) 
fieldListaggAgg.agg(l, r))
+                        .orElse(null);
+
+        assertNotNull(result);
+        assertEquals("user1,user2,user3", result.toString());
+    }
+
+    @Test
+    public void 
testFieldListAggWithDefaultDelimiterAndDistinctWithEmptyLeftUser() {
+        FieldListaggAgg fieldListaggAgg =
+                new FieldListaggAggFactory()
+                        .create(
+                                new VarCharType(),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
+
+        BinaryString result =
+                Stream.of(
+                                BinaryString.fromString(""),
+                                BinaryString.fromString("user2"),
+                                BinaryString.fromString("user1,user3"))
+                        .sequential()
+                        .reduce((l, r) -> (BinaryString) 
fieldListaggAgg.agg(l, r))
+                        .orElse(null);
+
+        assertNotNull(result);
+        assertEquals("user2,user1,user3", result.toString());
+    }
+
+    @Test
+    public void 
testFieldListAggWithCustomDelimiterAndDistinctWithMultiKvString() {
+        FieldListaggAgg fieldListaggAgg =
+                new FieldListaggAggFactory()
+                        .create(
+                                new VarCharType(),
+                                CoreOptions.fromMap(
+                                        ImmutableMap.of(
+                                                "fields.fieldName.distinct",
+                                                "true",
+                                                
"fields.fieldName.list-agg-delimiter",
+                                                ";")),
+                                "fieldName");
+
+        BinaryString result =
+                Stream.of(
+                                BinaryString.fromString("k1=v1;k2=v2"),
+                                BinaryString.fromString("k1=v1;k3=v3"),
+                                BinaryString.fromString(""))
+                        .sequential()
+                        .reduce((l, r) -> (BinaryString) 
fieldListaggAgg.agg(l, r))
+                        .orElse(null);
+
+        assertNotNull(result);
+        assertEquals("k1=v1;k2=v2;k3=v3", result.toString());
+    }
+
+    @Test
+    public void 
testFieldListAggWithCustomDelimiterDistinctMultiKvStringWithWhiteSpace() {
+        FieldListaggAgg fieldListaggAgg =
+                new FieldListaggAggFactory()
+                        .create(
+                                new VarCharType(),
+                                CoreOptions.fromMap(
+                                        ImmutableMap.of(
+                                                "fields.fieldName.distinct",
+                                                "true",
+                                                
"fields.fieldName.list-agg-delimiter",
+                                                " ")),
+                                "fieldName");
+
+        BinaryString result =
+                Stream.of(
+                                BinaryString.fromString("k1=v1 k2=v2"),
+                                BinaryString.fromString(" k1=v1  k3=v3"),
+                                BinaryString.fromString(" "))
+                        .sequential()
+                        .reduce((l, r) -> (BinaryString) 
fieldListaggAgg.agg(l, r))
+                        .orElse(null);
+
+        assertNotNull(result);
+        assertEquals("k1=v1 k2=v2 k3=v3", result.toString());
+    }
+
+    @Test
+    public void 
testFieldListAggWithDefaultDelimiterAndDistinctWithMultiDuplicatedKvString() {
+        FieldListaggAgg fieldListaggAgg =
+                new FieldListaggAggFactory()
+                        .create(
+                                new VarCharType(),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
+
+        BinaryString result =
+                Stream.of(
+                                BinaryString.fromString("k1=v1,k2=v2"),
+                                BinaryString.fromString("k1=v1,k2=v3"),
+                                BinaryString.fromString(""))
+                        .sequential()
+                        .reduce((l, r) -> (BinaryString) 
fieldListaggAgg.agg(l, r))
+                        .orElse(null);
+
+        assertNotNull(result);
+        assertEquals("k1=v1,k2=v2,k2=v3", result.toString());
+    }
+
     @Test
     public void testFieldListAggWithCustomDelimiter() {
         FieldListaggAgg fieldListaggAgg =

Reply via email to