This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 06b6e4e14e [GLUTEN-9518][FLINK] Support watermark assigner (#9541)
06b6e4e14e is described below
commit 06b6e4e14ecf2864253ec3796c1a7788ba0abecf
Author: shuai.xu <[email protected]>
AuthorDate: Mon May 12 21:21:53 2025 +0800
[GLUTEN-9518][FLINK] Support watermark assigner (#9541)
---
.../exec/stream/StreamExecWatermarkAssigner.java | 31 +++++++++++-
.../apache/gluten/rexnode/FunctionMappings.java | 23 +++++----
.../apache/gluten/rexnode/RexNodeConverter.java | 9 ++--
.../functions/DefaultFunctionConverter.java | 37 ++++++++++++++
.../rexnode/functions/FunctionConverter.java | 29 +++++++++++
.../functions/SubtractFunctionConverter.java | 56 ++++++++++++++++++++++
.../apache/gluten/util/LogicalTypeConverter.java | 4 ++
7 files changed, 174 insertions(+), 15 deletions(-)
diff --git
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
index dd88a84061..b02f74c2d4 100644
---
a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
+++
b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWatermarkAssigner.java
@@ -38,7 +38,15 @@ import org.apache.flink.table.types.logical.RowType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import io.github.zhztheplayer.velox4j.connector.NexmarkTableHandle;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.plan.ProjectNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.plan.WatermarkAssignerNode;
import org.apache.calcite.rex.RexNode;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.table.runtime.operators.GlutenSingleInputOperator;
import org.apache.gluten.util.LogicalTypeConverter;
import org.apache.gluten.util.PlanNodeIdGenerator;
@@ -119,12 +127,31 @@ public class StreamExecWatermarkAssigner extends
ExecNodeBase<RowData>
io.github.zhztheplayer.velox4j.type.RowType inputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(inputEdge.getOutputType());
+ List<String> inNames =
Utils.getNamesFromRowType(inputEdge.getOutputType());
+
+ TypedExpr watermarkExprs = RexNodeConverter.toTypedExpr(watermarkExpr,
inNames);
io.github.zhztheplayer.velox4j.type.RowType outputType =
(io.github.zhztheplayer.velox4j.type.RowType)
LogicalTypeConverter.toVLType(getOutputType());
- // Watermark assigner has not been supported in gluten.
+ // This scan can be ignored, it's used only to make ProjectNode valid
+ PlanNode ignore = new TableScanNode(
+ PlanNodeIdGenerator.newId(),
+ outputType,
+ new NexmarkTableHandle("connector-nexmark"),
+ List.of());
+ ProjectNode project = new ProjectNode(
+ PlanNodeIdGenerator.newId(),
+ List.of(ignore),
+ List.of("TIMESTAMP"),
+ List.of(watermarkExprs));
+ PlanNode watermark = new WatermarkAssignerNode(
+ PlanNodeIdGenerator.newId(),
+ null,
+ project,
+ idleTimeout,
+ rowtimeFieldIndex);
final GlutenSingleInputOperator watermarkOperator =
- new GlutenSingleInputOperator(null,
PlanNodeIdGenerator.newId(), inputType, outputType);
+ new GlutenSingleInputOperator(watermark,
PlanNodeIdGenerator.newId(), inputType, outputType);
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
index 4bb4fb7e04..79915bb5e6 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
@@ -16,25 +16,32 @@
*/
package org.apache.gluten.rexnode;
+import org.apache.gluten.rexnode.functions.DefaultFunctionConverter;
+import org.apache.gluten.rexnode.functions.FunctionConverter;
+import org.apache.gluten.rexnode.functions.SubtractFunctionConverter;
+
import java.util.HashMap;
import java.util.Map;
/** Mapping of flink function and velox function. */
public class FunctionMappings {
// A map stores the relationship between flink function name and velox
function.
- private static Map<String, String> functionMappings = new HashMap() {
+ private static Map<String, FunctionConverter> functionMappings = new
HashMap() {
{
// TODO: support more functions.
- put(">", "greaterthan");
- put("<", "lessthan");
- put("=", "equalto");
- put("CAST", "cast");
- put("CASE", "if");
- put("*", "multiply");
+ put(">", new DefaultFunctionConverter("greaterthan"));
+ put("<", new DefaultFunctionConverter("lessthan"));
+ put("=", new DefaultFunctionConverter("equalto"));
+ put("CAST", new DefaultFunctionConverter("cast"));
+ put("CASE", new DefaultFunctionConverter("if"));
+ put("*", new DefaultFunctionConverter("multiply"));
+ put("-", new SubtractFunctionConverter("subtract"));
+ put("MOD", new DefaultFunctionConverter("remainder"));
+ put("AND", new DefaultFunctionConverter("and"));
}
};
- public static String toVeloxFunction(String funcName) {
+ public static FunctionConverter getFunctionConverter(String funcName) {
if (functionMappings.containsKey(funcName)) {
return functionMappings.get(funcName);
} else {
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
index 736e060838..43ce8839f2 100644
---
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
@@ -58,10 +58,8 @@ public class RexNodeConverter {
RexCall rexCall = (RexCall) rexNode;
List<TypedExpr> params = toTypedExpr(rexCall.getOperands(),
inNames);
Type nodeType = toType(rexCall.getType());
- return new CallTypedExpr(
- nodeType,
- params,
-
FunctionMappings.toVeloxFunction(rexCall.getOperator().getName()));
+ return
FunctionMappings.getFunctionConverter(rexCall.getOperator().getName())
+ .toVeloxFunction(nodeType, params);
} else if (rexNode instanceof RexInputRef) {
RexInputRef inputRef = (RexInputRef) rexNode;
return FieldAccessTypedExpr.create(
@@ -109,6 +107,8 @@ public class RexNodeConverter {
case BINARY:
return new VarBinaryValue(literal.getValue().toString());
case DECIMAL:
+ case INTERVAL_SECOND:
+ // interval is used as decimal.
// TODO: fix precision check
BigDecimal bigDecimal = literal.getValueAs(BigDecimal.class);
if (bigDecimal.precision() <= 18) {
@@ -121,5 +121,4 @@ public class RexNodeConverter {
"Unsupported rex node type: " +
literal.getType().getSqlTypeName());
}
}
-
}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DefaultFunctionConverter.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DefaultFunctionConverter.java
new file mode 100644
index 0000000000..5f1d35bc38
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/DefaultFunctionConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.Type;
+
+import java.util.List;
+
+/** Default convertor for velox function. */
+public class DefaultFunctionConverter implements FunctionConverter {
+ private final String function;
+
+ public DefaultFunctionConverter(String function) {
+ this.function = function;
+ }
+
+ @Override
+ public CallTypedExpr toVeloxFunction(Type nodeType, List<TypedExpr>
params) {
+ return new CallTypedExpr(nodeType, params, function);
+ }
+}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/FunctionConverter.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/FunctionConverter.java
new file mode 100644
index 0000000000..7f29adfeb0
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/FunctionConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.Type;
+
+import java.util.List;
+
+/** Interface for converter to velox function. */
+public interface FunctionConverter {
+
+ CallTypedExpr toVeloxFunction(Type nodeType, List<TypedExpr> params);
+}
diff --git
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubtractFunctionConverter.java
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubtractFunctionConverter.java
new file mode 100644
index 0000000000..483c5cf1ac
--- /dev/null
+++
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/SubtractFunctionConverter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode.functions;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.TimestampType;
+import io.github.zhztheplayer.velox4j.type.Type;
+
+import org.apache.flink.util.Preconditions;
+import java.util.List;
+
+/** Subtract function converter. */
+public class SubtractFunctionConverter implements FunctionConverter {
+ private final String function;
+
+ public SubtractFunctionConverter(String function) {
+ this.function = function;
+ }
+
+ @Override
+ public CallTypedExpr toVeloxFunction(Type nodeType, List<TypedExpr>
params) {
+ Preconditions.checkNotNull(params.size() == 2, "Subtract must contain
exactly two parameters");
+
+ // TODO: need refine for more type cast
+ if (params.get(0).getReturnType().getClass() == TimestampType.class &&
+ params.get(1).getReturnType().getClass() == BigIntType.class) {
+ // hardcode here for next mark watermark whose param1 is Timestamp
and 2 is BigInt.
+ Type newType = new BigIntType();
+ TypedExpr param0 = new CallTypedExpr(
+ newType,
+ List.of(params.get(0)),
+ "cast");
+ return new CallTypedExpr(
+ newType,
+ List.of(param0, params.get(1)),
+ function);
+ }
+ return new CallTypedExpr(nodeType, params, function);
+ }
+}
diff --git
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
index 04891d3216..e2a1ccc2e1 100644
---
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
+++
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
@@ -20,6 +20,7 @@ import io.github.zhztheplayer.velox4j.type.IntegerType;
import io.github.zhztheplayer.velox4j.type.Type;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -58,6 +59,9 @@ public class LogicalTypeConverter {
return new io.github.zhztheplayer.velox4j.type.DecimalType(
decimalType.getPrecision(),
decimalType.getScale());
+ } else if (logicalType instanceof DayTimeIntervalType) {
+ // TODO: it seems interval now can be used as bigint for nexmark.
+ return new io.github.zhztheplayer.velox4j.type.BigIntType();
} else {
throw new RuntimeException("Unsupported logical type: " +
logicalType);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]