This is an automated email from the ASF dual-hosted git repository.
shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 053427fabc [GLUTEN-10056][FLINK] Support function split_index (#10057)
053427fabc is described below
commit 053427fabc38dc732f9b7cd2ef537c33b7359a85
Author: kevinyhzou <[email protected]>
AuthorDate: Wed Sep 10 18:34:04 2025 +0800
[GLUTEN-10056][FLINK] Support function split_index (#10057)
* support function split_index
---
.github/workflows/flink.yml | 2 +-
gluten-flink/docs/Flink.md | 2 +-
.../java/org/apache/gluten/rexnode/TypeUtils.java | 4 ++
.../rexnode/functions/RexCallConverterFactory.java | 1 +
.../functions/SplitIndexRexCallConverter.java | 70 ++++++++++++++++++++++
.../runtime/stream/custom/ScalarFunctionsTest.java | 36 +++++++++++
6 files changed, 113 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index 0026e6669c..c202d31d3d 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -59,7 +59,7 @@ jobs:
source /opt/rh/gcc-toolset-11/enable
sudo dnf install -y patchelf
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
- cd velox4j && git reset --hard
ea2ca5755ae91a8703717a85b77f9eb1620899de
+ cd velox4j && git reset --hard
0180528e9b98fad22bc9da8a3864d2929ef73eec
git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 29b16d0a9b..6f98d3e143 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you
have to use the follow
## fetch velox4j code
git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
cd velox4j
-git reset --hard ea2ca5755ae91a8703717a85b77f9eb1620899de
+git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec
mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
```
**Get gluten**
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java
index 0d21f08939..156174129d 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/TypeUtils.java
@@ -50,6 +50,10 @@ public class TypeUtils {
return type instanceof VarCharType;
}
+  /**
+   * Tells whether the given type is an {@link IntegerType}.
+   *
+   * @param type the type to inspect; {@code null} yields {@code false}
+   * @return {@code true} if {@code type} is an instance of {@link IntegerType}
+   */
+  public static boolean isIntegerType(Type type) {
+    return IntegerType.class.isInstance(type);
+  }
+
public static List<TypedExpr> promoteTypeForArithmeticExpressions(
TypedExpr leftExpr, TypedExpr rightExpr) {
Type leftType = leftExpr.getReturnType();
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
index 458ff3cb1b..0d7ddb96d8 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
@@ -78,6 +78,7 @@ public class RexCallConverterFactory {
Map.entry("CAST", Arrays.asList(() -> new
DefaultRexCallConverter("cast"))),
Map.entry("CASE", Arrays.asList(() -> new
DefaultRexCallConverter("if"))),
Map.entry("AND", Arrays.asList(() -> new
DefaultRexCallConverter("and"))),
+ Map.entry("SPLIT_INDEX", Arrays.asList(() -> new
SplitIndexRexCallConverter())),
Map.entry("SEARCH", Arrays.asList(() -> new
DefaultRexCallConverter("in"))),
Map.entry(
">=",
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SplitIndexRexCallConverter.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SplitIndexRexCallConverter.java
new file mode 100644
index 0000000000..ab90e31e5f
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SplitIndexRexCallConverter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import org.apache.gluten.rexnode.RexConversionContext;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.rexnode.TypeUtils;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.ConstantTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.Type;
+import io.github.zhztheplayer.velox4j.type.VarCharType;
+import io.github.zhztheplayer.velox4j.variant.BigIntValue;
+import io.github.zhztheplayer.velox4j.variant.IntegerValue;
+import io.github.zhztheplayer.velox4j.variant.VarCharValue;
+
+import org.apache.calcite.rex.RexCall;
+
+import java.util.List;
+
+/**
+ * Converts a {@code SPLIT_INDEX(str, delimiter, index)} call into a {@link CallTypedExpr} that
+ * invokes the native {@code split_index} function.
+ *
+ * <p>Two of the arguments are rewritten when they are integer-typed constants:
+ *
+ * <ul>
+ *   <li>an integer delimiter is treated as a character code and replaced by the corresponding
+ *       one-character string constant;
+ *   <li>an integer index is shifted from 0-based to 1-based and widened to {@code BIGINT}.
+ * </ul>
+ *
+ * <p>The delimiter and the index are both cast to {@link ConstantTypedExpr}; a call whose
+ * delimiter or index is not converted to a constant expression fails with a {@link
+ * ClassCastException}.
+ */
+public class SplitIndexRexCallConverter extends BaseRexCallConverter {
+
+  private static final String FUNCTION_NAME = "split_index";
+
+  public SplitIndexRexCallConverter() {
+    super(FUNCTION_NAME);
+  }
+
+  /**
+   * Builds the native call for the given {@code SPLIT_INDEX} node.
+   *
+   * @param callNode the call to convert; its second and third operands must convert to constants
+   * @param context the conversion context forwarded to the operand conversion
+   * @return a {@link CallTypedExpr} with the converted (and, where needed, rewritten) operands
+   * @throws ClassCastException if the delimiter or the index operand is not a constant
+   */
+  @Override
+  public TypedExpr toTypedExpr(RexCall callNode, RexConversionContext context) {
+    List<TypedExpr> params = getParams(callNode, context);
+    ConstantTypedExpr delimiterExpr = (ConstantTypedExpr) params.get(1);
+    ConstantTypedExpr indexExpr = (ConstantTypedExpr) params.get(2);
+    // The delimiter may be given as a character code, so convert it to the equivalent
+    // one-character string.
+    // NOTE(review): the narrowing cast to char silently truncates codes outside the char range;
+    // confirm whether such codes should instead produce a NULL result.
+    if (TypeUtils.isIntegerType(delimiterExpr.getReturnType())) {
+      int charCode = ((IntegerValue) delimiterExpr.getValue()).getValue();
+      VarCharValue delimiter = new VarCharValue(Character.toString((char) charCode));
+      params.set(1, new ConstantTypedExpr(new VarCharType(), delimiter, null));
+    }
+    // The index starts from 0 and is an INT in Flink, while it starts from 1 and is a
+    // BIGINT (int64_t) in Velox, so it is shifted and widened here.
+    if (TypeUtils.isIntegerType(indexExpr.getReturnType())) {
+      int zeroBasedIndex = ((IntegerValue) indexExpr.getValue()).getValue();
+      // Widen before adding: Integer.MAX_VALUE + 1 would otherwise wrap to a negative index.
+      BigIntValue oneBasedIndex = new BigIntValue((long) zeroBasedIndex + 1L);
+      params.set(2, new ConstantTypedExpr(new BigIntType(), oneBasedIndex, null));
+    }
+    Type resultType = RexNodeConverter.toType(callNode.getType());
+    return new CallTypedExpr(resultType, params, functionName);
+  }
+}
diff --git
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
index 2b544bb6f0..ff99550b81 100644
---
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
+++
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
@@ -129,6 +129,42 @@ class ScalarFunctionsTest extends GlutenStreamingTestBase {
runAndCheck(query4, Arrays.asList("+I[false]", "+I[true]", "+I[false]"));
}
+ @Test
+ void testSplitIndex() {
+ List<Row> rows =
+ Arrays.asList(
+ Row.of(1, 1L, "http://testflink/a/b/c"),
+ Row.of(2, 2L, "http://testflink/a1/b1/c1"),
+ Row.of(3, 3L, "http://testflink/a2/b2/c2"));
+ createSimpleBoundedValuesTable("tblSplitIndex", "a int, b bigint, c
string", rows);
+ String query1 = "select split_index(c, '/', 2) from tblSplitIndex";
+ runAndCheck(query1, Arrays.asList("+I[testflink]", "+I[testflink]",
"+I[testflink]"));
+ String query2 = "select split_index(c, '//', 1) from tblSplitIndex";
+ runAndCheck(
+ query2,
+ Arrays.asList("+I[testflink/a/b/c]", "+I[testflink/a1/b1/c1]",
"+I[testflink/a2/b2/c2]"));
+ // Add some corner case tests from `ScalarFunctionsTest`#testSplitIndex in
flink.
+ rows = Arrays.asList(Row.of(1, 1L, "AQIDBA=="));
+ createSimpleBoundedValuesTable("tblSplitIndexFlink", "a int, b bigint, c
string", rows);
+ String queryForInvalidIndex =
+ "select split_index(c, 'I', 7), split_index(c, 'I', -1) from
tblSplitIndexFlink";
+ String queryForNumbericDelimiter =
+ "select split_index(c, 73, 0), split_index(c, 12, 0) from
tblSplitIndexFlink";
+ runAndCheck(queryForInvalidIndex, Arrays.asList("+I[null, null]"));
+ runAndCheck(queryForNumbericDelimiter, Arrays.asList("+I[AQ, AQIDBA==]"));
+ rows = Arrays.asList(Row.of(2, 2L, null));
+ createSimpleBoundedValuesTable("tblSplitIndexNullInput", "a int, b bigint,
c string", rows);
+ String queryForNullInput = "select split_index(c, 'I', 0) from
tblSplitIndexNullInput";
+ runAndCheck(queryForNullInput, Arrays.asList("+I[null]"));
+ // TODO: The cases when index or delimeter parameters is null can not be
supported currently.
+ // String queryForIndexNull = "select split_index(c, 'I', cast(null as
INT)) from
+ // tblSplitIndexFlink";
+ // runAndCheck(queryForIndexNull, Arrays.asList("+I[null]"));
+ // String queryForDelimiterNull = "select split_index(c, cast(null as
VARCHAR), 0) from
+ // tblSplitIndexFlink";
+ // runAndCheck(queryForDelimiterNull, Arrays.asList("+I[null]"));
+ }
+
@Disabled
@Test
void testReinterpret() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]