This is an automated email from the ASF dual-hosted git repository. ppa pushed a commit to branch ignite-24995 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 3e3355a66754d0399c909d5f069c8cf5ca3e9de5 Author: Pavel Pereslegin <[email protected]> AuthorDate: Thu Jan 22 19:29:18 2026 +0300 IGNITE-24995 Optimize correlates serialization when propagating to another node --- .../internal/sql/engine/ItCorrelatesTest.java | 11 +- .../sql/engine/exec/ExchangeServiceImpl.java | 9 +- .../internal/sql/engine/exec/ExecutionContext.java | 13 +- .../sql/engine/exec/LogicalRelImplementor.java | 3 +- .../internal/sql/engine/exec/SharedState.java | 57 +++++-- .../engine/exec/SharedStateMessageConverter.java | 165 +++++++++++++++++++++ .../sql/engine/exec/SqlEvaluationContext.java | 2 +- ...atesBuilder.java => CorrelatedFieldGetter.java} | 57 +++---- .../sql/engine/exec/exp/CorrelatesBuilder.java | 12 +- .../exec/rel/CorrelatedNestedLoopJoinNode.java | 25 +++- .../expressions/SqlExpressionFactoryAdapter.java | 2 +- .../sql/engine/externalize/RelJsonReader.java | 8 +- .../engine/message/QueryBatchRequestMessage.java | 5 +- ...RequestMessage.java => SharedStateMessage.java} | 22 +-- .../sql/engine/message/SqlQueryMessageGroup.java | 36 +++++ .../BooleanFieldMessage.java} | 23 +-- .../ByteFieldMessage.java} | 23 +-- .../DateFieldMessage.java} | 23 +-- .../DateTimeFieldMessage.java} | 23 +-- .../DecimalFieldMessage.java} | 23 ++- .../DoubleFieldMessage.java} | 23 +-- .../FloatFieldMessage.java} | 23 +-- .../IntFieldMessage.java} | 23 +-- .../LongFieldMessage.java} | 23 +-- .../ShortFieldMessage.java} | 23 +-- .../SingleFieldMessage.java} | 22 +-- .../StringFieldMessage.java} | 23 +-- .../TimeFieldMessage.java} | 23 +-- .../TimestampFieldMessage.java} | 23 +-- .../UuidFieldMessage.java} | 24 +-- .../engine/rel/IgniteCorrelatedNestedLoopJoin.java | 33 ++++- .../sql/engine/rule/CorrelateToNestedLoopRule.java | 3 +- .../engine/rule/CorrelatedNestedLoopJoinRule.java | 4 +- .../internal/sql/engine/util/IgniteMethod.java | 4 +- .../rel/CorrelatedNestedLoopJoinExecutionTest.java | 21 ++- 35 files changed, 492 insertions(+), 345 deletions(-) 
diff --git a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java index 64d68ec529e..9cec7b96b35 100644 --- a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java +++ b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine; import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan; +import java.time.LocalTime; import org.apache.ignite.internal.sql.BaseSqlIntegrationTest; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -112,17 +113,17 @@ public class ItCorrelatesTest extends BaseSqlIntegrationTest { */ @Test public void testCorrelatesCollisionRight() { - sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)"); - sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)"); + sql("CREATE TABLE test1 (a TIME PRIMARY KEY, b INTEGER)"); + sql("CREATE TABLE test2 (a TIME PRIMARY KEY, c INTEGER)"); - sql("INSERT INTO test1 VALUES (11, 1), (12, 2), (13, 3)"); - sql("INSERT INTO test2 VALUES (11, 1), (12, 1), (13, 4)"); + sql("INSERT INTO test1 VALUES (TIME '11:00:00', 1), (TIME '12:00:00', 2), (TIME '13:00:00', 3)"); + sql("INSERT INTO test2 VALUES (TIME '11:00:00', 1), (TIME '12:00:00', 1), (TIME '13:00:00', 4)"); // Collision by correlate variables in both, left and right hands. 
assertQuery("SELECT /*+ disable_decorrelation */ * FROM test1 WHERE " + "EXISTS(SELECT * FROM test2 WHERE (SELECT test1.a)=test2.a AND (SELECT test1.b)<>test2.c) " + "AND NOT EXISTS(SELECT * FROM test2 WHERE (SELECT test1.a)=test2.a AND (SELECT test1.b)<test2.c)") - .returns(12, 2) + .returns(LocalTime.of(12, 0, 0), 2) .check(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java index 674b8e37f16..2dc7f7fc16d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.Outbox; import org.apache.ignite.internal.sql.engine.message.MessageService; import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage; import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage; +import org.apache.ignite.internal.sql.engine.message.SharedStateMessage; import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory; import org.apache.ignite.internal.util.ExceptionUtils; @@ -112,7 +113,7 @@ public class ExchangeServiceImpl implements ExchangeService { .fragmentId(fragmentId) .exchangeId(exchangeId) .amountOfBatches(amountOfBatches) - .sharedState(state) + .sharedStateMessage(SharedStateMessageConverter.toMessage(state)) .build() ); } @@ -153,8 +154,10 @@ public class ExchangeServiceImpl implements ExchangeService { Consumer<Outbox<?>> onRequestHandler = outbox -> { try { - SharedState state = msg.sharedState(); - if (state != null) { + SharedStateMessage sharedStateMessage = msg.sharedStateMessage(); + if (sharedStateMessage != null) { + SharedState state = 
SharedStateMessageConverter.fromMessage(sharedStateMessage); + outbox.onRewindRequest(node.name(), state, msg.amountOfBatches()); } else { outbox.onRequest(node.name(), msg.amountOfBatches()); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java index 5716c1fbaa4..603f74b57ee 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.type.NativeTypes; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.lang.IgniteCheckedException; import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.sql.ColumnType; import org.jetbrains.annotations.Nullable; /** @@ -359,18 +360,20 @@ public class ExecutionContext<RowT> implements SqlEvaluationContext<RowT> { } @Override - public RowT correlatedVariable(int id) { - return cast(sharedState.correlatedVariable(id)); + public RowT correlatedVariable(int id, int idx) { + return cast(sharedState.correlatedVariable(id, idx)); } /** * Sets correlated value. * - * @param id Correlation ID. + * @param corrId Correlation ID. + * @param fieldIdx Field index. + * @param type Value type. * @param value Correlated value. 
*/ - public void correlatedVariable(Object value, int id) { - sharedState.correlatedVariable(id, value); + public void correlatedVariable(int corrId, int fieldIdx, ColumnType type, Object value) { + sharedState.correlatedVariable(corrId, fieldIdx, type, value); } /** diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java index dfd117016ca..3d5144074ba 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java @@ -370,7 +370,8 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>> RowFactory<RowT> rightRowFactory = ctx.rowFactoryFactory().create(convertStructuredType(rightType)); Node<RowT> node = new CorrelatedNestedLoopJoinNode<>(ctx, cond, rel.getVariablesSet(), - rel.getJoinType(), rightRowFactory, joinProjection); + rel.getJoinType(), rightRowFactory, joinProjection, convertStructuredType(leftType), + rel.getRequiredColumns()); Node<RowT> leftInput = visit(rel.getLeft()); Node<RowT> rightInput = visit(rel.getRight()); diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java index ccb3dbd8ff7..373066afac0 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java @@ -17,41 +17,68 @@ package org.apache.ignite.internal.sql.engine.exec; -import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange; - -import java.io.Serializable; -import org.apache.ignite.internal.sql.engine.util.Commons; +import 
it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectMaps; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import org.apache.ignite.sql.ColumnType; +import org.jetbrains.annotations.Nullable; /** * This class represents the volatile state that may be propagated from parent to its children * during rewind. */ -public class SharedState implements Serializable { - private static final long serialVersionUID = 42L; +public class SharedState { + private final Long2ObjectMap<ValueWithType> correlations; + + public SharedState() { + this(new Long2ObjectOpenHashMap<>()); + } - private Object[] correlations = new Object[16]; + SharedState(Long2ObjectMap<ValueWithType> correlations) { + this.correlations = correlations; + } /** * Gets correlated value. * - * @param id Correlation ID. + * @param corrId Correlation ID. * @return Correlated value. */ - public Object correlatedVariable(int id) { - checkRange(correlations, id); + public Object correlatedVariable(int corrId, int fieldIdx) { + long key = packToLong(corrId, fieldIdx); - return correlations[id]; + return correlations.get(key).value; } /** * Sets correlated value. * - * @param id Correlation ID. + * @param corrId Correlation ID. + * @param fieldIdx Field index. + * @param type Value type. * @param value Correlated value. 
*/ - public void correlatedVariable(int id, Object value) { - correlations = Commons.ensureCapacity(correlations, id + 1); + public void correlatedVariable(int corrId, int fieldIdx, ColumnType type, Object value) { + long key = packToLong(corrId, fieldIdx); + + correlations.put(key, new ValueWithType(type, value)); + } + + Long2ObjectMap<ValueWithType> correlations() { + return Long2ObjectMaps.unmodifiable(correlations); + } + + private static long packToLong(int corrId, int fieldIdx) { + return ((((long) corrId) << 32 | fieldIdx)); + } + + static class ValueWithType { + final @Nullable ColumnType type; + final Object value; - correlations[id] = value; + ValueWithType(@Nullable ColumnType type, Object value) { + this.type = type; + this.value = value; + } } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java new file mode 100644 index 00000000000..781ef9d4760 --- /dev/null +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.sql.engine.exec; + +import it.unimi.dsi.fastutil.longs.Long2ObjectMap; +import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap; +import java.math.BigDecimal; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.network.NetworkMessage; +import org.apache.ignite.internal.sql.engine.exec.SharedState.ValueWithType; +import org.apache.ignite.internal.sql.engine.message.SharedStateMessage; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory; +import org.apache.ignite.internal.sql.engine.message.field.SingleFieldMessage; +import org.apache.ignite.internal.util.IgniteUtils; +import org.apache.ignite.sql.ColumnType; +import org.jetbrains.annotations.Nullable; + +/** + * Converter between {@link SharedState} and {@link SharedStateMessage}. + */ +public class SharedStateMessageConverter { + /** Message factory. 
*/ + private static final SqlQueryMessagesFactory MESSAGE_FACTORY = new SqlQueryMessagesFactory(); + + static @Nullable SharedStateMessage toMessage(@Nullable SharedState state) { + if (state == null) { + return null; + } + + Long2ObjectMap<ValueWithType> correlations = state.correlations(); + Map<Long, NetworkMessage> result = IgniteUtils.newHashMap(correlations.size()); + + for (Long2ObjectMap.Entry<ValueWithType> entry : correlations.long2ObjectEntrySet()) { + ValueWithType valueWithType = entry.getValue(); + ColumnType type = valueWithType.type; + + assert type != null; + + SingleFieldMessage<?> msg = toSingleFieldMessage(type, valueWithType.value); + + result.put(entry.getLongKey(), msg); + } + + return MESSAGE_FACTORY.sharedStateMessage() + .sharedState(result) + .build(); + } + + static SharedState fromMessage(SharedStateMessage sharedStateMessage) { + int size = sharedStateMessage.sharedState().size(); + Long2ObjectMap<ValueWithType> correlations = new Long2ObjectOpenHashMap<>(size); + + for (Map.Entry<Long, NetworkMessage> e : sharedStateMessage.sharedState().entrySet()) { + SingleFieldMessage<Object> msg = ((SingleFieldMessage<Object>) e.getValue()); + + ColumnType type = columnTypeByMessage(msg); + + correlations.put(e.getKey().longValue(), new ValueWithType(type, msg.field())); + } + + return new SharedState(correlations); + } + + private static ColumnType columnTypeByMessage(SingleFieldMessage<?> message) { + switch (message.messageType()) { + case SqlQueryMessageGroup.BOOLEAN_FIELD_MESSAGE: + return ColumnType.BOOLEAN; + + case SqlQueryMessageGroup.BYTE_FIELD_MESSAGE: + return ColumnType.INT8; + + case SqlQueryMessageGroup.SHORT_FIELD_MESSAGE: + return ColumnType.INT16; + + case SqlQueryMessageGroup.INT_FIELD_MESSAGE: + return ColumnType.INT32; + + case SqlQueryMessageGroup.LONG_FIELD_MESSAGE: + return ColumnType.INT64; + + case SqlQueryMessageGroup.FLOAT_FIELD_MESSAGE: + return ColumnType.FLOAT; + + case SqlQueryMessageGroup.DOUBLE_FIELD_MESSAGE: + 
return ColumnType.DOUBLE; + + case SqlQueryMessageGroup.DECIMAL_FIELD_MESSAGE: + return ColumnType.DECIMAL; + + case SqlQueryMessageGroup.DATE_FIELD_MESSAGE: + return ColumnType.DATE; + + case SqlQueryMessageGroup.TIME_FIELD_MESSAGE: + return ColumnType.TIME; + + case SqlQueryMessageGroup.DATETIME_FIELD_MESSAGE: + return ColumnType.DATETIME; + + case SqlQueryMessageGroup.TIMESTAMP_FIELD_MESSAGE: + return ColumnType.TIMESTAMP; + + case SqlQueryMessageGroup.UUID_FIELD_MESSAGE: + return ColumnType.UUID; + + case SqlQueryMessageGroup.STRING_FIELD_MESSAGE: + return ColumnType.STRING; + + default: + throw new IllegalArgumentException("Unsupported message type: " + message.messageType()); + } + } + + private static SingleFieldMessage<?> toSingleFieldMessage(ColumnType type, Object value) { + switch (type) { + case BOOLEAN: + return MESSAGE_FACTORY.booleanFieldMessage().field((Boolean) value).build(); + case INT8: + return MESSAGE_FACTORY.byteFieldMessage().field((Byte) value).build(); + case INT16: + return MESSAGE_FACTORY.shortFieldMessage().field((Short) value).build(); + case INT32: + return MESSAGE_FACTORY.intFieldMessage().field((Integer) value).build(); + case INT64: + return MESSAGE_FACTORY.longFieldMessage().field((Long) value).build(); + case FLOAT: + return MESSAGE_FACTORY.floatFieldMessage().field((Float) value).build(); + case DOUBLE: + return MESSAGE_FACTORY.doubleFieldMessage().field((Double) value).build(); + case DECIMAL: + return MESSAGE_FACTORY.decimalFieldMessage().field((BigDecimal) value).build(); + case DATE: + return MESSAGE_FACTORY.dateFieldMessage().field((Integer) value).build(); + case TIME: + return MESSAGE_FACTORY.timeFieldMessage().field((Integer) value).build(); + case DATETIME: + return MESSAGE_FACTORY.dateTimeFieldMessage().field((Long) value).build(); + case TIMESTAMP: + return MESSAGE_FACTORY.timestampFieldMessage().field((Long) value).build(); + case UUID: + return MESSAGE_FACTORY.uuidFieldMessage().field((UUID) value).build(); + case 
STRING: + return MESSAGE_FACTORY.stringFieldMessage().field((String) value).build(); + + default: + throw new IllegalArgumentException("Unsupported type: " + type); + } + } +} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java index bd7aa510c17..5ad8c063359 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java @@ -39,5 +39,5 @@ public interface SqlEvaluationContext<RowT> extends DataContext { RowFactoryFactory<RowT> rowFactoryFactory(); /** Returns row representing correlation source by given correlation id. */ - RowT correlatedVariable(int id); + RowT correlatedVariable(int id, int idx); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatedFieldGetter.java similarity index 56% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatedFieldGetter.java index 0db383a5e92..cb6f59236d2 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatedFieldGetter.java @@ -17,60 +17,45 @@ package org.apache.ignite.internal.sql.engine.exec.exp; -import java.util.HashMap; -import java.util.Map; -import org.apache.calcite.linq4j.function.Function1; +import java.lang.reflect.Type; +import org.apache.calcite.adapter.enumerable.EnumUtils; import org.apache.calcite.linq4j.tree.BlockBuilder; import 
org.apache.calcite.linq4j.tree.Expression; import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.linq4j.tree.Primitive; import org.apache.calcite.rex.RexCorrelVariable; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.rex.RexShuttle; import org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter; +import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.IgniteMethod; +import org.jetbrains.annotations.Nullable; -class CorrelatesBuilder extends RexShuttle { - private final BlockBuilder builder; - +class CorrelatedFieldGetter implements InputGetter { private final Expression ctx; + private final RexCorrelVariable variable; - private final Expression hnd; - - private Map<String, FieldGetter> correlates; - - CorrelatesBuilder(BlockBuilder builder, Expression ctx, Expression hnd) { - this.builder = builder; - this.hnd = hnd; + CorrelatedFieldGetter(Expression ctx, RexCorrelVariable variable) { this.ctx = ctx; - } - - public Function1<String, InputGetter> build(Iterable<RexNode> nodes) { - try { - for (RexNode node : nodes) { - if (node != null) { - node.accept(this); - } - } - - return correlates == null ? null : correlates::get; - } finally { - correlates = null; - } + this.variable = variable; } /** {@inheritDoc} */ @Override - public RexNode visitCorrelVariable(RexCorrelVariable variable) { - Expression corr = builder.append("corr", + public Expression field(BlockBuilder builder, int index, @Nullable Type storageType) { + Expression expr = builder.append("corr", Expressions.call(ctx, IgniteMethod.CONTEXT_GET_CORRELATED_VALUE.method(), - Expressions.constant(variable.id.getId()))); + Expressions.constant(variable.id.getId()), Expressions.constant(index))); + + // TODO Remove code duplication (see CommonFieldGetter). 
+ Type fieldType = Commons.typeFactory().getJavaClass(variable.getType().getFieldList().get(index).getType()); - if (correlates == null) { - correlates = new HashMap<>(); + Primitive p = Primitive.of(fieldType); + if (p == null) { + // In case of non-primitive types we can simply do casting like this: (RequiredType) fieldValue. + return Expressions.convert_(expr, fieldType); } - correlates.put(variable.getName(), new FieldGetter(hnd, corr, variable.getType())); + expr = Expressions.convert_(expr, p.getBoxClass()); - return variable; + return EnumUtils.convert(expr, p.getBoxClass(), fieldType); } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java index 0db383a5e92..2efef537c1c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java @@ -22,12 +22,10 @@ import java.util.Map; import org.apache.calcite.linq4j.function.Function1; import org.apache.calcite.linq4j.tree.BlockBuilder; import org.apache.calcite.linq4j.tree.Expression; -import org.apache.calcite.linq4j.tree.Expressions; import org.apache.calcite.rex.RexCorrelVariable; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexShuttle; import org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter; -import org.apache.ignite.internal.sql.engine.util.IgniteMethod; class CorrelatesBuilder extends RexShuttle { private final BlockBuilder builder; @@ -36,7 +34,7 @@ class CorrelatesBuilder extends RexShuttle { private final Expression hnd; - private Map<String, FieldGetter> correlates; + private Map<String, InputGetter> correlates; CorrelatesBuilder(BlockBuilder builder, Expression ctx, Expression hnd) { this.builder = builder; @@ -61,15 +59,13 @@ 
class CorrelatesBuilder extends RexShuttle { /** {@inheritDoc} */ @Override public RexNode visitCorrelVariable(RexCorrelVariable variable) { - Expression corr = builder.append("corr", - Expressions.call(ctx, IgniteMethod.CONTEXT_GET_CORRELATED_VALUE.method(), - Expressions.constant(variable.id.getId()))); - if (correlates == null) { correlates = new HashMap<>(); } - correlates.put(variable.getName(), new FieldGetter(hnd, corr, variable.getType())); + InputGetter inputGetter = new CorrelatedFieldGetter(ctx, variable); + + correlates.put(variable.getName(), inputGetter); return variable; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java index ecc5bbabe81..b9ee5c37d56 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java @@ -28,10 +28,13 @@ import java.util.Set; import java.util.function.BiPredicate; import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.internal.lang.IgniteStringBuilder; import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.exp.SqlJoinProjection; +import org.apache.ignite.internal.type.StructNativeType; +import org.apache.ignite.sql.ColumnType; /** * CorrelatedNestedLoopJoinNode. 
@@ -54,6 +57,10 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { private final RowT rightEmptyRow; + private final StructNativeType leftSchema; + + private final ImmutableBitSet requiredColumns; + private int requested; private int waitingLeft; @@ -83,6 +90,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { * @param joinType Join rel type. * @param rightRowFactory Right row factory. * @param joinProjection Output row factory. + * @param leftSchemaType Left type. + * @param requiredColumns TODO Required columns. */ public CorrelatedNestedLoopJoinNode( ExecutionContext<RowT> ctx, @@ -90,7 +99,9 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { Set<CorrelationId> correlationIds, JoinRelType joinType, RowFactory<RowT> rightRowFactory, - SqlJoinProjection joinProjection + SqlJoinProjection joinProjection, + StructNativeType leftSchemaType, + ImmutableBitSet requiredColumns ) { super(ctx); @@ -101,6 +112,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { this.correlationIds = new ArrayList<>(correlationIds); this.joinType = joinType; this.joinProjection = joinProjection; + this.leftSchema = leftSchemaType; + this.requiredColumns = requiredColumns; leftInBufferSize = correlationIds.size(); rightInBufferSize = inBufSize; @@ -485,7 +498,15 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends AbstractNode<RowT> { private void prepareCorrelations() { for (int i = 0; i < correlationIds.size(); i++) { RowT row = i < leftInBuf.size() ? 
leftInBuf.get(i) : first(leftInBuf); - context().correlatedVariable(row, correlationIds.get(i).getId()); + int corrId = correlationIds.get(i).getId(); + + for (int idx = requiredColumns.nextSetBit(0); idx != -1; + idx = requiredColumns.nextSetBit(idx + 1)) { + Object value = context().rowAccessor().get(idx, row); + ColumnType type = leftSchema.fields().get(idx).type().spec(); + + context().correlatedVariable(corrId, idx, type, value); + } } } } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java index be3376377d7..6884abfdb1b 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java @@ -303,7 +303,7 @@ public class SqlExpressionFactoryAdapter implements ExpressionFactory { } @Override - public @Nullable RowT correlatedVariable(int id) { + public @Nullable RowT correlatedVariable(int id, int idx) { return null; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java index 458e7fdb0eb..171c8772ee7 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java @@ -203,7 +203,13 @@ public class RelJsonReader { /** {@inheritDoc} */ @Override public ImmutableBitSet getBitSet(String tag) { - return ImmutableBitSet.of(getIntegerList(tag)); + var list = getIntegerList(tag); + + if (list == null) { + return ImmutableBitSet.of(); + } + + return ImmutableBitSet.of(list); } /** 
{@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java index d224c4e50dd..23d502daacf 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.sql.engine.message; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; import org.jetbrains.annotations.Nullable; /** @@ -34,6 +32,5 @@ public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { int amountOfBatches(); /** Returns a state that has should be propagated to the target fragment. 
*/ - @Marshallable - @Nullable SharedState sharedState(); + @Nullable SharedStateMessage sharedStateMessage(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java similarity index 56% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java index d224c4e50dd..2e72efb962c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java @@ -17,23 +17,15 @@ package org.apache.ignite.internal.sql.engine.message; -import org.apache.ignite.internal.network.annotations.Marshallable; +import java.io.Serializable; +import java.util.Map; +import org.apache.ignite.internal.network.NetworkMessage; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains map with correlations. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. 
*/ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.SHARED_STATE_MESSAGE) +public interface SharedStateMessage extends NetworkMessage, Serializable { + Map<Long, NetworkMessage> sharedState(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java index ea0d1f1509a..a9dc4484f8d 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.message; import static org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup.GROUP_TYPE; import org.apache.ignite.internal.network.annotations.MessageGroup; +import org.apache.ignite.internal.sql.engine.message.field.IntFieldMessage; /** * Message types for the sql query processing module. @@ -46,4 +47,39 @@ public final class SqlQueryMessageGroup { /** See {@link CancelOperationResponse} for the details. */ public static final short OPERATION_CANCEL_RESPONSE = 7; + + /** See {@link SharedStateMessage} for the details. */ + public static final short SHARED_STATE_MESSAGE = 8; + + public static final short BOOLEAN_FIELD_MESSAGE = 9; + + public static final short BYTE_FIELD_MESSAGE = 10; + + public static final short SHORT_FIELD_MESSAGE = 11; + + /** See {@link IntFieldMessage} for the details. 
*/ + public static final short INT_FIELD_MESSAGE = 12; + + public static final short LONG_FIELD_MESSAGE = 13; + + public static final short FLOAT_FIELD_MESSAGE = 14; + + public static final short DOUBLE_FIELD_MESSAGE = 15; + + public static final short DECIMAL_FIELD_MESSAGE = 16; + + public static final short DATE_FIELD_MESSAGE = 17; + + public static final short TIME_FIELD_MESSAGE = 18; + + public static final short DATETIME_FIELD_MESSAGE = 19; + + public static final short TIMESTAMP_FIELD_MESSAGE = 20; + + public static final short UUID_FIELD_MESSAGE = 21; + + public static final short STRING_FIELD_MESSAGE = 22; + + // TODO + // public static final short BYTE_ARRAY_FIELD_MESSAGE = 23; } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/BooleanFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/BooleanFieldMessage.java index d224c4e50dd..8ba820c5635 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/BooleanFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. 
*/ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single {@link Boolean} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. 
*/ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.BOOLEAN_FIELD_MESSAGE) +public interface BooleanFieldMessage extends SingleFieldMessage<Boolean> { + @Override + Boolean field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ByteFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ByteFieldMessage.java index d224c4e50dd..fd98d6333c8 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ByteFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single {@link Byte} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. 
*/ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.BYTE_FIELD_MESSAGE) +public interface ByteFieldMessage extends SingleFieldMessage<Byte> { + @Override + Byte field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateFieldMessage.java index d224c4e50dd..c3e08bed528 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a {@link java.time.LocalDate} field as represented by the SQL engine. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. 
*/ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.DATE_FIELD_MESSAGE) +public interface DateFieldMessage extends SingleFieldMessage<Integer> { + @Override + Integer field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateTimeFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateTimeFieldMessage.java index d224c4e50dd..42e35971a61 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateTimeFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a {@link java.time.LocalDateTime} field as represented by the SQL engine. 
*/ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.DATETIME_FIELD_MESSAGE) +public interface DateTimeFieldMessage extends SingleFieldMessage<Long> { + @Override + Long field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DecimalFieldMessage.java similarity index 57% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DecimalFieldMessage.java index d224c4e50dd..141df674d6a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DecimalFieldMessage.java @@ -15,25 +15,20 @@ * limitations under the License. 
*/ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; +import java.math.BigDecimal; import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single {@link BigDecimal} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. 
*/ +@Transferable(SqlQueryMessageGroup.DECIMAL_FIELD_MESSAGE) +public interface DecimalFieldMessage extends SingleFieldMessage<BigDecimal> { + @Override + // TODO replace with byte[] @Marshallable - @Nullable SharedState sharedState(); + BigDecimal field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DoubleFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DoubleFieldMessage.java index d224c4e50dd..5436d1e1b0a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DoubleFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single {@link Double} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. 
*/ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.DOUBLE_FIELD_MESSAGE) +public interface DoubleFieldMessage extends SingleFieldMessage<Double> { + @Override + Double field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/FloatFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/FloatFieldMessage.java index d224c4e50dd..d5c000e9572 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/FloatFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single {@link Float} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. 
*/ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.FLOAT_FIELD_MESSAGE) +public interface FloatFieldMessage extends SingleFieldMessage<Float> { + @Override + Float field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/IntFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/IntFieldMessage.java index d224c4e50dd..bba8f7c712f 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/IntFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single {@link Integer} field. 
*/ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.INT_FIELD_MESSAGE) +public interface IntFieldMessage extends SingleFieldMessage<Integer> { + @Override + Integer field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/LongFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/LongFieldMessage.java index d224c4e50dd..892b8ccc319 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/LongFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. 
+ * A message that contains a single {@link Long} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.LONG_FIELD_MESSAGE) +public interface LongFieldMessage extends SingleFieldMessage<Long> { + @Override + Long field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ShortFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ShortFieldMessage.java index d224c4e50dd..def4abf41d9 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ShortFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. 
*/ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single {@link Short} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.SHORT_FIELD_MESSAGE) +public interface ShortFieldMessage extends SingleFieldMessage<Short> { + @Override + Short field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/SingleFieldMessage.java similarity index 51% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/SingleFieldMessage.java index d224c4e50dd..6db5eb283d2 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/SingleFieldMessage.java @@ -15,25 +15,15 @@ * 
limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; -import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; +import java.io.Serializable; +import org.apache.ignite.internal.network.NetworkMessage; import org.jetbrains.annotations.Nullable; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. 
*/ - @Marshallable - @Nullable SharedState sharedState(); +public interface SingleFieldMessage<T> extends NetworkMessage, Serializable { + @Nullable T field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/StringFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/StringFieldMessage.java index d224c4e50dd..d3c20c2961a 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/StringFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a single {@link String} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. 
*/ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.STRING_FIELD_MESSAGE) +public interface StringFieldMessage extends SingleFieldMessage<String> { + @Override + String field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimeFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimeFieldMessage.java index d224c4e50dd..a619a3471a1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimeFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a {@link java.time.LocalTime} field as represented by the SQL engine. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. 
*/ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.TIME_FIELD_MESSAGE) +public interface TimeFieldMessage extends SingleFieldMessage<Integer> { + @Override + Integer field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimestampFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimestampFieldMessage.java index d224c4e50dd..8b1c088e29c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimestampFieldMessage.java @@ -15,25 +15,16 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. + * A message that contains a {@link java.time.Instant} field as represented by the SQL engine. 
*/ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.TIMESTAMP_FIELD_MESSAGE) +public interface TimestampFieldMessage extends SingleFieldMessage<Long> { + @Override + Long field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/UuidFieldMessage.java similarity index 52% copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java copy to modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/UuidFieldMessage.java index d224c4e50dd..e062d859e9c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/UuidFieldMessage.java @@ -15,25 +15,17 @@ * limitations under the License. */ -package org.apache.ignite.internal.sql.engine.message; +package org.apache.ignite.internal.sql.engine.message.field; -import org.apache.ignite.internal.network.annotations.Marshallable; +import java.util.UUID; import org.apache.ignite.internal.network.annotations.Transferable; -import org.apache.ignite.internal.sql.engine.exec.SharedState; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup; /** - * A message to notify remote fragment (aka remote source) that more batches required to fulfil the result. 
+ * A message that contains a single {@link UUID} field. */ -@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST) -public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage { - /** Returns an identifier of the exchange to request batches from. */ - long exchangeId(); - - /** Returns amount of batches to request. */ - int amountOfBatches(); - - /** Returns a state that has should be propagated to the target fragment. */ - @Marshallable - @Nullable SharedState sharedState(); +@Transferable(SqlQueryMessageGroup.UUID_FIELD_MESSAGE) +public interface UuidFieldMessage extends SingleFieldMessage<UUID> { + @Override + UUID field(); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java index 4a7246eba39..32591095333 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java @@ -30,11 +30,13 @@ import org.apache.calcite.rel.RelCollation; import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelInput; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableBitSet; import org.apache.calcite.util.Pair; import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost; import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory; @@ -51,6 +53,8 @@ import org.apache.ignite.internal.sql.engine.util.Commons; public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin { private static final String REL_TYPE_NAME = "CorrelatedNestedLoopJoin"; + private final ImmutableBitSet requiredColumns; + /** * Creates a Join. * @@ -64,8 +68,10 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin { * tree */ public IgniteCorrelatedNestedLoopJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right, - RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) { + RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType, ImmutableBitSet requiredColumns) { super(cluster, traitSet, left, right, condition, variablesSet, joinType); + + this.requiredColumns = requiredColumns; } /** @@ -79,14 +85,15 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin { input.getInputs().get(1), input.getExpression("condition"), Set.copyOf(Commons.transform(input.getIntegerList("variablesSet"), CorrelationId::new)), - input.getEnum("joinType", JoinRelType.class)); + input.getEnum("joinType", JoinRelType.class), + input.getBitSet("requiredColumns")); } /** {@inheritDoc} */ @Override public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { - return new IgniteCorrelatedNestedLoopJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType); + return new IgniteCorrelatedNestedLoopJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType, requiredColumns); } /** {@inheritDoc} */ @@ -148,6 +155,20 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin { List.of(left.replace(RelCollations.EMPTY), right)); } + @Override public RelWriter explainTerms(RelWriter pw) { + // TODO use super.explainTerms(...) 
+ return super.explainTerms(pw) + .item("condition", condition) + .item("joinType", joinType.lowerName) + // TODO do not write if empty + .item("requiredColumns", requiredColumns) + .itemIf("variablesSet", variablesSet, !variablesSet.isEmpty()) + .itemIf( + "systemFields", + getSystemFieldList(), + !getSystemFieldList().isEmpty()); + } + /** {@inheritDoc} */ @Override public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { @@ -175,7 +196,11 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin { @Override public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) { return new IgniteCorrelatedNestedLoopJoin(cluster, getTraitSet(), inputs.get(0), inputs.get(1), getCondition(), - getVariablesSet(), getJoinType()); + getVariablesSet(), getJoinType(), getRequiredColumns()); + } + + public ImmutableBitSet getRequiredColumns() { + return requiredColumns; } /** {@inheritDoc} */ diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java index 004185cab65..345926905d1 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java @@ -73,7 +73,8 @@ public class CorrelateToNestedLoopRule extends AbstractIgniteConverterRule<Logic right, cluster.getRexBuilder().makeLiteral(true), correlationIds, - rel.getJoinType() + rel.getJoinType(), + rel.getRequiredColumns() ); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java index af9519ad1c9..573f7f08065 100644 --- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java @@ -39,6 +39,7 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexShuttle; import org.apache.calcite.tools.RelBuilder; +import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.internal.sql.engine.rel.IgniteConvention; import org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin; @@ -132,7 +133,8 @@ public class CorrelatedNestedLoopJoinRule extends AbstractIgniteConverterRule<Lo right, rel.getCondition(), correlationIds, - joinType + joinType, + ImmutableBitSet.of() ); } diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java index b85dffbdb2a..8b1354f3bc7 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java @@ -50,8 +50,8 @@ public enum IgniteMethod { /** See {@link SqlEvaluationContext#rowAccessor()}. */ CONTEXT_ROW_HANDLER(SqlEvaluationContext.class, "rowAccessor"), - /** See {@link SqlEvaluationContext#correlatedVariable(int)}. */ - CONTEXT_GET_CORRELATED_VALUE(SqlEvaluationContext.class, "correlatedVariable", int.class), + /** See {@link SqlEvaluationContext#correlatedVariable(int, int)}. */ + CONTEXT_GET_CORRELATED_VALUE(SqlEvaluationContext.class, "correlatedVariable", int.class, int.class), /** See {@link IgniteSqlDateTimeUtils#subtractTimeZoneOffset(long, TimeZone)}. 
**/ SUBTRACT_TIMEZONE_OFFSET(IgniteSqlDateTimeUtils.class, "subtractTimeZoneOffset", long.class, TimeZone.class), diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java index bae4ea2971a..e9019087d79 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java @@ -35,6 +35,7 @@ import java.util.stream.StreamSupport; import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.ImmutableBitSet; import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory; import org.apache.ignite.internal.sql.engine.exec.ExecutionContext; import org.apache.ignite.internal.sql.engine.exec.RowHandler; @@ -44,6 +45,7 @@ import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory; import org.apache.ignite.internal.sql.engine.util.Commons; import org.apache.ignite.internal.sql.engine.util.TypeUtils; import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.internal.type.StructNativeType; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -75,6 +77,11 @@ public class CorrelatedNestedLoopJoinExecutionTest extends AbstractExecutionTest IgniteTypeFactory tf = ctx.getTypeFactory(); RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING)); + StructNativeType leftSchema = NativeTypes.structBuilder() + .addField("C0", NativeTypes.INT32, true) + .addField("C1", 
NativeTypes.STRING, true) + .addField("C2", NativeTypes.INT32, true) + .build(); CorrelatedNestedLoopJoinNode<Object[]> join = new CorrelatedNestedLoopJoinNode<>( ctx, @@ -82,7 +89,10 @@ public class CorrelatedNestedLoopJoinExecutionTest extends AbstractExecutionTest Set.of(new CorrelationId(0)), joinType, ctx.rowFactoryFactory().create(convertStructuredType(rightType)), - identityProjection() + identityProjection(), + // TODO + leftSchema, + ImmutableBitSet.of() ); join.register(Arrays.asList(left, right)); @@ -229,6 +239,11 @@ public class CorrelatedNestedLoopJoinExecutionTest extends AbstractExecutionTest JoinRelType joinType) { IgniteTypeFactory tf = ctx.getTypeFactory(); RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING)); + StructNativeType leftSchema = NativeTypes.structBuilder() + .addField("C0", NativeTypes.INT32, true) + .addField("C1", NativeTypes.STRING, true) + .addField("C2", NativeTypes.INT32, true) + .build(); RowHandler<Object[]> hnd = ctx.rowAccessor(); @@ -238,7 +253,9 @@ public class CorrelatedNestedLoopJoinExecutionTest extends AbstractExecutionTest Set.of(new CorrelationId(0)), joinType, ctx.rowFactoryFactory().create(convertStructuredType(rightType)), - identityProjection() + identityProjection(), + leftSchema, + ImmutableBitSet.of() ); }
