This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch ignite-24995
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 3e3355a66754d0399c909d5f069c8cf5ca3e9de5
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu Jan 22 19:29:18 2026 +0300

    IGNITE-24995 Optimize correlates serialization when propagating to another node
---
 .../internal/sql/engine/ItCorrelatesTest.java      |  11 +-
 .../sql/engine/exec/ExchangeServiceImpl.java       |   9 +-
 .../internal/sql/engine/exec/ExecutionContext.java |  13 +-
 .../sql/engine/exec/LogicalRelImplementor.java     |   3 +-
 .../internal/sql/engine/exec/SharedState.java      |  57 +++++--
 .../engine/exec/SharedStateMessageConverter.java   | 165 +++++++++++++++++++++
 .../sql/engine/exec/SqlEvaluationContext.java      |   2 +-
 ...atesBuilder.java => CorrelatedFieldGetter.java} |  57 +++----
 .../sql/engine/exec/exp/CorrelatesBuilder.java     |  12 +-
 .../exec/rel/CorrelatedNestedLoopJoinNode.java     |  25 +++-
 .../expressions/SqlExpressionFactoryAdapter.java   |   2 +-
 .../sql/engine/externalize/RelJsonReader.java      |   8 +-
 .../engine/message/QueryBatchRequestMessage.java   |   5 +-
 ...RequestMessage.java => SharedStateMessage.java} |  22 +--
 .../sql/engine/message/SqlQueryMessageGroup.java   |  36 +++++
 .../BooleanFieldMessage.java}                      |  23 +--
 .../ByteFieldMessage.java}                         |  23 +--
 .../DateFieldMessage.java}                         |  23 +--
 .../DateTimeFieldMessage.java}                     |  23 +--
 .../DecimalFieldMessage.java}                      |  23 ++-
 .../DoubleFieldMessage.java}                       |  23 +--
 .../FloatFieldMessage.java}                        |  23 +--
 .../IntFieldMessage.java}                          |  23 +--
 .../LongFieldMessage.java}                         |  23 +--
 .../ShortFieldMessage.java}                        |  23 +--
 .../SingleFieldMessage.java}                       |  22 +--
 .../StringFieldMessage.java}                       |  23 +--
 .../TimeFieldMessage.java}                         |  23 +--
 .../TimestampFieldMessage.java}                    |  23 +--
 .../UuidFieldMessage.java}                         |  24 +--
 .../engine/rel/IgniteCorrelatedNestedLoopJoin.java |  33 ++++-
 .../sql/engine/rule/CorrelateToNestedLoopRule.java |   3 +-
 .../engine/rule/CorrelatedNestedLoopJoinRule.java  |   4 +-
 .../internal/sql/engine/util/IgniteMethod.java     |   4 +-
 .../rel/CorrelatedNestedLoopJoinExecutionTest.java |  21 ++-
 35 files changed, 492 insertions(+), 345 deletions(-)

diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
index 64d68ec529e..9cec7b96b35 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItCorrelatesTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine;
 
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsSubPlan;
 
+import java.time.LocalTime;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -112,17 +113,17 @@ public class ItCorrelatesTest extends 
BaseSqlIntegrationTest {
      */
     @Test
     public void testCorrelatesCollisionRight() {
-        sql("CREATE TABLE test1 (a INTEGER PRIMARY KEY, b INTEGER)");
-        sql("CREATE TABLE test2 (a INTEGER PRIMARY KEY, c INTEGER)");
+        sql("CREATE TABLE test1 (a TIME PRIMARY KEY, b INTEGER)");
+        sql("CREATE TABLE test2 (a TIME PRIMARY KEY, c INTEGER)");
 
-        sql("INSERT INTO test1 VALUES (11, 1), (12, 2), (13, 3)");
-        sql("INSERT INTO test2 VALUES (11, 1), (12, 1), (13, 4)");
+        sql("INSERT INTO test1 VALUES (TIME '11:00:00', 1), (TIME '12:00:00', 
2), (TIME '13:00:00', 3)");
+        sql("INSERT INTO test2 VALUES (TIME '11:00:00', 1), (TIME '12:00:00', 
1), (TIME '13:00:00', 4)");
 
         // Collision by correlate variables in both, left and right hands.
         assertQuery("SELECT /*+ disable_decorrelation */ * FROM test1 WHERE "
                 + "EXISTS(SELECT * FROM test2 WHERE (SELECT test1.a)=test2.a 
AND (SELECT test1.b)<>test2.c) "
                 + "AND NOT EXISTS(SELECT * FROM test2 WHERE (SELECT 
test1.a)=test2.a AND (SELECT test1.b)<test2.c)")
-                .returns(12, 2)
+                .returns(LocalTime.of(12, 0, 0), 2)
                 .check();
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index 674b8e37f16..2dc7f7fc16d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -112,7 +113,7 @@ public class ExchangeServiceImpl implements ExchangeService 
{
                         .fragmentId(fragmentId)
                         .exchangeId(exchangeId)
                         .amountOfBatches(amountOfBatches)
-                        .sharedState(state)
+                        
.sharedStateMessage(SharedStateMessageConverter.toMessage(state))
                         .build()
         );
     }
