This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 21504b693c [GLUTEN-10629][FLINK] Support udf count_char (#10674)
21504b693c is described below
commit 21504b693cdd2ca17ec32f2a18d8c9bdd4dcdf87
Author: shuai.xu <[email protected]>
AuthorDate: Thu Sep 18 16:10:09 2025 +0800
[GLUTEN-10629][FLINK] Support udf count_char (#10674)
---
.../main/java/org/apache/gluten/rexnode/RexNodeConverter.java | 2 ++
.../gluten/rexnode/functions/RexCallConverterFactory.java | 4 +++-
.../table/runtime/operators/GlutenVectorSourceFunction.java | 3 +--
.../main/java/org/apache/gluten/util/LogicalTypeConverter.java | 10 +++++++++-
4 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
index ef8858e373..3674fb9d9b 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
@@ -121,6 +121,8 @@ public class RexNodeConverter {
} else {
return new HugeIntValue(bigDecimal.unscaledValue());
}
+ case SYMBOL:
+ return new VarCharValue(literal.getValue().toString());
default:
throw new RuntimeException(
"Unsupported rex node type: " +
literal.getType().getSqlTypeName());
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
index 0d7ddb96d8..3d68f333dc 100644
--- a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
@@ -99,7 +99,9 @@ public class RexCallConverterFactory {
Map.entry("IS NOT NULL", Arrays.asList(() -> new
DefaultRexCallConverter("is_not_null"))),
Map.entry(
"REGEXP_EXTRACT", Arrays.asList(() -> new
DefaultRexCallConverter("regexp_extract"))),
- Map.entry("LOWER", Arrays.asList(() -> new
DefaultRexCallConverter("lower"))));
+ Map.entry("LOWER", Arrays.asList(() -> new
DefaultRexCallConverter("lower"))),
+ Map.entry("count_char", Arrays.asList(() -> new
DefaultRexCallConverter("count_char"))),
+ Map.entry("EXTRACT", Arrays.asList(() -> new
DefaultRexCallConverter("extract"))));
public static RexCallConverter getConverter(RexCall callNode,
RexConversionContext context) {
String operatorName = callNode.getOperator().getName();
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index 90ad655f0f..a4d6745163 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -75,7 +75,6 @@ public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<State
this.outputTypes = outputTypes;
this.id = id;
this.split = split;
- LOG.debug("GlutenSourceFunction {}", outputTypes);
}
public StatefulPlanNode getPlanNode() {
@@ -110,7 +109,6 @@ public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<State
@Override
public void run(SourceContext<StatefulElement> sourceContext) throws
Exception {
- LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
while (isRunning) {
UpIterator.State state = task.advance();
if (state == UpIterator.State.AVAILABLE) {
@@ -149,6 +147,7 @@ public class GlutenVectorSourceFunction extends
RichParallelSourceFunction<State
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
if (memoryManager == null) {
+ LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
memoryManager = MemoryManager.create(AllocationListener.NOOP);
session = Velox4j.newSession(memoryManager);
query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
index d4e33a17a1..e8cbc00eb5 100644
--- a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
@@ -31,7 +31,9 @@ import
org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SymbolType;
import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
import org.apache.flink.table.types.logical.VarCharType;
import java.util.List;
@@ -108,7 +110,13 @@ public class LogicalTypeConverter {
// TODO: may need precision
Map.entry(
LocalZonedTimestampType.class,
- logicalType -> new
io.github.zhztheplayer.velox4j.type.TimestampType()));
+ logicalType -> new
io.github.zhztheplayer.velox4j.type.TimestampType()),
+ Map.entry(
+ TinyIntType.class,
+ logicalType -> new
io.github.zhztheplayer.velox4j.type.TinyIntType()),
+ Map.entry(
+ SymbolType.class,
+ logicalType -> new
io.github.zhztheplayer.velox4j.type.VarCharType()));
public static Type toVLType(LogicalType logicalType) {
VLTypeConverter converter = converters.get(logicalType.getClass());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]