This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 053427fabc [GLUTEN-10056][FLINK] Support function split_index (#10057)
053427fabc is described below

commit 053427fabc38dc732f9b7cd2ef537c33b7359a85
Author: kevinyhzou <[email protected]>
AuthorDate: Wed Sep 10 18:34:04 2025 +0800

    [GLUTEN-10056][FLINK] Support function split_index (#10057)
    
    * support function split_index
---
 .github/workflows/flink.yml                        |  2 +-
 gluten-flink/docs/Flink.md                         |  2 +-
 .../java/org/apache/gluten/rexnode/TypeUtils.java  |  4 ++
 .../rexnode/functions/RexCallConverterFactory.java |  1 +
 .../functions/SplitIndexRexCallConverter.java      | 70 ++++++++++++++++++++++
 .../runtime/stream/custom/ScalarFunctionsTest.java | 36 +++++++++++
 6 files changed, 113 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 0026e6669c..c202d31d3d 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -59,7 +59,7 @@ jobs:
           source /opt/rh/gcc-toolset-11/enable
           sudo dnf install -y patchelf
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 
ea2ca5755ae91a8703717a85b77f9eb1620899de
+          cd velox4j && git reset --hard 
0180528e9b98fad22bc9da8a3864d2929ef73eec
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 29b16d0a9b..6f98d3e143 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you 
have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard ea2ca5755ae91a8703717a85b77f9eb1620899de
+git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java
index 0d21f08939..156174129d 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java
@@ -50,6 +50,11 @@ public class TypeUtils {
     return type instanceof VarCharType;
   }
 
+  /** Returns {@code true} when {@code type} is an {@link IntegerType}, {@code false} otherwise. */
+  public static boolean isIntegerType(Type type) {
+    return IntegerType.class.isInstance(type);
+  }
+
   public static List<TypedExpr> promoteTypeForArithmeticExpressions(
       TypedExpr leftExpr, TypedExpr rightExpr) {
     Type leftType = leftExpr.getReturnType();
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
index 458ff3cb1b..0d7ddb96d8 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
@@ -78,6 +78,7 @@ public class RexCallConverterFactory {
           Map.entry("CAST", Arrays.asList(() -> new 
DefaultRexCallConverter("cast"))),
           Map.entry("CASE", Arrays.asList(() -> new 
DefaultRexCallConverter("if"))),
           Map.entry("AND", Arrays.asList(() -> new 
DefaultRexCallConverter("and"))),
+          Map.entry("SPLIT_INDEX", Arrays.asList(() -> new 
SplitIndexRexCallConverter())),
           Map.entry("SEARCH", Arrays.asList(() -> new 
DefaultRexCallConverter("in"))),
           Map.entry(
               ">=",
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SplitIndexRexCallConverter.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SplitIndexRexCallConverter.java
new file mode 100644
index 0000000000..ab90e31e5f
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SplitIndexRexCallConverter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import org.apache.gluten.rexnode.RexConversionContext;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.rexnode.TypeUtils;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.ConstantTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.Type;
+import io.github.zhztheplayer.velox4j.type.VarCharType;
+import io.github.zhztheplayer.velox4j.variant.BigIntValue;
+import io.github.zhztheplayer.velox4j.variant.IntegerValue;
+import io.github.zhztheplayer.velox4j.variant.VarCharValue;
+
+import org.apache.calcite.rex.RexCall;
+
+import java.util.List;
+
+/**
+ * Converts Flink's {@code SPLIT_INDEX(str, delimiter, index)} into a Velox {@code split_index}
+ * call, reconciling the argument conventions of the two engines:
+ *
+ * <ul>
+ *   <li>Flink accepts the delimiter either as a string or as an {@code INT} character code,
+ *       while Velox takes a string, so an integer literal becomes a one-character VARCHAR.
+ *   <li>Flink's index is a 0-based {@code INT}, while Velox's is a 1-based {@code BIGINT}, so
+ *       a literal index is shifted by one and widened.
+ * </ul>
+ *
+ * <p>Both rewrites are done at planning time, so a non-literal index or a non-literal integer
+ * delimiter cannot be converted and is rejected with {@link UnsupportedOperationException}.
+ */
+public class SplitIndexRexCallConverter extends BaseRexCallConverter {
+
+  private static final String FUNCTION_NAME = "split_index";
+
+  public SplitIndexRexCallConverter() {
+    super(FUNCTION_NAME);
+  }
+
+  @Override
+  public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context) {
+    List<TypedExpr> params = getParams(callNode, context);
+    params.set(1, convertDelimiter(params.get(1)));
+    params.set(2, convertIndex(params.get(2)));
+    Type resultType = RexNodeConverter.toType(callNode.getType());
+    return new CallTypedExpr(resultType, params, functionName);
+  }
+
+  /**
+   * Returns the delimiter argument in the VARCHAR form Velox expects.
+   *
+   * <p>A non-integer (string) delimiter is passed through unchanged. An integer literal is read
+   * as a character code, as Flink does, and replaced by a one-character VARCHAR literal.
+   *
+   * @param delimiterExpr the converted second argument of the call
+   * @return a VARCHAR delimiter expression
+   * @throws UnsupportedOperationException if the delimiter is an integer but not a literal
+   */
+  private static TypedExpr convertDelimiter(TypedExpr delimiterExpr) {
+    if (!TypeUtils.isIntegerType(delimiterExpr.getReturnType())) {
+      return delimiterExpr;
+    }
+    if (!(delimiterExpr instanceof ConstantTypedExpr)) {
+      throw new UnsupportedOperationException(
+          "split_index requires an INT delimiter to be a literal value");
+    }
+    int charCode = ((IntegerValue) ((ConstantTypedExpr) delimiterExpr).getValue()).getValue();
+    // NOTE(review): Flink appears to return NULL for character codes outside [1, 255]; this
+    // narrowing cast does not reproduce that yet -- confirm and handle if needed.
+    VarCharValue delimiter = new VarCharValue(Character.toString((char) charCode));
+    return new ConstantTypedExpr(new VarCharType(), delimiter, null);
+  }
+
+  /**
+   * Rewrites Flink's 0-based INT index literal into Velox's 1-based BIGINT index literal.
+   *
+   * <p>The shift is done in {@code long}, so {@link Integer#MAX_VALUE} cannot wrap around to a
+   * negative index. Every negative Flink index (for which Flink returns NULL) is mapped to 0,
+   * which Velox also answers with NULL, so no negative index ever reaches Velox.
+   *
+   * @param indexExpr the converted third argument of the call
+   * @return the index expression Velox expects
+   * @throws UnsupportedOperationException if the index is not a literal
+   */
+  private static TypedExpr convertIndex(TypedExpr indexExpr) {
+    if (!(indexExpr instanceof ConstantTypedExpr)) {
+      // A column-valued index would need a runtime "+ 1"; passing it through unchanged would
+      // silently read the wrong element, so refuse it explicitly.
+      throw new UnsupportedOperationException(
+          "split_index requires the index to be a literal value");
+    }
+    if (!TypeUtils.isIntegerType(indexExpr.getReturnType())) {
+      return indexExpr;
+    }
+    int flinkIndex = ((IntegerValue) ((ConstantTypedExpr) indexExpr).getValue()).getValue();
+    long veloxIndex = flinkIndex < 0 ? 0L : (long) flinkIndex + 1;
+    return new ConstantTypedExpr(new BigIntType(), new BigIntValue(veloxIndex), null);
+  }
+}
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
index 2b544bb6f0..ff99550b81 100644
--- 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
@@ -129,6 +129,42 @@ class ScalarFunctionsTest extends GlutenStreamingTestBase {
     runAndCheck(query4, Arrays.asList("+I[false]", "+I[true]", "+I[false]"));
   }
 
+  @Test
+  void testSplitIndex() {
+    List<Row> rows =
+        Arrays.asList(
+            Row.of(1, 1L, "http://testflink/a/b/c"),
+            Row.of(2, 2L, "http://testflink/a1/b1/c1"),
+            Row.of(3, 3L, "http://testflink/a2/b2/c2"));
+    createSimpleBoundedValuesTable("tblSplitIndex", "a int, b bigint, c string", rows);
+    String query1 = "select split_index(c, '/', 2) from tblSplitIndex";
+    runAndCheck(query1, Arrays.asList("+I[testflink]", "+I[testflink]", "+I[testflink]"));
+    String query2 = "select split_index(c, '//', 1) from tblSplitIndex";
+    runAndCheck(
+        query2,
+        Arrays.asList("+I[testflink/a/b/c]", "+I[testflink/a1/b1/c1]", "+I[testflink/a2/b2/c2]"));
+    // Corner cases ported from Flink's own ScalarFunctionsTest#testSplitIndex.
+    rows = Arrays.asList(Row.of(1, 1L, "AQIDBA=="));
+    createSimpleBoundedValuesTable("tblSplitIndexFlink", "a int, b bigint, c string", rows);
+    String queryForInvalidIndex =
+        "select split_index(c, 'I', 7), split_index(c, 'I', -1) from tblSplitIndexFlink";
+    String queryForNumericDelimiter =
+        "select split_index(c, 73, 0), split_index(c, 12, 0) from tblSplitIndexFlink";
+    runAndCheck(queryForInvalidIndex, Arrays.asList("+I[null, null]"));
+    runAndCheck(queryForNumericDelimiter, Arrays.asList("+I[AQ, AQIDBA==]"));
+    rows = Arrays.asList(Row.of(2, 2L, null));
+    createSimpleBoundedValuesTable("tblSplitIndexNullInput", "a int, b bigint, c string", rows);
+    String queryForNullInput = "select split_index(c, 'I', 0) from tblSplitIndexNullInput";
+    runAndCheck(queryForNullInput, Arrays.asList("+I[null]"));
+    // TODO: NULL index / NULL delimiter arguments are not supported yet; enable when they are.
+    // String queryForIndexNull = "select split_index(c, 'I', cast(null as INT)) from
+    // tblSplitIndexFlink";
+    // runAndCheck(queryForIndexNull, Arrays.asList("+I[null]"));
+    // String queryForDelimiterNull = "select split_index(c, cast(null as VARCHAR), 0) from
+    // tblSplitIndexFlink";
+    // runAndCheck(queryForDelimiterNull, Arrays.asList("+I[null]"));
+  }
+
   @Disabled
   @Test
   void testReinterpret() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to