This is an automated email from the ASF dual-hosted git repository.
zhangmang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/auron.git
The following commit(s) were added to refs/heads/master by this push:
new 9e48cc72 [AURON #1856] Introduce Flink expression-level converter
framework (#2146)
9e48cc72 is described below
commit 9e48cc72a2eeede8e298df2f61db784f711283f0
Author: Weiqing Yang <[email protected]>
AuthorDate: Wed Apr 1 22:38:32 2026 -0700
[AURON #1856] Introduce Flink expression-level converter framework (#2146)
# Which issue does this PR close?
Closes #1856
# Rationale for this change
Auron's Flink integration has the data exchange layer in place (Arrow
Writer #1850, Arrow Reader #1851) but lacks the conversion
infrastructure — the machinery that decides which Flink expressions and
aggregates can be converted to native execution and how to translate
them into Auron's protobuf representation.
# What changes are included in this PR?
Five foundational Java classes in `auron-flink-planner`:
| Class | Role |
|-------|------|
| `FlinkNodeConverter<T>` | Generic base interface: `getNodeClass()`,
`isSupported()`, `convert()` → `PhysicalExprNode` |
| `FlinkRexNodeConverter` | Sub-interface for Calcite `RexNode`
expressions |
| `FlinkAggCallConverter` | Sub-interface for Calcite `AggregateCall`
aggregates |
| `FlinkNodeConverterFactory` | Singleton registry with separate
`rexConverterMap` + `aggConverterMap`, typed registration and fail-safe
dispatch |
| `ConverterContext` | Immutable holder for input schema (`RowType`),
Flink config, Auron config, and classloader |
pom.xml: `flink-core` scope changed from `test` to `provided` (required
for `ReadableConfig`).
Framework-only — no concrete converter implementations. Follow-up issues
will add RexLiteral, RexInputRef, RexCall, and aggregate converters.
Design doc: `docs/PR-AURON-1856/AURON-1856-DESIGN.md`
Review helper: `docs/reviewhelper/AURON-1856/01-converter-framework.md`
# Are there any user-facing changes?
No.
# How was this patch tested?
8 unit tests. Checkstyle: 0 violations.
---
auron-flink-extension/auron-flink-planner/pom.xml | 2 +-
.../table/planner/converter/ConverterContext.java | 80 +++++++++
.../planner/converter/FlinkAggCallConverter.java | 35 ++++
.../planner/converter/FlinkNodeConverter.java | 64 +++++++
.../converter/FlinkNodeConverterFactory.java | 170 ++++++++++++++++++
.../planner/converter/FlinkRexNodeConverter.java | 37 ++++
.../converter/FlinkNodeConverterFactoryTest.java | 199 +++++++++++++++++++++
7 files changed, 586 insertions(+), 1 deletion(-)
diff --git a/auron-flink-extension/auron-flink-planner/pom.xml
b/auron-flink-extension/auron-flink-planner/pom.xml
index 268418eb..bce3ae82 100644
--- a/auron-flink-extension/auron-flink-planner/pom.xml
+++ b/auron-flink-extension/auron-flink-planner/pom.xml
@@ -201,7 +201,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/ConverterContext.java
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/ConverterContext.java
new file mode 100644
index 00000000..c19d1a5c
--- /dev/null
+++
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/ConverterContext.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import java.util.Objects;
+import org.apache.auron.configuration.AuronConfiguration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Provides shared state to {@link FlinkNodeConverter} implementations during
conversion.
+ *
+ * <p>Carries the input schema, configuration, and classloader needed for
type-aware conversion
+ * of Flink expressions and aggregate calls.
+ */
+public class ConverterContext {
+
+ private final ReadableConfig tableConfig;
+ private final AuronConfiguration auronConfiguration;
+ private final ClassLoader classLoader;
+ private final RowType inputType;
+
+ /**
+ * Creates a new converter context.
+ *
+ * @param tableConfig Flink table-level configuration
+ * @param auronConfiguration Auron-specific configuration, may be {@code
null}
+ * @param classLoader classloader for the current Flink context
+ * @param inputType input schema of the node being converted
+ */
+ public ConverterContext(
+ ReadableConfig tableConfig,
+ AuronConfiguration auronConfiguration,
+ ClassLoader classLoader,
+ RowType inputType) {
+ this.tableConfig = Objects.requireNonNull(tableConfig, "tableConfig
must not be null");
+ this.auronConfiguration = auronConfiguration;
+ this.classLoader = Objects.requireNonNull(classLoader, "classLoader
must not be null");
+ this.inputType = Objects.requireNonNull(inputType, "inputType must not
be null");
+ }
+
+ /** Returns the Flink table-level configuration. */
+ public ReadableConfig getTableConfig() {
+ return tableConfig;
+ }
+
+ /** Returns the Auron-specific configuration, or {@code null} if not
provided. */
+ public AuronConfiguration getAuronConfiguration() {
+ return auronConfiguration;
+ }
+
+ /** Returns the classloader for the current Flink context. */
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ /**
+ * Returns the input schema of the node being converted.
+ *
+ * <p>Converters use this to resolve {@code RexInputRef} column references
to concrete types,
+ * check type support, and determine if casts are needed.
+ */
+ public RowType getInputType() {
+ return inputType;
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkAggCallConverter.java
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkAggCallConverter.java
new file mode 100644
index 00000000..7ae947ac
--- /dev/null
+++
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkAggCallConverter.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import org.apache.calcite.rel.core.AggregateCall;
+
+/**
+ * Converts a Calcite {@link AggregateCall} to an Auron native
+ * {@link org.apache.auron.protobuf.PhysicalExprNode} (wrapping a
+ * {@code PhysicalAggExprNode}).
+ *
+ * <p>An {@code AggregateCall} represents an aggregate function invocation
(e.g., SUM, COUNT, MAX)
+ * with its argument references, return type, and distinctness. The converter
translates this into
+ * a {@code PhysicalAggExprNode} containing the aggregate function type,
converted child
+ * expressions, and return type.
+ *
+ * <p>Note: {@code AggregateCall} internally references input columns by
index. The converter uses
+ * {@link ConverterContext#getInputType()} to resolve these indices to
concrete types for type
+ * checking and cast insertion.
+ */
+public interface FlinkAggCallConverter extends
FlinkNodeConverter<AggregateCall> {}
diff --git
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverter.java
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverter.java
new file mode 100644
index 00000000..8e6c76af
--- /dev/null
+++
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverter.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import org.apache.auron.protobuf.PhysicalExprNode;
+
+/**
+ * Base interface for converting Flink plan elements to Auron native {@link
PhysicalExprNode}
+ * representations.
+ *
+ * <p>This interface is parameterized by the input type to support different
categories of plan
+ * elements. Two sub-interfaces are provided:
+ * <ul>
+ * <li>{@link FlinkRexNodeConverter} for Calcite {@code RexNode} expressions
+ * <li>{@link FlinkAggCallConverter} for Calcite {@code AggregateCall}
aggregates
+ * </ul>
+ *
+ * @param <T> the type of plan element this converter handles
+ */
+public interface FlinkNodeConverter<T> {
+
+ /**
+ * Returns the concrete class this converter handles.
+ *
+ * <p>Used by {@link FlinkNodeConverterFactory} for lookup dispatch.
+ */
+ Class<? extends T> getNodeClass();
+
+ /**
+ * Checks whether the given element can be converted to native execution.
+ *
+ * <p>A converter may decline based on unsupported types, operand
combinations, or
+ * configuration. This method must not have side effects.
+ *
+ * @param node the plan element to check
+ * @param context shared conversion state (input schema, configuration)
+ * @return {@code true} if the element can be converted
+ */
+ boolean isSupported(T node, ConverterContext context);
+
+ /**
+ * Converts the given element to a native {@link PhysicalExprNode}.
+ *
+ * @param node the plan element to convert
+ * @param context shared conversion state (input schema, configuration)
+ * @return the native expression representation
+ * @throws IllegalArgumentException if the element type does not match
{@link #getNodeClass()}
+ */
+ PhysicalExprNode convert(T node, ConverterContext context);
+}
diff --git
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
new file mode 100644
index 00000000..a8610135
--- /dev/null
+++
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactory.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rex.RexNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Singleton registry of {@link FlinkNodeConverter} instances. Dispatches
conversion requests to
+ * the appropriate converter based on the input element's class.
+ *
+ * <p>Maintains separate registries for different converter categories:
+ * <ul>
+ * <li>{@code rexConverterMap} for {@link FlinkRexNodeConverter} instances
+ * (keyed by {@code RexNode} subclass)
+ * <li>{@code aggConverterMap} for {@link FlinkAggCallConverter} instances
+ * (keyed by {@code AggregateCall} class)
+ * </ul>
+ *
+ * <p>Usage:
+ * <pre>
+ * FlinkNodeConverterFactory factory =
FlinkNodeConverterFactory.getInstance();
+ * // Convert a RexNode expression
+ * Optional<PhysicalExprNode> result = factory.convertRexNode(rexNode,
context);
+ * // Convert an AggregateCall
+ * Optional<PhysicalExprNode> aggResult =
factory.convertAggCall(aggCall, context);
+ * </pre>
+ */
+public class FlinkNodeConverterFactory {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkNodeConverterFactory.class);
+
+ private static final FlinkNodeConverterFactory INSTANCE = new
FlinkNodeConverterFactory();
+
+ private final Map<Class<? extends RexNode>, FlinkRexNodeConverter>
rexConverterMap;
+ private final Map<Class<? extends AggregateCall>, FlinkAggCallConverter>
aggConverterMap;
+
+ // Package-private for test isolation (tests create fresh instances)
+ FlinkNodeConverterFactory() {
+ this.rexConverterMap = new HashMap<>();
+ this.aggConverterMap = new HashMap<>();
+ }
+
+ /** Returns the singleton instance. */
+ public static FlinkNodeConverterFactory getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Registers a {@link FlinkRexNodeConverter} for its declared {@code
RexNode} subclass.
+ *
+ * @param converter the converter to register
+ * @throws IllegalArgumentException if a converter is already registered
for the same class
+ */
+ public void registerRexConverter(FlinkRexNodeConverter converter) {
+ Class<? extends RexNode> nodeClass = converter.getNodeClass();
+ if (rexConverterMap.containsKey(nodeClass)) {
+ throw new IllegalArgumentException("Duplicate RexNode converter
for " + nodeClass.getName());
+ }
+ rexConverterMap.put(nodeClass, converter);
+ }
+
+ /**
+ * Registers a {@link FlinkAggCallConverter} for its declared {@code
AggregateCall} class.
+ *
+ * @param converter the converter to register
+ * @throws IllegalArgumentException if a converter is already registered
for the same class
+ */
+ public void registerAggConverter(FlinkAggCallConverter converter) {
+ Class<? extends AggregateCall> nodeClass = converter.getNodeClass();
+ if (aggConverterMap.containsKey(nodeClass)) {
+ throw new IllegalArgumentException("Duplicate AggregateCall
converter for " + nodeClass.getName());
+ }
+ aggConverterMap.put(nodeClass, converter);
+ }
+
+ /**
+ * Attempts to convert the given {@link RexNode} to a native {@link
PhysicalExprNode}.
+ *
+ * <p>Returns the native expression if a matching converter exists and
supports the node.
+ * Returns empty if no converter is registered, the converter does not
support the node,
+ * or conversion fails (fail-safe).
+ *
+ * @param node the RexNode to convert
+ * @param context shared conversion state
+ * @return the converted expression, or empty
+ */
+ public Optional<PhysicalExprNode> convertRexNode(RexNode node,
ConverterContext context) {
+ FlinkRexNodeConverter converter = rexConverterMap.get(node.getClass());
+ if (converter == null) {
+ return Optional.empty();
+ }
+ if (!converter.isSupported(node, context)) {
+ return Optional.empty();
+ }
+ try {
+ return Optional.of(converter.convert(node, context));
+ } catch (Exception e) {
+ LOG.warn("RexNode conversion failed for {}",
node.getClass().getName(), e);
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Attempts to convert the given {@link AggregateCall} to a native {@link
PhysicalExprNode}.
+ *
+ * <p>Returns the native expression if a matching converter exists and
supports the call.
+ * Returns empty if no converter is registered, the converter does not
support the call,
+ * or conversion fails (fail-safe).
+ *
+ * @param aggCall the AggregateCall to convert
+ * @param context shared conversion state
+ * @return the converted expression, or empty
+ */
+ public Optional<PhysicalExprNode> convertAggCall(AggregateCall aggCall,
ConverterContext context) {
+ FlinkAggCallConverter converter =
aggConverterMap.get(aggCall.getClass());
+ if (converter == null) {
+ return Optional.empty();
+ }
+ if (!converter.isSupported(aggCall, context)) {
+ return Optional.empty();
+ }
+ try {
+ return Optional.of(converter.convert(aggCall, context));
+ } catch (Exception e) {
+ LOG.warn(
+ "AggregateCall conversion failed for {}",
aggCall.getClass().getName(), e);
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Returns the converter registered for the given element's class, if any.
+ *
+ * <p>Dispatches by type hierarchy: checks {@code RexNode} converters
first, then
+ * {@code AggregateCall} converters.
+ *
+ * @param nodeClass the class to look up
+ * @return the matching converter, or empty
+ */
+ @SuppressWarnings("unchecked")
+ public Optional<FlinkNodeConverter<?>> getConverter(Class<?> nodeClass) {
+ if (RexNode.class.isAssignableFrom(nodeClass)) {
+ return Optional.ofNullable(rexConverterMap.get((Class<? extends
RexNode>) nodeClass));
+ } else if (AggregateCall.class.isAssignableFrom(nodeClass)) {
+ return Optional.ofNullable(aggConverterMap.get((Class<? extends
AggregateCall>) nodeClass));
+ }
+ return Optional.empty();
+ }
+}
diff --git
a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkRexNodeConverter.java
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkRexNodeConverter.java
new file mode 100644
index 00000000..9778ae01
--- /dev/null
+++
b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/FlinkRexNodeConverter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import org.apache.calcite.rex.RexNode;
+
+/**
+ * Converts a Calcite {@link RexNode} expression to an Auron native
+ * {@link org.apache.auron.protobuf.PhysicalExprNode}.
+ *
+ * <p>Implementations handle specific {@code RexNode} subtypes:
+ * <ul>
+ * <li>{@code RexLiteral} — scalar literal values
+ * <li>{@code RexInputRef} — column references (resolved via
+ * {@link ConverterContext#getInputType()})
+ * <li>{@code RexCall} — function/operator calls (arithmetic, comparison,
CAST, etc.)
+ * <li>{@code RexFieldAccess} — nested field access
+ * </ul>
+ *
+ * <p>RexNode converters are reusable across operator types — the same {@code
RexInputRef}
+ * converter works for Calc projections, Agg grouping expressions, and future
operators.
+ */
+public interface FlinkRexNodeConverter extends FlinkNodeConverter<RexNode> {}
diff --git
a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
new file mode 100644
index 00000000..ac645174
--- /dev/null
+++
b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/FlinkNodeConverterFactoryTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Optional;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link FlinkNodeConverterFactory}. */
+class FlinkNodeConverterFactoryTest {
+
+ private static final RelDataTypeFactory TYPE_FACTORY = new
JavaTypeFactoryImpl();
+ private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+ private FlinkNodeConverterFactory factory;
+ private ConverterContext context;
+ private RexLiteral testLiteral;
+ private AggregateCall testAggCall;
+
+ @BeforeEach
+ void setUp() {
+ factory = new FlinkNodeConverterFactory();
+ context =
+ new ConverterContext(new Configuration(), null,
getClass().getClassLoader(), RowType.of(new IntType()));
+ testLiteral =
REX_BUILDER.makeExactLiteral(java.math.BigDecimal.valueOf(42));
+ testAggCall = AggregateCall.create(
+ SqlStdOperatorTable.COUNT,
+ false,
+ false,
+ false,
+ java.util.Collections.emptyList(),
+ -1,
+ null,
+ org.apache.calcite.rel.RelCollations.EMPTY,
+ TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT),
+ "cnt");
+ }
+
+ @Test
+ void testRexConverterDispatch() {
+ PhysicalExprNode expected = PhysicalExprNode.newBuilder().build();
+ factory.registerRexConverter(new StubRexNodeConverter(true, expected));
+
+ Optional<PhysicalExprNode> result =
factory.convertRexNode(testLiteral, context);
+ assertTrue(result.isPresent());
+ assertEquals(expected, result.get());
+ }
+
+ @Test
+ void testAggConverterDispatch() {
+ PhysicalExprNode expected = PhysicalExprNode.newBuilder().build();
+ factory.registerAggConverter(new StubAggCallConverter(true, expected));
+
+ Optional<PhysicalExprNode> result =
factory.convertAggCall(testAggCall, context);
+ assertTrue(result.isPresent());
+ assertEquals(expected, result.get());
+ }
+
+ @Test
+ void testUnsupportedRexPassthrough() {
+ factory.registerRexConverter(new StubRexNodeConverter(false, null));
+
+ Optional<PhysicalExprNode> result =
factory.convertRexNode(testLiteral, context);
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ void testConversionFailureFallback() {
+ factory.registerRexConverter(new StubRexNodeConverter(true, null) {
+ @Override
+ public PhysicalExprNode convert(RexNode node, ConverterContext
context) {
+ throw new RuntimeException("conversion error");
+ }
+ });
+
+ Optional<PhysicalExprNode> result =
factory.convertRexNode(testLiteral, context);
+ assertFalse(result.isPresent());
+ }
+
+ @Test
+ void testDuplicateRexConverterRejected() {
+ factory.registerRexConverter(new StubRexNodeConverter(true, null));
+
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> factory.registerRexConverter(new
StubRexNodeConverter(true, null)));
+ }
+
+ @Test
+ void testGetConverterByClass() {
+ StubRexNodeConverter converter = new StubRexNodeConverter(true, null);
+ factory.registerRexConverter(converter);
+
+ Optional<FlinkNodeConverter<?>> found =
factory.getConverter(RexLiteral.class);
+ assertTrue(found.isPresent());
+ assertEquals(converter, found.get());
+ }
+
+ @Test
+ void testGetConverterByClassAgg() {
+ StubAggCallConverter converter = new StubAggCallConverter(true, null);
+ factory.registerAggConverter(converter);
+
+ Optional<FlinkNodeConverter<?>> found =
factory.getConverter(AggregateCall.class);
+ assertTrue(found.isPresent());
+ assertEquals(converter, found.get());
+ }
+
+ @Test
+ void testGetConverterAbsent() {
+ Optional<FlinkNodeConverter<?>> found =
factory.getConverter(RexLiteral.class);
+ assertFalse(found.isPresent());
+ }
+
+ // ---- Test stubs ----
+
+ /** Stub FlinkRexNodeConverter for testing. */
+ private static class StubRexNodeConverter implements FlinkRexNodeConverter
{
+ private final boolean supported;
+ private final PhysicalExprNode result;
+
+ StubRexNodeConverter(boolean supported, PhysicalExprNode result) {
+ this.supported = supported;
+ this.result = result;
+ }
+
+ @Override
+ public Class<? extends RexNode> getNodeClass() {
+ return RexLiteral.class;
+ }
+
+ @Override
+ public boolean isSupported(RexNode node, ConverterContext context) {
+ return supported;
+ }
+
+ @Override
+ public PhysicalExprNode convert(RexNode node, ConverterContext
context) {
+ return result;
+ }
+ }
+
+ /** Stub FlinkAggCallConverter for testing. */
+ private static class StubAggCallConverter implements FlinkAggCallConverter
{
+ private final boolean supported;
+ private final PhysicalExprNode result;
+
+ StubAggCallConverter(boolean supported, PhysicalExprNode result) {
+ this.supported = supported;
+ this.result = result;
+ }
+
+ @Override
+ public Class<? extends AggregateCall> getNodeClass() {
+ return AggregateCall.class;
+ }
+
+ @Override
+ public boolean isSupported(AggregateCall node, ConverterContext
context) {
+ return supported;
+ }
+
+ @Override
+ public PhysicalExprNode convert(AggregateCall node, ConverterContext
context) {
+ return result;
+ }
+ }
+}