@@ -153,8 +154,10 @@ public class ExchangeServiceImpl implements 
ExchangeService {
 
         Consumer<Outbox<?>> onRequestHandler = outbox -> {
             try {
-                SharedState state = msg.sharedState();
-                if (state != null) {
+                SharedStateMessage sharedStateMessage = 
msg.sharedStateMessage();
+                if (sharedStateMessage != null) {
+                    SharedState state = 
SharedStateMessageConverter.fromMessage(sharedStateMessage);
+
                     outbox.onRewindRequest(node.name(), state, 
msg.amountOfBatches());
                 } else {
                     outbox.onRequest(node.name(), msg.amountOfBatches());
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 5716c1fbaa4..603f74b57ee 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteCheckedException;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.ColumnType;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -359,18 +360,20 @@ public class ExecutionContext<RowT> implements 
SqlEvaluationContext<RowT> {
     }
 
     @Override
-    public RowT correlatedVariable(int id) {
-        return cast(sharedState.correlatedVariable(id));
+    public RowT correlatedVariable(int id, int idx) {
+        return cast(sharedState.correlatedVariable(id, idx));
     }
 
     /**
      * Sets correlated value.
      *
-     * @param id Correlation ID.
+     * @param corrId Correlation ID.
+     * @param fieldIdx Field index.
+     * @param type Value type.
      * @param value Correlated value.
      */
-    public void correlatedVariable(Object value, int id) {
-        sharedState.correlatedVariable(id, value);
+    public void correlatedVariable(int corrId, int fieldIdx, ColumnType type, 
Object value) {
+        sharedState.correlatedVariable(corrId, fieldIdx, type, value);
     }
 
     /**
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index dfd117016ca..3d5144074ba 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -370,7 +370,8 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
         RowFactory<RowT> rightRowFactory = 
ctx.rowFactoryFactory().create(convertStructuredType(rightType));
 
         Node<RowT> node = new CorrelatedNestedLoopJoinNode<>(ctx, cond, 
rel.getVariablesSet(),
-                rel.getJoinType(), rightRowFactory, joinProjection);
+                rel.getJoinType(), rightRowFactory, joinProjection, 
convertStructuredType(leftType),
+                rel.getRequiredColumns());
 
         Node<RowT> leftInput = visit(rel.getLeft());
         Node<RowT> rightInput = visit(rel.getRight());
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
index ccb3dbd8ff7..373066afac0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
@@ -17,41 +17,68 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.ignite.sql.ColumnType;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * This class represents the volatile state that may be propagated from parent 
to its children
  * during rewind.
  */
-public class SharedState implements Serializable {
-    private static final long serialVersionUID = 42L;
+public class SharedState {
+    private final Long2ObjectMap<ValueWithType> correlations;
+
+    public SharedState() {
+        this(new Long2ObjectOpenHashMap<>());
+    }
 
-    private Object[] correlations = new Object[16];
+    SharedState(Long2ObjectMap<ValueWithType> correlations) {
+        this.correlations = correlations;
+    }
 
     /**
      * Gets correlated value.
      *
-     * @param id Correlation ID.
+     * @param corrId Correlation ID.
      * @return Correlated value.
      */
-    public Object correlatedVariable(int id) {
-        checkRange(correlations, id);
+    public Object correlatedVariable(int corrId, int fieldIdx) {
+        long key = packToLong(corrId, fieldIdx);
 
-        return correlations[id];
+        return correlations.get(key).value;
     }
 
     /**
      * Sets correlated value.
      *
-     * @param id Correlation ID.
+     * @param corrId Correlation ID.
+     * @param fieldIdx Field index.
+     * @param type Value type.
      * @param value Correlated value.
      */
-    public void correlatedVariable(int id, Object value) {
-        correlations = Commons.ensureCapacity(correlations, id + 1);
+    public void correlatedVariable(int corrId, int fieldIdx, ColumnType type, 
Object value) {
+        long key = packToLong(corrId, fieldIdx);
+
+        correlations.put(key, new ValueWithType(type, value));
+    }
+
+    Long2ObjectMap<ValueWithType> correlations() {
+        return Long2ObjectMaps.unmodifiable(correlations);
+    }
+
+    private static long packToLong(int corrId, int fieldIdx) {
+        return ((((long) corrId) << 32 | fieldIdx));
+    }
+
+    static class ValueWithType {
+        final @Nullable ColumnType type;
+        final Object value;
 
-        correlations[id] = value;
+        ValueWithType(@Nullable ColumnType type, Object value) {
+            this.type = type;
+            this.value = value;
+        }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
new file mode 100644
index 00000000000..781ef9d4760
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.sql.engine.exec.SharedState.ValueWithType;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.sql.engine.message.field.SingleFieldMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.sql.ColumnType;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converter between {@link SharedState} and {@link SharedStateMessage}.
+ */
+public class SharedStateMessageConverter {
+    /** Message factory. */
+    private static final SqlQueryMessagesFactory MESSAGE_FACTORY = new 
SqlQueryMessagesFactory();
+
+    static @Nullable SharedStateMessage toMessage(@Nullable SharedState state) 
{
+        if (state == null) {
+            return null;
+        }
+
+        Long2ObjectMap<ValueWithType> correlations = state.correlations();
+        Map<Long, NetworkMessage> result = 
IgniteUtils.newHashMap(correlations.size());
+
+        for (Long2ObjectMap.Entry<ValueWithType> entry : 
correlations.long2ObjectEntrySet()) {
+            ValueWithType valueWithType = entry.getValue();
+            ColumnType type = valueWithType.type;
+
+            assert type != null;
+
+            SingleFieldMessage<?> msg = toSingleFieldMessage(type, 
valueWithType.value);
+
+            result.put(entry.getLongKey(), msg);
+        }
+
+        return MESSAGE_FACTORY.sharedStateMessage()
+                .sharedState(result)
+                .build();
+    }
+
+    static SharedState fromMessage(SharedStateMessage sharedStateMessage) {
+        int size = sharedStateMessage.sharedState().size();
+        Long2ObjectMap<ValueWithType> correlations = new 
Long2ObjectOpenHashMap<>(size);
+
+        for (Map.Entry<Long, NetworkMessage> e : 
sharedStateMessage.sharedState().entrySet()) {
+            SingleFieldMessage<Object> msg = ((SingleFieldMessage<Object>) 
e.getValue());
+
+            ColumnType type = columnTypeByMessage(msg);
+
+            correlations.put(e.getKey().longValue(), new ValueWithType(type, 
msg.field()));
+        }
+
+        return new SharedState(correlations);
+    }
+
+    private static ColumnType columnTypeByMessage(SingleFieldMessage<?> 
message) {
+        switch (message.messageType()) {
+            case SqlQueryMessageGroup.BOOLEAN_FIELD_MESSAGE:
+                return ColumnType.BOOLEAN;
+
+            case SqlQueryMessageGroup.BYTE_FIELD_MESSAGE:
+                return ColumnType.INT8;
+
+            case SqlQueryMessageGroup.SHORT_FIELD_MESSAGE:
+                return ColumnType.INT16;
+
+            case SqlQueryMessageGroup.INT_FIELD_MESSAGE:
+                return ColumnType.INT32;
+
+            case SqlQueryMessageGroup.LONG_FIELD_MESSAGE:
+                return ColumnType.INT64;
+
+            case SqlQueryMessageGroup.FLOAT_FIELD_MESSAGE:
+                return ColumnType.FLOAT;
+
+            case SqlQueryMessageGroup.DOUBLE_FIELD_MESSAGE:
+                return ColumnType.DOUBLE;
+
+            case SqlQueryMessageGroup.DECIMAL_FIELD_MESSAGE:
+                return ColumnType.DECIMAL;
+
+            case SqlQueryMessageGroup.DATE_FIELD_MESSAGE:
+                return ColumnType.DATE;
+
+            case SqlQueryMessageGroup.TIME_FIELD_MESSAGE:
+                return ColumnType.TIME;
+
+            case SqlQueryMessageGroup.DATETIME_FIELD_MESSAGE:
+                return ColumnType.DATETIME;
+
+            case SqlQueryMessageGroup.TIMESTAMP_FIELD_MESSAGE:
+                return ColumnType.TIMESTAMP;
+
+            case SqlQueryMessageGroup.UUID_FIELD_MESSAGE:
+                return ColumnType.UUID;
+
+            case SqlQueryMessageGroup.STRING_FIELD_MESSAGE:
+                return ColumnType.STRING;
+
+            default:
+                throw new IllegalArgumentException("Unsupported message type: 
" + message.messageType());
+        }
+    }
+
+    private static SingleFieldMessage<?> toSingleFieldMessage(ColumnType type, 
Object value) {
+        switch (type) {
+            case BOOLEAN:
+                return MESSAGE_FACTORY.booleanFieldMessage().field((Boolean) 
value).build();
+            case INT8:
+                return MESSAGE_FACTORY.byteFieldMessage().field((Byte) 
value).build();
+            case INT16:
+                return MESSAGE_FACTORY.shortFieldMessage().field((Short) 
value).build();
+            case INT32:
+                return MESSAGE_FACTORY.intFieldMessage().field((Integer) 
value).build();
+            case INT64:
+                return MESSAGE_FACTORY.longFieldMessage().field((Long) 
value).build();
+            case FLOAT:
+                return MESSAGE_FACTORY.floatFieldMessage().field((Float) 
value).build();
+            case DOUBLE:
+                return MESSAGE_FACTORY.doubleFieldMessage().field((Double) 
value).build();
+            case DECIMAL:
+                return 
MESSAGE_FACTORY.decimalFieldMessage().field((BigDecimal) value).build();
+            case DATE:
+                return MESSAGE_FACTORY.dateFieldMessage().field((Integer) 
value).build();
+            case TIME:
+                return MESSAGE_FACTORY.timeFieldMessage().field((Integer) 
value).build();
+            case DATETIME:
+                return MESSAGE_FACTORY.dateTimeFieldMessage().field((Long) 
value).build();
+            case TIMESTAMP:
+                return MESSAGE_FACTORY.timestampFieldMessage().field((Long) 
value).build();
+            case UUID:
+                return MESSAGE_FACTORY.uuidFieldMessage().field((UUID) 
value).build();
+            case STRING:
+                return MESSAGE_FACTORY.stringFieldMessage().field((String) 
value).build();
+
+            default:
+                throw new IllegalArgumentException("Unsupported type: " + 
type);
+        }
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
index bd7aa510c17..5ad8c063359 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
@@ -39,5 +39,5 @@ public interface SqlEvaluationContext<RowT> extends 
DataContext {
     RowFactoryFactory<RowT> rowFactoryFactory();
 
     /** Returns row representing correlation source by given correlation id. */
-    RowT correlatedVariable(int id);
+    RowT correlatedVariable(int id, int idx);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatedFieldGetter.java
similarity index 56%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatedFieldGetter.java
index 0db383a5e92..cb6f59236d2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatedFieldGetter.java
@@ -17,60 +17,45 @@
 
 package org.apache.ignite.internal.sql.engine.exec.exp;
 
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.calcite.linq4j.function.Function1;
+import java.lang.reflect.Type;
+import org.apache.calcite.adapter.enumerable.EnumUtils;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.linq4j.tree.Primitive;
 import org.apache.calcite.rex.RexCorrelVariable;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexShuttle;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
+import org.jetbrains.annotations.Nullable;
 
-class CorrelatesBuilder extends RexShuttle {
-    private final BlockBuilder builder;
-
+class CorrelatedFieldGetter implements InputGetter {
     private final Expression ctx;
+    private final RexCorrelVariable variable;
 
-    private final Expression hnd;
-
-    private Map<String, FieldGetter> correlates;
-
-    CorrelatesBuilder(BlockBuilder builder, Expression ctx, Expression hnd) {
-        this.builder = builder;
-        this.hnd = hnd;
+    CorrelatedFieldGetter(Expression ctx, RexCorrelVariable variable) {
         this.ctx = ctx;
-    }
-
-    public Function1<String, InputGetter> build(Iterable<RexNode> nodes) {
-        try {
-            for (RexNode node : nodes) {
-                if (node != null) {
-                    node.accept(this);
-                }
-            }
-
-            return correlates == null ? null : correlates::get;
-        } finally {
-            correlates = null;
-        }
+        this.variable = variable;
     }
 
     /** {@inheritDoc} */
     @Override
-    public RexNode visitCorrelVariable(RexCorrelVariable variable) {
-        Expression corr = builder.append("corr",
+    public Expression field(BlockBuilder builder, int index, @Nullable Type 
storageType) {
+        Expression expr = builder.append("corr",
                 Expressions.call(ctx, 
IgniteMethod.CONTEXT_GET_CORRELATED_VALUE.method(),
-                        Expressions.constant(variable.id.getId())));
+                        Expressions.constant(variable.id.getId()), 
Expressions.constant(index)));
+
+        // TODO Remove code duplication (see CommonFieldGetter).
+        Type fieldType = 
Commons.typeFactory().getJavaClass(variable.getType().getFieldList().get(index).getType());
 
-        if (correlates == null) {
-            correlates = new HashMap<>();
+        Primitive p = Primitive.of(fieldType);
+        if (p == null) {
+            // In case of non-primitive types we can simply do casting like 
this: (RequiredType) fieldValue.
+            return Expressions.convert_(expr, fieldType);
         }
 
-        correlates.put(variable.getName(), new FieldGetter(hnd, corr, 
variable.getType()));
+        expr = Expressions.convert_(expr, p.getBoxClass());
 
-        return variable;
+        return EnumUtils.convert(expr, p.getBoxClass(), fieldType);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
index 0db383a5e92..2efef537c1c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
@@ -22,12 +22,10 @@ import java.util.Map;
 import org.apache.calcite.linq4j.function.Function1;
 import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter;
-import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
 
 class CorrelatesBuilder extends RexShuttle {
     private final BlockBuilder builder;
@@ -36,7 +34,7 @@ class CorrelatesBuilder extends RexShuttle {
 
     private final Expression hnd;
 
-    private Map<String, FieldGetter> correlates;
+    private Map<String, InputGetter> correlates;
 
     CorrelatesBuilder(BlockBuilder builder, Expression ctx, Expression hnd) {
         this.builder = builder;
@@ -61,15 +59,13 @@ class CorrelatesBuilder extends RexShuttle {
     /** {@inheritDoc} */
     @Override
     public RexNode visitCorrelVariable(RexCorrelVariable variable) {
-        Expression corr = builder.append("corr",
-                Expressions.call(ctx, 
IgniteMethod.CONTEXT_GET_CORRELATED_VALUE.method(),
-                        Expressions.constant(variable.id.getId())));
-
         if (correlates == null) {
             correlates = new HashMap<>();
         }
 
-        correlates.put(variable.getName(), new FieldGetter(hnd, corr, 
variable.getType()));
+        InputGetter inputGetter = new CorrelatedFieldGetter(ctx, variable);
+
+        correlates.put(variable.getName(), inputGetter);
 
         return variable;
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
index ecc5bbabe81..b9ee5c37d56 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -28,10 +28,13 @@ import java.util.Set;
 import java.util.function.BiPredicate;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.exp.SqlJoinProjection;
+import org.apache.ignite.internal.type.StructNativeType;
+import org.apache.ignite.sql.ColumnType;
 
 /**
  * CorrelatedNestedLoopJoinNode.
@@ -54,6 +57,10 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
 
     private final RowT rightEmptyRow;
 
+    private final StructNativeType leftSchema;
+
+    private final ImmutableBitSet requiredColumns;
+
     private int requested;
 
     private int waitingLeft;
@@ -83,6 +90,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
      * @param joinType Join rel type.
      * @param rightRowFactory Right row factory.
      * @param joinProjection Output row factory.
+     * @param leftSchemaType Left type.
+     * @param requiredColumns TODO Required columns.
      */
     public CorrelatedNestedLoopJoinNode(
             ExecutionContext<RowT> ctx,
@@ -90,7 +99,9 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
             Set<CorrelationId> correlationIds,
             JoinRelType joinType,
             RowFactory<RowT> rightRowFactory,
-            SqlJoinProjection joinProjection
+            SqlJoinProjection joinProjection,
+            StructNativeType leftSchemaType,
+            ImmutableBitSet requiredColumns
     ) {
         super(ctx);
 
@@ -101,6 +112,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
         this.correlationIds = new ArrayList<>(correlationIds);
         this.joinType = joinType;
         this.joinProjection = joinProjection;
+        this.leftSchema = leftSchemaType;
+        this.requiredColumns = requiredColumns;
 
         leftInBufferSize = correlationIds.size();
         rightInBufferSize = inBufSize;
@@ -485,7 +498,15 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
     private void prepareCorrelations() {
         for (int i = 0; i < correlationIds.size(); i++) {
             RowT row = i < leftInBuf.size() ? leftInBuf.get(i) : 
first(leftInBuf);
-            context().correlatedVariable(row, correlationIds.get(i).getId());
+            int corrId = correlationIds.get(i).getId();
+
+            for (int idx = requiredColumns.nextSetBit(0); idx != -1;
+                    idx = requiredColumns.nextSetBit(idx + 1)) {
+                Object value = context().rowAccessor().get(idx, row);
+                ColumnType type = leftSchema.fields().get(idx).type().spec();
+
+                context().correlatedVariable(corrId, idx, type, value);
+            }
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
index be3376377d7..6884abfdb1b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
@@ -303,7 +303,7 @@ public class SqlExpressionFactoryAdapter implements 
ExpressionFactory {
         }
 
         @Override
-        public @Nullable RowT correlatedVariable(int id) {
+        public @Nullable RowT correlatedVariable(int id, int idx) {
             return null;
         }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index 458e7fdb0eb..171c8772ee7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -203,7 +203,13 @@ public class RelJsonReader {
         /** {@inheritDoc} */
         @Override
         public ImmutableBitSet getBitSet(String tag) {
-            return ImmutableBitSet.of(getIntegerList(tag));
+            var list = getIntegerList(tag);
+
+            if (list == null) {
+                return ImmutableBitSet.of();
+            }
+
+            return ImmutableBitSet.of(list);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
index d224c4e50dd..23d502daacf 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
@@ -17,9 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -34,6 +32,5 @@ public interface QueryBatchRequestMessage extends 
ExecutionContextAwareMessage {
     int amountOfBatches();
 
     /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+    @Nullable SharedStateMessage sharedStateMessage();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
similarity index 56%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
index d224c4e50dd..2e72efb962c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
@@ -17,23 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a map of correlations.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.SHARED_STATE_MESSAGE)
+public interface SharedStateMessage extends NetworkMessage, Serializable {
+    Map<Long, NetworkMessage> sharedState();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
index ea0d1f1509a..a9dc4484f8d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.sql.engine.message;
 import static 
org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup.GROUP_TYPE;
 
 import org.apache.ignite.internal.network.annotations.MessageGroup;
+import org.apache.ignite.internal.sql.engine.message.field.IntFieldMessage;
 
 /**
  * Message types for the sql query processing module.
@@ -46,4 +47,39 @@ public final class SqlQueryMessageGroup {
 
     /** See {@link CancelOperationResponse} for the details. */
     public static final short OPERATION_CANCEL_RESPONSE = 7;
+
+    /** See {@link SharedStateMessage} for the details. */
+    public static final short SHARED_STATE_MESSAGE = 8;
+
+    public static final short BOOLEAN_FIELD_MESSAGE = 9;
+
+    public static final short BYTE_FIELD_MESSAGE = 10;
+
+    public static final short SHORT_FIELD_MESSAGE = 11;
+
+    /** See {@link IntFieldMessage} for the details. */
+    public static final short INT_FIELD_MESSAGE = 12;
+
+    public static final short LONG_FIELD_MESSAGE = 13;
+
+    public static final short FLOAT_FIELD_MESSAGE = 14;
+
+    public static final short DOUBLE_FIELD_MESSAGE = 15;
+
+    public static final short DECIMAL_FIELD_MESSAGE = 16;
+
+    public static final short DATE_FIELD_MESSAGE = 17;
+
+    public static final short TIME_FIELD_MESSAGE = 18;
+
+    public static final short DATETIME_FIELD_MESSAGE = 19;
+
+    public static final short TIMESTAMP_FIELD_MESSAGE = 20;
+
+    public static final short UUID_FIELD_MESSAGE = 21;
+
+    public static final short STRING_FIELD_MESSAGE = 22;
+
+    // TODO Add a field message for byte arrays; note that 22 is already used by STRING_FIELD_MESSAGE.
+    // public static final short BYTE_ARRAY_FIELD_MESSAGE = 23;
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/BooleanFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/BooleanFieldMessage.java
index d224c4e50dd..8ba820c5635 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/BooleanFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Boolean} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.BOOLEAN_FIELD_MESSAGE)
+public interface BooleanFieldMessage extends SingleFieldMessage<Boolean> {
+    @Override
+    Boolean field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ByteFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ByteFieldMessage.java
index d224c4e50dd..fd98d6333c8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ByteFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Byte} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.BYTE_FIELD_MESSAGE)
+public interface ByteFieldMessage extends SingleFieldMessage<Byte> {
+    @Override
+    Byte field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateFieldMessage.java
index d224c4e50dd..c3e08bed528 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a {@link java.time.LocalDate} field as represented 
by the SQL engine.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.DATE_FIELD_MESSAGE)
+public interface DateFieldMessage extends SingleFieldMessage<Integer> {
+    @Override
+    Integer field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateTimeFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateTimeFieldMessage.java
index d224c4e50dd..42e35971a61 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DateTimeFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a {@link java.time.LocalDateTime} field as 
represented by the SQL engine.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.DATETIME_FIELD_MESSAGE)
+public interface DateTimeFieldMessage extends SingleFieldMessage<Long> {
+    @Override
+    Long field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DecimalFieldMessage.java
similarity index 57%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DecimalFieldMessage.java
index d224c4e50dd..141df674d6a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DecimalFieldMessage.java
@@ -15,25 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
+import java.math.BigDecimal;
 import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link BigDecimal} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
+@Transferable(SqlQueryMessageGroup.DECIMAL_FIELD_MESSAGE)
+public interface DecimalFieldMessage extends SingleFieldMessage<BigDecimal> {
+    @Override
+    // TODO replace with byte[]
     @Marshallable
-    @Nullable SharedState sharedState();
+    BigDecimal field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DoubleFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DoubleFieldMessage.java
index d224c4e50dd..5436d1e1b0a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/DoubleFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Double} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.DOUBLE_FIELD_MESSAGE)
+public interface DoubleFieldMessage extends SingleFieldMessage<Double> {
+    @Override
+    Double field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/FloatFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/FloatFieldMessage.java
index d224c4e50dd..d5c000e9572 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/FloatFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Float} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.FLOAT_FIELD_MESSAGE)
+public interface FloatFieldMessage extends SingleFieldMessage<Float> {
+    @Override
+    Float field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/IntFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/IntFieldMessage.java
index d224c4e50dd..bba8f7c712f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/IntFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Integer} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.INT_FIELD_MESSAGE)
+public interface IntFieldMessage extends SingleFieldMessage<Integer> {
+    @Override
+    Integer field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/LongFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/LongFieldMessage.java
index d224c4e50dd..892b8ccc319 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/LongFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Long} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.LONG_FIELD_MESSAGE)
+public interface LongFieldMessage extends SingleFieldMessage<Long> {
+    @Override
+    Long field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ShortFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ShortFieldMessage.java
index d224c4e50dd..def4abf41d9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/ShortFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Short} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.SHORT_FIELD_MESSAGE)
+public interface ShortFieldMessage extends SingleFieldMessage<Short> {
+    @Override
+    Short field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/SingleFieldMessage.java
similarity index 51%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/SingleFieldMessage.java
index d224c4e50dd..6db5eb283d2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/SingleFieldMessage.java
@@ -15,25 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
+import java.io.Serializable;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+public interface SingleFieldMessage<T> extends NetworkMessage, Serializable {
+    @Nullable T field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/StringFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/StringFieldMessage.java
index d224c4e50dd..d3c20c2961a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/StringFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link String} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.STRING_FIELD_MESSAGE)
+public interface StringFieldMessage extends SingleFieldMessage<String> {
+    @Override
+    String field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimeFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimeFieldMessage.java
index d224c4e50dd..a619a3471a1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimeFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a {@link java.time.LocalTime} field as represented by the SQL engine.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.TIME_FIELD_MESSAGE)
+public interface TimeFieldMessage extends SingleFieldMessage<Integer> {
+    @Override
+    Integer field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimestampFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimestampFieldMessage.java
index d224c4e50dd..8b1c088e29c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/TimestampFieldMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a {@link java.time.Instant} field as represented by the SQL engine.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.TIMESTAMP_FIELD_MESSAGE)
+public interface TimestampFieldMessage extends SingleFieldMessage<Long> {
+    @Override
+    Long field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/UuidFieldMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/UuidFieldMessage.java
index d224c4e50dd..e062d859e9c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/field/UuidFieldMessage.java
@@ -15,25 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.sql.engine.message.field;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import java.util.UUID;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link UUID} field.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.UUID_FIELD_MESSAGE)
+public interface UuidFieldMessage extends SingleFieldMessage<UUID> {
+    @Override
+    UUID field();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
index 4a7246eba39..32591095333 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -30,11 +30,13 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
@@ -51,6 +53,8 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
 public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
     private static final String REL_TYPE_NAME = "CorrelatedNestedLoopJoin";
 
+    private final ImmutableBitSet requiredColumns;
+
     /**
      * Creates a Join.
      *
@@ -64,8 +68,10 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin {
      *                     tree
      */
     public IgniteCorrelatedNestedLoopJoin(RelOptCluster cluster, RelTraitSet traitSet, RelNode left, RelNode right,
-            RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType) {
+            RexNode condition, Set<CorrelationId> variablesSet, JoinRelType joinType, ImmutableBitSet requiredColumns) {
         super(cluster, traitSet, left, right, condition, variablesSet, joinType);
+
+        this.requiredColumns = requiredColumns;
     }
 
     /**
@@ -79,14 +85,15 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin {
                 input.getInputs().get(1),
                 input.getExpression("condition"),
                 Set.copyOf(Commons.transform(input.getIntegerList("variablesSet"), CorrelationId::new)),
-                input.getEnum("joinType", JoinRelType.class));
+                input.getEnum("joinType", JoinRelType.class),
+                input.getBitSet("requiredColumns"));
     }
 
     /** {@inheritDoc} */
     @Override
     public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType,
             boolean semiJoinDone) {
-        return new IgniteCorrelatedNestedLoopJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType);
+        return new IgniteCorrelatedNestedLoopJoin(getCluster(), traitSet, left, right, condition, variablesSet, joinType, requiredColumns);
     }
 
     /** {@inheritDoc} */
@@ -148,6 +155,20 @@ public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
                 List.of(left.replace(RelCollations.EMPTY), right));
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public RelWriter explainTerms(RelWriter pw) {
+        // Calcite's Join#explainTerms, reached through super, already emits
+        // condition, joinType, variablesSet and systemFields. Repeating them
+        // here would list every one of those terms twice, so only the term
+        // that is specific to this node is appended.
+        //
+        // TODO do not write if empty (the RelInput constructor currently reads
+        //  "requiredColumns" unconditionally, so it has to be present for now).
+        return super.explainTerms(pw)
+                .item("requiredColumns", requiredColumns);
+    }
+
     /** {@inheritDoc} */
     @Override
     public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
@@ -175,7 +196,11 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin {
     @Override
     public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteCorrelatedNestedLoopJoin(cluster, getTraitSet(), inputs.get(0), inputs.get(1), getCondition(),
-                getVariablesSet(), getJoinType());
+                getVariablesSet(), getJoinType(), getRequiredColumns());
+    }
+
+    public ImmutableBitSet getRequiredColumns() {
+        return requiredColumns;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
index 004185cab65..345926905d1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
@@ -73,7 +73,8 @@ public class CorrelateToNestedLoopRule extends 
AbstractIgniteConverterRule<Logic
                 right,
                 cluster.getRexBuilder().makeLiteral(true),
                 correlationIds,
-                rel.getJoinType()
+                rel.getJoinType(),
+                rel.getRequiredColumns()
         );
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
index af9519ad1c9..573f7f08065 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
@@ -39,6 +39,7 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
 import org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 
@@ -132,7 +133,8 @@ public class CorrelatedNestedLoopJoinRule extends 
AbstractIgniteConverterRule<Lo
                 right,
                 rel.getCondition(),
                 correlationIds,
-                joinType
+                joinType,
+                ImmutableBitSet.of()
         );
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
index b85dffbdb2a..8b1354f3bc7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
@@ -50,8 +50,8 @@ public enum IgniteMethod {
     /** See {@link SqlEvaluationContext#rowAccessor()}. */
     CONTEXT_ROW_HANDLER(SqlEvaluationContext.class, "rowAccessor"),
 
-    /** See {@link SqlEvaluationContext#correlatedVariable(int)}. */
-    CONTEXT_GET_CORRELATED_VALUE(SqlEvaluationContext.class, "correlatedVariable", int.class),
+    /** See {@link SqlEvaluationContext#correlatedVariable(int, int)}. */
+    CONTEXT_GET_CORRELATED_VALUE(SqlEvaluationContext.class, "correlatedVariable", int.class, int.class),
 
     /** See {@link IgniteSqlDateTimeUtils#subtractTimeZoneOffset(long, 
TimeZone)}. **/
     SUBTRACT_TIMEZONE_OFFSET(IgniteSqlDateTimeUtils.class, 
"subtractTimeZoneOffset", long.class, TimeZone.class),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
index bae4ea2971a..e9019087d79 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
@@ -35,6 +35,7 @@ import java.util.stream.StreamSupport;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
@@ -44,6 +45,7 @@ import 
org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.type.StructNativeType;
 import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -75,6 +77,11 @@ public class CorrelatedNestedLoopJoinExecutionTest extends 
AbstractExecutionTest
 
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rightType = TypeUtils.createRowType(tf, TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
+        StructNativeType leftSchema = NativeTypes.structBuilder()
+                .addField("C0", NativeTypes.INT32, true)
+                .addField("C1", NativeTypes.STRING, true)
+                .addField("C2", NativeTypes.INT32, true)
+                .build();
 
         CorrelatedNestedLoopJoinNode<Object[]> join = new CorrelatedNestedLoopJoinNode<>(
                 ctx,
@@ -82,7 +89,10 @@ public class CorrelatedNestedLoopJoinExecutionTest extends 
AbstractExecutionTest
                 Set.of(new CorrelationId(0)),
                 joinType,
                 
ctx.rowFactoryFactory().create(convertStructuredType(rightType)),
-                identityProjection()
+                identityProjection(),
+                // TODO
+                leftSchema,
+                ImmutableBitSet.of()
         );
 
         join.register(Arrays.asList(left, right));
@@ -229,6 +239,11 @@ public class CorrelatedNestedLoopJoinExecutionTest extends 
AbstractExecutionTest
             JoinRelType joinType) {
         IgniteTypeFactory tf = ctx.getTypeFactory();
         RelDataType rightType = TypeUtils.createRowType(tf, 
TypeUtils.native2relationalTypes(tf, NativeTypes.INT32, NativeTypes.STRING));
+        StructNativeType leftSchema = NativeTypes.structBuilder()
+                .addField("C0", NativeTypes.INT32, true)
+                .addField("C1", NativeTypes.STRING, true)
+                .addField("C2", NativeTypes.INT32, true)
+                .build();
 
         RowHandler<Object[]> hnd = ctx.rowAccessor();
 
@@ -238,7 +253,9 @@ public class CorrelatedNestedLoopJoinExecutionTest extends 
AbstractExecutionTest
                 Set.of(new CorrelationId(0)),
                 joinType,
                 
ctx.rowFactoryFactory().create(convertStructuredType(rightType)),
-                identityProjection()
+                identityProjection(),
+                leftSchema,
+                ImmutableBitSet.of()
         );
     }
 

Reply via email to