This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 06b6e4e14e [GLUTEN-9518][FLINK] Support watermark assigner (#9541)
06b6e4e14e is described below

commit 06b6e4e14ecf2864253ec3796c1a7788ba0abecf
Author: shuai.xu <[email protected]>
AuthorDate: Mon May 12 21:21:53 2025 +0800

    [GLUTEN-9518][FLINK] Support watermark assigner (#9541)
---
 .../exec/stream/StreamExecWatermarkAssigner.java   | 31 +++++++++++-
 .../apache/gluten/rexnode/FunctionMappings.java    | 23 +++++----
 .../apache/gluten/rexnode/RexNodeConverter.java    |  9 ++--
 .../functions/DefaultFunctionConverter.java        | 37 ++++++++++++++
 .../rexnode/functions/FunctionConverter.java       | 29 +++++++++++
 .../functions/SubtractFunctionConverter.java       | 56 ++++++++++++++++++++++
 .../apache/gluten/util/LogicalTypeConverter.java   |  4 ++
 7 files changed, 174 insertions(+), 15 deletions(-)

diff --git 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
index dd88a84061..b02f74c2d4 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
@@ -38,7 +38,15 @@ import org.apache.flink.table.types.logical.RowType;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.plan.ProjectNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.WatermarkAssignerNode;
 import org.apache.calcite.rex.RexNode;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.rexnode.Utils;
 import org.apache.gluten.table.runtime.operators.GlutenSingleInputOperator;
 import org.apache.gluten.util.LogicalTypeConverter;
 import org.apache.gluten.util.PlanNodeIdGenerator;
@@ -119,12 +127,31 @@ public class StreamExecWatermarkAssigner extends 
ExecNodeBase<RowData>
         io.github.zhztheplayer.velox4j.type.RowType inputType =
                 (io.github.zhztheplayer.velox4j.type.RowType)
                         
LogicalTypeConverter.toVLType(inputEdge.getOutputType());
+        List<String> inNames = 
Utils.getNamesFromRowType(inputEdge.getOutputType());
+
+        TypedExpr watermarkExprs = RexNodeConverter.toTypedExpr(watermarkExpr, 
inNames);
         io.github.zhztheplayer.velox4j.type.RowType outputType =
                 (io.github.zhztheplayer.velox4j.type.RowType)
                         LogicalTypeConverter.toVLType(getOutputType());
-        // Watermark assigner has not been supported in gluten.
+        // This scan can be ignored, it's used only to make ProjectNode valid
+        PlanNode ignore = new TableScanNode(
+                PlanNodeIdGenerator.newId(),
+                outputType,
+                new NexmarkTableHandle("connector-nexmark"),
+                List.of());
+        ProjectNode project = new ProjectNode(
+                PlanNodeIdGenerator.newId(),
+                List.of(ignore),
+                List.of("TIMESTAMP"),
+                List.of(watermarkExprs));
+        PlanNode watermark = new WatermarkAssignerNode(
+                PlanNodeIdGenerator.newId(),
+                null,
+                project,
+                idleTimeout,
+                rowtimeFieldIndex);
         final GlutenSingleInputOperator watermarkOperator =
-                new GlutenSingleInputOperator(null, 
PlanNodeIdGenerator.newId(), inputType, outputType);
+                new GlutenSingleInputOperator(watermark, 
PlanNodeIdGenerator.newId(), inputType, outputType);
 
         return ExecNodeUtil.createOneInputTransformation(
                 inputTransform,
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
index 4bb4fb7e04..79915bb5e6 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
@@ -16,25 +16,32 @@
  */
 package org.apache.gluten.rexnode;
 
+import org.apache.gluten.rexnode.functions.DefaultFunctionConverter;
+import org.apache.gluten.rexnode.functions.FunctionConverter;
+import org.apache.gluten.rexnode.functions.SubtractFunctionConverter;
+
 import java.util.HashMap;
 import java.util.Map;
 
 /** Mapping of flink function and velox function. */
 public class FunctionMappings {
     // A map stores the relationship between flink function name and velox 
function.
-    private static Map<String, String> functionMappings = new HashMap() {
+    private static Map<String, FunctionConverter> functionMappings = new 
HashMap() {
         {
             // TODO: support more functions.
-            put(">", "greaterthan");
-            put("<", "lessthan");
-            put("=", "equalto");
-            put("CAST", "cast");
-            put("CASE", "if");
-            put("*", "multiply");
+            put(">", new DefaultFunctionConverter("greaterthan"));
+            put("<", new DefaultFunctionConverter("lessthan"));
+            put("=", new DefaultFunctionConverter("equalto"));
+            put("CAST", new DefaultFunctionConverter("cast"));
+            put("CASE", new DefaultFunctionConverter("if"));
+            put("*", new DefaultFunctionConverter("multiply"));
+            put("-", new SubtractFunctionConverter("subtract"));
+            put("MOD", new DefaultFunctionConverter("remainder"));
+            put("AND", new DefaultFunctionConverter("and"));
         }
     };
 
-    public static String toVeloxFunction(String funcName) {
+    public static FunctionConverter getFunctionConverter(String funcName) {
         if (functionMappings.containsKey(funcName)) {
             return functionMappings.get(funcName);
         } else {
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
index 736e060838..43ce8839f2 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
@@ -58,10 +58,8 @@ public class RexNodeConverter {
             RexCall rexCall = (RexCall) rexNode;
             List<TypedExpr> params = toTypedExpr(rexCall.getOperands(), 
inNames);
             Type nodeType = toType(rexCall.getType());
-            return new CallTypedExpr(
-                    nodeType,
-                    params,
-                    
FunctionMappings.toVeloxFunction(rexCall.getOperator().getName()));
+            return 
FunctionMappings.getFunctionConverter(rexCall.getOperator().getName())
+                    .toVeloxFunction(nodeType, params);
         } else if (rexNode instanceof RexInputRef) {
             RexInputRef inputRef = (RexInputRef) rexNode;
             return FieldAccessTypedExpr.create(
@@ -109,6 +107,8 @@ public class RexNodeConverter {
             case BINARY:
                 return new VarBinaryValue(literal.getValue().toString());
             case DECIMAL:
+            case INTERVAL_SECOND:
+                // interval is used as decimal.
                 // TODO: fix precision check
                 BigDecimal bigDecimal = literal.getValueAs(BigDecimal.class);
                 if (bigDecimal.precision() <= 18) {
@@ -121,5 +121,4 @@ public class RexNodeConverter {
                         "Unsupported rex node type: " + 
literal.getType().getSqlTypeName());
         }
     }
-
 }
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DefaultFunctionConverter.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DefaultFunctionConverter.java
new file mode 100644
index 0000000000..5f1d35bc38
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DefaultFunctionConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.Type;
+
+import java.util.List;
+
+/** Default converter: wraps the given result type and parameters in a {@link CallTypedExpr} for the configured velox function name, without changing them. */
+public class DefaultFunctionConverter implements FunctionConverter {
+    private final String function;
+
+    public DefaultFunctionConverter(String function) {
+        this.function = function;
+    }
+
+    @Override
+    public CallTypedExpr toVeloxFunction(Type nodeType, List<TypedExpr> 
params) {
+        return new CallTypedExpr(nodeType, params, function);
+    }
+}
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/FunctionConverter.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/FunctionConverter.java
new file mode 100644
index 0000000000..7f29adfeb0
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/FunctionConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.Type;
+
+import java.util.List;
+
+/** Interface for converters that build a velox {@link CallTypedExpr} from a result type and a list of parameter expressions. */
+public interface FunctionConverter {
+
+    CallTypedExpr toVeloxFunction(Type nodeType, List<TypedExpr> params);
+}
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubtractFunctionConverter.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubtractFunctionConverter.java
new file mode 100644
index 0000000000..483c5cf1ac
--- /dev/null
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubtractFunctionConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.TimestampType;
+import io.github.zhztheplayer.velox4j.type.Type;
+
+import org.apache.flink.util.Preconditions;
+import java.util.List;
+
+/** Subtract function converter: builds the velox call for "-", casting a Timestamp first operand to BigInt when the second operand is BigInt. */
+public class SubtractFunctionConverter implements FunctionConverter {
+    private final String function;
+
+    public SubtractFunctionConverter(String function) {
+        this.function = function;
+    }
+
+    @Override
+    public CallTypedExpr toVeloxFunction(Type nodeType, List<TypedExpr> params) {
+        Preconditions.checkArgument(params.size() == 2, "Subtract must contain exactly two parameters");
+
+        // TODO: need refine for more type cast
+        if (params.get(0).getReturnType().getClass() == TimestampType.class &&
+                params.get(1).getReturnType().getClass() == BigIntType.class) {
+            // Hardcoded for the nexmark watermark, whose first param is Timestamp and second is BigInt.
+            Type newType = new BigIntType();
+            TypedExpr param0 = new CallTypedExpr(
+                    newType,
+                    List.of(params.get(0)),
+                    "cast");
+            return new CallTypedExpr(
+                    newType,
+                    List.of(param0, params.get(1)),
+                    function);
+        }
+        return new CallTypedExpr(nodeType, params, function);
+    }
+}
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
index 04891d3216..e2a1ccc2e1 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
@@ -20,6 +20,7 @@ import io.github.zhztheplayer.velox4j.type.IntegerType;
 import io.github.zhztheplayer.velox4j.type.Type;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -58,6 +59,9 @@ public class LogicalTypeConverter {
             return new io.github.zhztheplayer.velox4j.type.DecimalType(
                     decimalType.getPrecision(),
                     decimalType.getScale());
+        } else if (logicalType instanceof DayTimeIntervalType) {
+            // TODO: it seems interval now can be used as bigint for nexmark.
+            return new io.github.zhztheplayer.velox4j.type.BigIntType();
         } else {
             throw new RuntimeException("Unsupported logical type: " + 
logicalType);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to