This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-28813 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 5af5bf8dd07b7e7890d7aefd1b9a89933c824f5e Author: Kirill Tkalenko <[email protected]> AuthorDate: Wed Jun 24 14:19:33 2026 +0300 IGNITE-28813 wip --- .../query/calcite/CalciteQueryProcessor.java | 14 ++ .../query/calcite/exec/exp/agg/Accumulators.java | 87 ++++++----- .../exec/exp/agg/PluginAccumulatorsExtension.java | 38 +++++ .../AddAggregatFunctionViaPluginProviderTest.java | 167 +++++++++++++++++++++ .../ignite/testsuites/IntegrationTestSuite.java | 2 + 5 files changed, 271 insertions(+), 37 deletions(-) diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java index ca6fbd6f390..05ef25e19c2 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java @@ -82,6 +82,8 @@ import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecuto import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService; import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl; import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension; import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor; import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig; @@ -313,6 +315,8 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query } distrCfg = new DistributedCalciteConfiguration(ctx, log); + + extendAccumulatorsFromPlugins(ctx); } /** @@ -853,4 +857,14 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query public InjectResourcesService injectService() { return injectSvc; } + + /** */ + private static void extendAccumulatorsFromPlugins(GridKernalContext ctx) { + PluginAccumulatorsExtension[] extensions = ctx.plugins().extensions(PluginAccumulatorsExtension.class); + + if (!F.isEmpty(extensions)) { + for (PluginAccumulatorsExtension extension : extensions) + Accumulators.addPluginAccumulatorFactories(extension.accumulatorFactories()); + } + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java index 9c15e4a7a23..8af20faf6ff 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java @@ -24,8 +24,10 @@ import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Supplier; import org.apache.calcite.avatica.util.ByteString; @@ -37,6 +39,7 @@ import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension.PluginAccumulatorFactory; import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; import org.apache.ignite.internal.processors.query.calcite.util.Commons; import org.apache.ignite.internal.util.typedef.F; @@ -54,6 +57,9 @@ import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; * */ public class Accumulators { + /** */ + private static final Map<String, PluginAccumulatorFactory<?>> PLUGIN_FACTORY_BY_NAME = new ConcurrentHashMap<>(); + /** */ public static <Row> Supplier<Accumulator<Row>> accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) { Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, ctx); @@ -71,38 +77,29 @@ public class Accumulators { ) { RowHandler<Row> hnd = ctx.rowHandler(); - switch (call.getAggregation().getName()) { - case "COUNT": - return () -> new LongCount<>(call, hnd); - case "AVG": - return avgFactory(call, hnd); - case "SUM": - return sumFactory(call, hnd); - case "$SUM0": - return sumEmptyIsZeroFactory(call, hnd); - case "MIN": - case "EVERY": - return minFactory(call, hnd); - case "MAX": - case "SOME": - return maxFactory(call, hnd); - case "SINGLE_VALUE": - return () -> new SingleVal<>(call, hnd); - case "LITERAL_AGG": - return () -> new LiteralVal<>(call, hnd); - case "ANY_VALUE": - return () -> new AnyVal<>(call, hnd); - case "LISTAGG": - case "ARRAY_AGG": - case "ARRAY_CONCAT_AGG": - return listAggregateSupplier(call, ctx); - case "BIT_AND": - case "BIT_OR": - case "BIT_XOR": - return bitWiseFactory(call, hnd); - default: - throw new AssertionError(call.getAggregation().getName()); - } + String aggFunName = call.getAggregation().getName(); + + return switch (aggFunName) { + case "COUNT" -> () -> new LongCount<>(call, hnd); + case "AVG" -> avgFactory(call, hnd); + case "SUM" -> sumFactory(call, hnd); + case "$SUM0" -> sumEmptyIsZeroFactory(call, hnd); + case "MIN", "EVERY" -> minFactory(call, hnd); + case "MAX", "SOME" -> maxFactory(call, hnd); + case "SINGLE_VALUE" -> () -> new SingleVal<>(call, hnd); + case "LITERAL_AGG" -> () -> new LiteralVal<>(call, hnd); + case "ANY_VALUE" -> () -> new AnyVal<>(call, hnd); + case "LISTAGG", "ARRAY_AGG", "ARRAY_CONCAT_AGG" -> listAggregateSupplier(call, ctx); + case "BIT_AND", "BIT_OR", "BIT_XOR" -> bitWiseFactory(call, hnd); + default -> { + PluginAccumulatorFactory<Row> factory = (PluginAccumulatorFactory<Row>) PLUGIN_FACTORY_BY_NAME.get(aggFunName); + + if (factory == null) + throw new AssertionError("Accumulator factory not found for: " + aggFunName); + + yield () -> factory.create(call, ctx); + } + }; } /** */ @@ -280,7 +277,7 @@ public class Accumulators { } /** */ - private abstract static class AbstractAccumulator<Row> implements Accumulator<Row> { + public abstract static class AbstractAccumulator<Row> implements Accumulator<Row> { /** */ private final RowHandler<Row> hnd; @@ -288,13 +285,13 @@ public class Accumulators { private final transient AggregateCall aggCall; /** */ - AbstractAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) { + protected AbstractAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) { this.aggCall = aggCall; this.hnd = hnd; } /** */ - <T> T get(int idx, Row row) { + protected <T> T get(int idx, Row row) { assert idx < arguments().size() : "idx=" + idx + "; arguments=" + arguments(); return (T)hnd.get(arguments().get(idx), row); @@ -311,7 +308,7 @@ public class Accumulators { } /** */ - int columnCount(Row row) { + protected int columnCount(Row row) { return hnd.columnCount(row); } } @@ -1344,8 +1341,9 @@ public class Accumulators { if (builder == null) builder = new StringBuilder(); - if (builder.length() != 0) + if (!builder.isEmpty()) builder.append(extractSeparator(row)); + builder.append(val); } @@ -1510,4 +1508,19 @@ public class Accumulators { return acc.returnType(typeFactory); } } + + /** */ + public static void addPluginAccumulatorFactories(Map<String, PluginAccumulatorFactory<?>> factoryByAggFunName) { + for (Map.Entry<String, PluginAccumulatorFactory<?>> e : factoryByAggFunName.entrySet()) { + String aggFunName = e.getKey().trim().toUpperCase(Locale.ROOT); + + if (aggFunName.isBlank()) + throw new AssertionError("Invalid aggregate function name: " + aggFunName); + + PluginAccumulatorFactory<?> prev = PLUGIN_FACTORY_BY_NAME.putIfAbsent(aggFunName, e.getValue()); + +// if (prev != null) +// throw new AssertionError("Duplicate aggregate function name: " + aggFunName); + } + } } diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java new file mode 100644 index 00000000000..aa80d7f11c7 --- /dev/null +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg; + +import java.util.Map; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext; +import org.apache.ignite.plugin.Extension; +import org.apache.ignite.plugin.PluginProvider; + +/** Class for extending {@link Accumulators} via {@link PluginProvider plugins}. */ +@FunctionalInterface +public interface PluginAccumulatorsExtension extends Extension { + /** @return Accumulator factories by aggregate function name. Name must be non-empty and unique. */ + Map<String, PluginAccumulatorFactory<?>> accumulatorFactories(); + + /** */ + @FunctionalInterface + interface PluginAccumulatorFactory<Row> { + /** */ + Accumulator<Row> create(AggregateCall call, ExecutionContext<Row> ctx); + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java new file mode 100644 index 00000000000..b53ee861189 --- /dev/null +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.query.calcite.integration; + +import java.util.List; +import java.util.Map; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlAggFunction; +import org.apache.calcite.sql.SqlFunctionCategory; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable; +import org.apache.calcite.sql.util.SqlOperatorTables; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.util.Optionality; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor; +import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators.AbstractAccumulator; +import org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension; +import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.PluginProvider; +import org.jspecify.annotations.Nullable; +import org.junit.Test; + +/** Test for adding aggregat function via {@link PluginProvider}. */ +public class AddAggregatFunctionViaPluginProviderTest extends AbstractBasicIntegrationTest { + /** */ + private static final String TEST_SUM_FUN_NAME = "TEST_SUM"; + + /** {@inheritDoc} */ + @Override protected int nodeCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setPluginProviders(new TestPluginProvider()); + } + + /** */ + @Test + public void test() { + assertQuery("SELECT TEST_SUM(x) FROM (VALUES (1), (2), (3)) t(x)") + .returns(6L) + .check(); + } + + /** */ + private static class TestPluginProvider extends AbstractTestPluginProvider { + /** {@inheritDoc} */ + @Override public String name() { + return getClass().getName(); + } + + /** {@inheritDoc} */ + @Override public @Nullable <T> T createComponent(PluginContext ctx, Class<T> cls) { + if (!FrameworkConfig.class.equals(cls)) + return null; + + FrameworkConfig cfg = CalciteQueryProcessor.FRAMEWORK_CONFIG; + + return (T) Frameworks.newConfigBuilder(cfg) + .operatorTable(SqlOperatorTables.chain( + new TestSqlOperatorTable().init(), cfg.getOperatorTable() + )) + .build(); + } + + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { + registry.registerExtension( + PluginAccumulatorsExtension.class, + () -> Map.of(TEST_SUM_FUN_NAME, (call, ctx1) -> new TestSum<>(call, ctx1.rowHandler())) + ); + } + } + + /** */ + public static class TestSqlSumAggFunction extends SqlAggFunction { + /** */ + public TestSqlSumAggFunction() { + super( + TEST_SUM_FUN_NAME, + null, + SqlKind.SUM, + ReturnTypes.AGG_SUM, + null, + OperandTypes.NUMERIC, + SqlFunctionCategory.NUMERIC, + false, + false, + Optionality.FORBIDDEN + ); + } + } + + /** */ + public static class TestSqlOperatorTable extends ReflectiveSqlOperatorTable { + /** */ + @SuppressWarnings("unused") + public static final SqlAggFunction TEST_SUM = new TestSqlSumAggFunction(); + } + + /** */ + private static class TestSum<Row> extends AbstractAccumulator<Row> { + /** */ + private long sum; + + /** */ + protected TestSum(AggregateCall aggCall, RowHandler<Row> hnd) { + super(aggCall, hnd); + } + + /** {@inheritDoc} */ + @Override public void add(Row row) { + Number val = get(0, row); + + if (val != null) + sum += val.longValue(); + } + + /** {@inheritDoc} */ + @Override public void apply(Accumulator<Row> other) { + sum += ((TestSum<Row>)other).sum; + } + + /** {@inheritDoc} */ + @Override public Object end() { + return sum; + } + + /** {@inheritDoc} */ + @Override public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) { + return List.of(typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT), true)); + } + + /** {@inheritDoc} */ + @Override public RelDataType returnType(IgniteTypeFactory typeFactory) { + return typeFactory.createSqlType(org.apache.calcite.sql.type.SqlTypeName.BIGINT); + } + } +} diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java index f4e123c6a7c..177e07c04a7 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java +++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor import org.apache.ignite.internal.processors.query.calcite.CancelTest; import org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalciteTest; import org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest; +import org.apache.ignite.internal.processors.query.calcite.integration.AddAggregatFunctionViaPluginProviderTest; import org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.AuthorizationIntegrationTest; import org.apache.ignite.internal.processors.query.calcite.integration.CacheStoreTest; @@ -183,6 +184,7 @@ import org.junit.runners.Suite; CacheWithInterceptorIntegrationTest.class, TxThreadLockingTest.class, SelectByKeyFieldTest.class, + AddAggregatFunctionViaPluginProviderTest.class, }) public class IntegrationTestSuite { }
