This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 01f149382d7 IGNITE-24995 Optimize correaltes serialization when 
propagating to another node (#7471)
01f149382d7 is described below

commit 01f149382d71af5f5c48408fe0ec8a3e89bd6522
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu Feb 5 15:05:49 2026 +0300

    IGNITE-24995 Optimize correaltes serialization when propagating to another 
node (#7471)
---
 .../internal/network/NetworkMessageTypes.java      |  71 ++++++++
 .../message/value/BooleanValueMessage.java}        |  23 +--
 .../message/value/ByteArrayValueMessage.java}      |  23 +--
 .../network/message/value/ByteValueMessage.java}   |  23 +--
 .../network/message/value/DoubleValueMessage.java} |  23 +--
 .../network/message/value/FloatValueMessage.java}  |  23 +--
 .../network/message/value/IntValueMessage.java}    |  23 +--
 .../network/message/value/LongValueMessage.java}   |  23 +--
 .../network/message/value/NullValueMessage.java}   |  22 +--
 .../network/message/value/ShortValueMessage.java}  |  23 +--
 .../network/message/value/SingleValueMessage.java} |  21 +--
 .../network/message/value/StringValueMessage.java} |  23 +--
 .../network/message/value/UuidValueMessage.java}   |  24 +--
 .../internal/sql/engine/ItSqlOperatorsTest.java    |   2 +-
 .../integrationTest/sql/group1/explain/join.test   |   1 +
 .../integrationTest/sql/group1/explain/scan.test   |   2 +
 .../sql/engine/exec/ExchangeServiceImpl.java       |   7 +-
 .../internal/sql/engine/exec/ExecutionContext.java |   9 +-
 .../sql/engine/exec/LogicalRelImplementor.java     |   2 +-
 .../internal/sql/engine/exec/SharedState.java      |  42 +++--
 .../engine/exec/SharedStateMessageConverter.java   | 188 +++++++++++++++++++++
 .../sql/engine/exec/SqlEvaluationContext.java      |   5 +-
 .../sql/engine/exec/exp/CorrelatesBuilder.java     |  21 +--
 .../engine/exec/exp/CorrelationValueGetter.java    |  45 +++++
 .../engine/exec/exp/JoinPredicateImplementor.java  |   2 +-
 .../engine/exec/exp/JoinProjectionImplementor.java |   2 +-
 .../sql/engine/exec/exp/PredicateImplementor.java  |   2 +-
 .../sql/engine/exec/exp/ProjectionImplementor.java |   2 +-
 .../engine/exec/exp/RowProviderImplementor.java    |   4 +-
 .../sql/engine/exec/exp/ScalarImplementor.java     |   5 +-
 .../exec/rel/CorrelatedNestedLoopJoinNode.java     |  21 ++-
 .../expressions/SqlExpressionFactoryAdapter.java   |   4 +-
 .../sql/engine/externalize/RelJsonReader.java      |   8 +-
 ...equestMessage.java => DecimalValueMessage.java} |  22 +--
 .../engine/message/QueryBatchRequestMessage.java   |   7 +-
 ...RequestMessage.java => SharedStateMessage.java} |  21 +--
 .../sql/engine/message/SqlQueryMessageGroup.java   |   6 +
 .../engine/rel/IgniteCorrelatedNestedLoopJoin.java |  47 ++++--
 .../sql/engine/rel/explain/IgniteRelWriter.java    |   9 +
 .../engine/rel/explain/RelTreeToTextWriter.java    |   8 +
 .../sql/engine/rule/CorrelateToNestedLoopRule.java |   1 +
 .../engine/rule/CorrelatedNestedLoopJoinRule.java  |   7 +
 .../ignite/internal/sql/engine/util/Commons.java   |  11 ++
 .../internal/sql/engine/util/IgniteMethod.java     |   4 +-
 .../exec/SharedStateMessageConverterTest.java      | 141 ++++++++++++++++
 .../internal/sql/engine/exec/SharedStateTest.java  |  77 +++++++++
 .../rel/CorrelatedNestedLoopJoinExecutionTest.java |  78 ++++++++-
 .../CorrelatedNestedLoopJoinPlannerTest.java       |  35 +++-
 .../internal/sql/engine/util/CommonsTest.java      |  29 ++++
 .../src/test/resources/mapping/correlated.test     |   7 +
 .../resources/mapping/test_partition_pruning.test  |   2 +
 51 files changed, 929 insertions(+), 302 deletions(-)

diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index 5ef73fe44ba..fbbd5b4964a 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -25,6 +25,17 @@ import 
org.apache.ignite.internal.network.message.FieldDescriptorMessage;
 import org.apache.ignite.internal.network.message.InvokeRequest;
 import org.apache.ignite.internal.network.message.InvokeResponse;
 import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.message.value.BooleanValueMessage;
+import org.apache.ignite.internal.network.message.value.ByteArrayValueMessage;
+import org.apache.ignite.internal.network.message.value.ByteValueMessage;
+import org.apache.ignite.internal.network.message.value.DoubleValueMessage;
+import org.apache.ignite.internal.network.message.value.FloatValueMessage;
+import org.apache.ignite.internal.network.message.value.IntValueMessage;
+import org.apache.ignite.internal.network.message.value.LongValueMessage;
+import org.apache.ignite.internal.network.message.value.NullValueMessage;
+import org.apache.ignite.internal.network.message.value.ShortValueMessage;
+import org.apache.ignite.internal.network.message.value.StringValueMessage;
+import org.apache.ignite.internal.network.message.value.UuidValueMessage;
 import 
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
 import 
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
@@ -101,4 +112,64 @@ public class NetworkMessageTypes {
      * Type for {@link ProbeMessage}.
      */
     public static final short PROBE_MESSAGE = 12;
+
+    /**
+     * Message types that contain a single value of a certain type.
+     */
+    public interface SingleValueMessages {
+        /**
+         * Type for {@link BooleanValueMessage}.
+         */
+        short BOOLEAN_VALUE_MESSAGE = 101;
+
+        /**
+         * Type for {@link ByteValueMessage}.
+         */
+        short BYTE_VALUE_MESSAGE = 102;
+
+        /**
+         * Type for {@link ShortValueMessage}.
+         */
+        short SHORT_VALUE_MESSAGE = 103;
+
+        /**
+         * Type for {@link IntValueMessage}.
+         */
+        short INT_VALUE_MESSAGE = 104;
+
+        /**
+         * Type for {@link LongValueMessage}.
+         */
+        short LONG_VALUE_MESSAGE = 105;
+
+        /**
+         * Type for {@link FloatValueMessage}.
+         */
+        short FLOAT_VALUE_MESSAGE = 106;
+
+        /**
+         * Type for {@link DoubleValueMessage}.
+         */
+        short DOUBLE_VALUE_MESSAGE = 107;
+
+        /**
+         * Type for {@link UuidValueMessage}.
+         */
+        short UUID_VALUE_MESSAGE = 108;
+
+        /**
+         * Type for {@link StringValueMessage}.
+         */
+        short STRING_VALUE_MESSAGE = 109;
+
+        /**
+         * Type for {@link ByteArrayValueMessage}.
+         */
+        short BYTE_ARRAY_VALUE_MESSAGE = 110;
+
+        /**
+         * Type for {@link NullValueMessage}.
+         */
+        short NULL_VALUE_MESSAGE = 111;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/BooleanValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/BooleanValueMessage.java
index d224c4e50dd..24fa39537ba 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/BooleanValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Boolean} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.BOOLEAN_VALUE_MESSAGE)
+public interface BooleanValueMessage extends SingleValueMessage<Boolean> {
+    @Override
+    Boolean value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteArrayValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteArrayValueMessage.java
index d224c4e50dd..a141b12e42b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteArrayValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@code byte[]} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.BYTE_ARRAY_VALUE_MESSAGE)
+public interface ByteArrayValueMessage extends SingleValueMessage<byte[]> {
+    @Override
+    byte[] value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteValueMessage.java
index d224c4e50dd..ec66655f6eb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Byte} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.BYTE_VALUE_MESSAGE)
+public interface ByteValueMessage extends SingleValueMessage<Byte> {
+    @Override
+    Byte value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/DoubleValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/DoubleValueMessage.java
index d224c4e50dd..2ade90598b6 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/DoubleValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Double} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.DOUBLE_VALUE_MESSAGE)
+public interface DoubleValueMessage extends SingleValueMessage<Double> {
+    @Override
+    Double value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/FloatValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/FloatValueMessage.java
index d224c4e50dd..443e897c40a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/FloatValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Float} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.FLOAT_VALUE_MESSAGE)
+public interface FloatValueMessage extends SingleValueMessage<Float> {
+    @Override
+    Float value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/IntValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/IntValueMessage.java
index d224c4e50dd..514d39f6121 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/IntValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Integer} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.INT_VALUE_MESSAGE)
+public interface IntValueMessage extends SingleValueMessage<Integer> {
+    @Override
+    Integer value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/LongValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/LongValueMessage.java
index d224c4e50dd..4f3506c8be2 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/LongValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Long} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.LONG_VALUE_MESSAGE)
+public interface LongValueMessage extends SingleValueMessage<Long> {
+    @Override
+    Long value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/NullValueMessage.java
similarity index 55%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/NullValueMessage.java
index d224c4e50dd..b341c61551e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/NullValueMessage.java
@@ -15,25 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@code null} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.NULL_VALUE_MESSAGE)
+public interface NullValueMessage extends SingleValueMessage<Boolean> {
+    @Override
+    @Nullable Boolean value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ShortValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ShortValueMessage.java
index d224c4e50dd..3cfca1f9a5e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ShortValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link Short} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.SHORT_VALUE_MESSAGE)
+public interface ShortValueMessage extends SingleValueMessage<Short> {
+    @Override
+    Short value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/SingleValueMessage.java
similarity index 51%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/SingleValueMessage.java
index d224c4e50dd..5776e024649 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/SingleValueMessage.java
@@ -15,25 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+public interface SingleValueMessage<T> extends NetworkMessage {
+    @Nullable T value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/StringValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/StringValueMessage.java
index d224c4e50dd..185d053f99d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/StringValueMessage.java
@@ -15,25 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link String} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.STRING_VALUE_MESSAGE)
+public interface StringValueMessage extends SingleValueMessage<String> {
+    @Override
+    String value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/UuidValueMessage.java
similarity index 52%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/UuidValueMessage.java
index d224c4e50dd..623d4a09ad1 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/UuidValueMessage.java
@@ -15,25 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import java.util.UUID;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link UUID} value.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.UUID_VALUE_MESSAGE)
+public interface UuidValueMessage extends SingleValueMessage<UUID> {
+    @Override
+    UUID value();
 }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
index 3129d720b9f..ceaf1baed21 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
@@ -265,7 +265,7 @@ public class ItSqlOperatorsTest extends 
BaseSqlIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20162";)
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19332";)
     public void testCollections() {
         assertExpression("MAP['a', 1, 'A', 2]").returns(Map.of("a", 1, "A", 
2)).check();
         assertExpression("ARRAY[1, 2, 3]").returns(List.of(1, 2, 3)).check();
diff --git 
a/modules/sql-engine/src/integrationTest/sql/group1/explain/join.test 
b/modules/sql-engine/src/integrationTest/sql/group1/explain/join.test
index ef0c072b2a7..aecb770b2fc 100644
--- a/modules/sql-engine/src/integrationTest/sql/group1/explain/join.test
+++ b/modules/sql-engine/src/integrationTest/sql/group1/explain/join.test
@@ -224,6 +224,7 @@ CorrelatedNestedLoopJoin
     predicate: true
     type: inner
     correlates: [$cor1]
+    correlationFieldNames: [X]
     est: (rows=100)
   TableFunctionScan
       fieldNames: [X]
diff --git 
a/modules/sql-engine/src/integrationTest/sql/group1/explain/scan.test 
b/modules/sql-engine/src/integrationTest/sql/group1/explain/scan.test
index 71639c28a33..33c50e3fb3e 100644
--- a/modules/sql-engine/src/integrationTest/sql/group1/explain/scan.test
+++ b/modules/sql-engine/src/integrationTest/sql/group1/explain/scan.test
@@ -216,6 +216,7 @@ Project
       predicate: true
       type: left
       correlates: [$cor0]
+      correlationFieldNames: [RENAMED_C1]
       est: (rows=1)
     Exchange
         distribution: single
@@ -256,6 +257,7 @@ Project
       predicate: true
       type: left
       correlates: [$cor0]
+      correlationFieldNames: [DOUBLED_C1]
       est: (rows=1)
     Exchange
         distribution: single
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index 674b8e37f16..2130c3c1368 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
 import org.apache.ignite.internal.sql.engine.message.MessageService;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
 import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
 import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
 import org.apache.ignite.internal.util.ExceptionUtils;
@@ -112,7 +113,7 @@ public class ExchangeServiceImpl implements ExchangeService 
{
                         .fragmentId(fragmentId)
                         .exchangeId(exchangeId)
                         .amountOfBatches(amountOfBatches)
-                        .sharedState(state)
+                        
.sharedStateMessage(SharedStateMessageConverter.toMessage(state))
                         .build()
         );
     }
@@ -153,7 +154,9 @@ public class ExchangeServiceImpl implements ExchangeService 
{
 
         Consumer<Outbox<?>> onRequestHandler = outbox -> {
             try {
-                SharedState state = msg.sharedState();
+                SharedStateMessage sharedStateMessage = 
msg.sharedStateMessage();
+                SharedState state = 
SharedStateMessageConverter.fromMessage(sharedStateMessage);
+
                 if (state != null) {
                     outbox.onRewindRequest(node.name(), state, 
msg.amountOfBatches());
                 } else {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 5716c1fbaa4..d66109f15a9 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.sql.engine.exec;
 
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static org.apache.ignite.internal.sql.engine.util.Commons.cast;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.time.Clock;
@@ -359,17 +358,17 @@ public class ExecutionContext<RowT> implements 
SqlEvaluationContext<RowT> {
     }
 
     @Override
-    public RowT correlatedVariable(int id) {
-        return cast(sharedState.correlatedVariable(id));
+    public @Nullable Object correlatedVariable(long id) {
+        return sharedState.correlatedVariable(id);
     }
 
     /**
      * Sets correlated value.
      *
-     * @param id Correlation ID.
+     * @param id Composite identifier consisting of the correlated variable ID 
and the field index.
      * @param value Correlated value.
      */
-    public void correlatedVariable(Object value, int id) {
+    public void correlatedVariable(long id, @Nullable Object value) {
         sharedState.correlatedVariable(id, value);
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index dfd117016ca..525f55e7f9b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -370,7 +370,7 @@ public class LogicalRelImplementor<RowT> implements 
IgniteRelVisitor<Node<RowT>>
         RowFactory<RowT> rightRowFactory = 
ctx.rowFactoryFactory().create(convertStructuredType(rightType));
 
         Node<RowT> node = new CorrelatedNestedLoopJoinNode<>(ctx, cond, 
rel.getVariablesSet(),
-                rel.getJoinType(), rightRowFactory, joinProjection);
+                rel.getCorrelationColumns(), rel.getJoinType(), 
rightRowFactory, joinProjection);
 
         Node<RowT> leftInput = visit(rel.getLeft());
         Node<RowT> rightInput = visit(rel.getRight());
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
index ccb3dbd8ff7..ad3ae378bc0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
@@ -17,41 +17,53 @@
 
 package org.apache.ignite.internal.sql.engine.exec;
 
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * This class represents the volatile state that may be propagated from parent 
to its children
  * during rewind.
  */
-public class SharedState implements Serializable {
-    private static final long serialVersionUID = 42L;
+public class SharedState {
+    private final Long2ObjectMap<Object> correlations;
+
+    public SharedState() {
+        this(new Long2ObjectOpenHashMap<>());
+    }
 
-    private Object[] correlations = new Object[16];
+    SharedState(Long2ObjectMap<Object> correlations) {
+        this.correlations = correlations;
+    }
 
     /**
      * Gets correlated value.
      *
-     * @param id Correlation ID.
+     * @param id Composite identifier consisting of the correlated variable ID 
and the field index.
      * @return Correlated value.
      */
-    public Object correlatedVariable(int id) {
-        checkRange(correlations, id);
+    public @Nullable Object correlatedVariable(long id) {
+        Object value = correlations.getOrDefault(id, this);
+
+        if (value == this) {
+            throw new IllegalStateException("Correlated variable is not set 
[id=" + id + "]");
+        }
 
-        return correlations[id];
+        return value;
     }
 
     /**
      * Sets correlated value.
      *
-     * @param id Correlation ID.
+     * @param id Composite identifier consisting of the correlated variable ID 
and the field index.
      * @param value Correlated value.
      */
-    public void correlatedVariable(int id, Object value) {
-        correlations = Commons.ensureCapacity(correlations, id + 1);
+    public void correlatedVariable(long id, @Nullable Object value) {
+        correlations.put(id, value);
+    }
 
-        correlations[id] = value;
+    Long2ObjectMap<Object> correlations() {
+        return Long2ObjectMaps.unmodifiable(correlations);
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
new file mode 100644
index 00000000000..e31c3499d6c
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.ignite.internal.network.NetworkMessage;
+import 
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.value.SingleValueMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converter between {@link SharedState} and {@link SharedStateMessage}.
+ */
+public class SharedStateMessageConverter {
+    /** Message factory. */
+    private static final SqlQueryMessagesFactory SQL_MESSAGE_FACTORY = new 
SqlQueryMessagesFactory();
+
+    private static final NetworkMessagesFactory NETOWRK_MESSAGE_FACTORY = new 
NetworkMessagesFactory();
+
+    @Contract("null -> null; !null -> !null")
+    static @Nullable SharedStateMessage toMessage(@Nullable SharedState state) 
{
+        if (state == null) {
+            return null;
+        }
+
+        Long2ObjectMap<Object> correlations = state.correlations();
+        Map<Long, NetworkMessage> result = 
IgniteUtils.newHashMap(correlations.size());
+
+        for (Long2ObjectMap.Entry<Object> entry : 
correlations.long2ObjectEntrySet()) {
+            SingleValueMessage<?> msg = toSingleValueMessage(entry.getValue());
+
+            result.put(entry.getLongKey(), msg);
+        }
+
+        return SQL_MESSAGE_FACTORY.sharedStateMessage()
+                .sharedState(result)
+                .build();
+    }
+
+    @Contract("null -> null; !null -> !null")
+    static @Nullable SharedState fromMessage(@Nullable SharedStateMessage 
sharedStateMessage) {
+        if (sharedStateMessage == null) {
+            return null;
+        }
+
+        int size = sharedStateMessage.sharedState().size();
+        Long2ObjectMap<Object> correlations = new 
Long2ObjectOpenHashMap<>(size);
+
+        for (Map.Entry<Long, NetworkMessage> e : 
sharedStateMessage.sharedState().entrySet()) {
+            NetworkMessage networkMessage = e.getValue();
+
+            if (!(networkMessage instanceof SingleValueMessage)) {
+                throw new IllegalArgumentException("Unexpected message type "
+                        + "[type=" + networkMessage.messageType() + ", class=" 
+ networkMessage.getClass() + ']');
+            }
+
+            SingleValueMessage<Object> singleFieldMessage = 
((SingleValueMessage<Object>) networkMessage);
+
+            correlations.put(e.getKey().longValue(), 
extractFieldValue(singleFieldMessage));
+        }
+
+        return new SharedState(correlations);
+    }
+
+    private static @Nullable Object 
extractFieldValue(SingleValueMessage<Object> msg) {
+        Object value = msg.value();
+
+        if (value == null) {
+            return null;
+        }
+
+        if (msg.groupType() == SqlQueryMessageGroup.GROUP_TYPE) {
+            assert msg.messageType() == 
SqlQueryMessageGroup.DECIMAL_VALUE_MESSAGE : msg.messageType();
+
+            return decimalFromBytes((byte[]) value);
+        }
+
+        switch (msg.messageType()) {
+            case SingleValueMessages.BYTE_ARRAY_VALUE_MESSAGE:
+                return new ByteString((byte[]) value);
+
+            case SingleValueMessages.NULL_VALUE_MESSAGE:
+                return null;
+
+            default:
+                return value;
+        }
+    }
+
+    private static BigDecimal decimalFromBytes(byte[] value) {
+        ByteBuffer buffer = 
ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN);
+
+        short valScale = buffer.getShort();
+
+        BigInteger integer = new BigInteger(value, Short.BYTES, value.length - 
Short.BYTES);
+
+        return new BigDecimal(integer, valScale);
+    }
+
+    private static byte[] decimalToBytes(BigDecimal value) {
+        if (value.scale() > Short.MAX_VALUE || value.scale() < 
Short.MIN_VALUE) {
+            throw new UnsupportedOperationException("Decimal scale is out of 
range: " + value.scale());
+        }
+
+        byte[] unscaledBytes = value.unscaledValue().toByteArray();
+
+        ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES + 
unscaledBytes.length)
+                .order(ByteOrder.LITTLE_ENDIAN);
+
+        buffer.putShort((short) value.scale());
+        buffer.put(unscaledBytes);
+
+        return buffer.array();
+    }
+
+    private static SingleValueMessage<?> toSingleValueMessage(Object value) {
+        if (value == null) {
+            return NETOWRK_MESSAGE_FACTORY.nullValueMessage().build();
+        }
+
+        if (value instanceof Boolean) {
+            return 
NETOWRK_MESSAGE_FACTORY.booleanValueMessage().value((Boolean) value).build();
+        }
+        if (value instanceof Byte) {
+            return NETOWRK_MESSAGE_FACTORY.byteValueMessage().value((Byte) 
value).build();
+        }
+        if (value instanceof Short) {
+            return NETOWRK_MESSAGE_FACTORY.shortValueMessage().value((Short) 
value).build();
+        }
+        if (value instanceof Integer) {
+            return NETOWRK_MESSAGE_FACTORY.intValueMessage().value((Integer) 
value).build();
+        }
+        if (value instanceof Long) {
+            return NETOWRK_MESSAGE_FACTORY.longValueMessage().value((Long) 
value).build();
+        }
+        if (value instanceof Float) {
+            return NETOWRK_MESSAGE_FACTORY.floatValueMessage().value((Float) 
value).build();
+        }
+        if (value instanceof Double) {
+            return NETOWRK_MESSAGE_FACTORY.doubleValueMessage().value((Double) 
value).build();
+        }
+        if (value instanceof BigDecimal) {
+            return 
SQL_MESSAGE_FACTORY.decimalValueMessage().value(decimalToBytes((BigDecimal) 
value)).build();
+        }
+        if (value instanceof UUID) {
+            return NETOWRK_MESSAGE_FACTORY.uuidValueMessage().value((UUID) 
value).build();
+        }
+        if (value instanceof String) {
+            return NETOWRK_MESSAGE_FACTORY.stringValueMessage().value((String) 
value).build();
+        }
+        if (value instanceof ByteString) {
+            ByteString byteString = (ByteString) value;
+
+            return 
NETOWRK_MESSAGE_FACTORY.byteArrayValueMessage().value(byteString.getBytes()).build();
+        }
+
+        throw new IllegalArgumentException("Unsupported type: " + 
value.getClass());
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
index bd7aa510c17..dd7bf8321f4 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
@@ -21,6 +21,7 @@ import org.apache.calcite.DataContext;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowAccessor;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Provides the contextual environment required for evaluating SQL expressions.
@@ -38,6 +39,6 @@ public interface SqlEvaluationContext<RowT> extends 
DataContext {
     /** Returns a factory capable of producing {@link RowFactory} instances 
for the {@code RowT} row representation. */
     RowFactoryFactory<RowT> rowFactoryFactory();
 
-    /** Returns row representing correlation source by given correlation id. */
-    RowT correlatedVariable(int id);
+    /** Returns the value of a correlated variable identified by the given 
correlation id. */
+    @Nullable Object correlatedVariable(long corrId);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
index 0db383a5e92..3be34a6b8b0 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
@@ -20,27 +20,18 @@ package org.apache.ignite.internal.sql.engine.exec.exp;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.linq4j.tree.BlockBuilder;
 import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.Expressions;
 import org.apache.calcite.rex.RexCorrelVariable;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter;
-import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
 
 class CorrelatesBuilder extends RexShuttle {
-    private final BlockBuilder builder;
-
     private final Expression ctx;
 
-    private final Expression hnd;
-
-    private Map<String, FieldGetter> correlates;
+    private Map<String, InputGetter> correlates;
 
-    CorrelatesBuilder(BlockBuilder builder, Expression ctx, Expression hnd) {
-        this.builder = builder;
-        this.hnd = hnd;
+    CorrelatesBuilder(Expression ctx) {
         this.ctx = ctx;
     }
 
@@ -61,15 +52,13 @@ class CorrelatesBuilder extends RexShuttle {
     /** {@inheritDoc} */
     @Override
     public RexNode visitCorrelVariable(RexCorrelVariable variable) {
-        Expression corr = builder.append("corr",
-                Expressions.call(ctx, 
IgniteMethod.CONTEXT_GET_CORRELATED_VALUE.method(),
-                        Expressions.constant(variable.id.getId())));
-
         if (correlates == null) {
             correlates = new HashMap<>();
         }
 
-        correlates.put(variable.getName(), new FieldGetter(hnd, corr, 
variable.getType()));
+        InputGetter inputGetter = new CorrelationValueGetter(ctx, variable);
+
+        correlates.put(variable.getName(), inputGetter);
 
         return variable;
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelationValueGetter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelationValueGetter.java
new file mode 100644
index 00000000000..7989fa98fb8
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelationValueGetter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp;
+
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
+
+class CorrelationValueGetter extends CommonFieldGetter {
+    private final Expression ctx;
+    private final int correlationId;
+
+    CorrelationValueGetter(Expression ctx, RexCorrelVariable variable) {
+        super(null, null, variable.getType());
+
+        this.ctx = ctx;
+        this.correlationId = variable.id.getId();
+    }
+
+    @Override
+    protected Expression fillExpressions(BlockBuilder list, int index) {
+        long id = Commons.packIntsToLong(correlationId, index);
+
+        return list.append("corr",
+                Expressions.call(ctx, 
IgniteMethod.CONTEXT_GET_CORRELATED_VALUE.method(), Expressions.constant(id)));
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinPredicateImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinPredicateImplementor.java
index b923210f6f2..42f1ac8941f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinPredicateImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinPredicateImplementor.java
@@ -100,7 +100,7 @@ class JoinPredicateImplementor {
 
         InputGetter inputGetter = new BiFieldGetter(rowHandler, left, right, 
type, firstRowSize);
 
-        Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(builder, ctx, rowHandler)
+        Function1<String, InputGetter> correlates = new CorrelatesBuilder(ctx)
                 .build(List.of(predicateExpression));
 
         Expression condition = RexToLixTranslator.translateCondition(program, 
typeFactory, builder,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinProjectionImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinProjectionImplementor.java
index 1060c59b9b5..1e285eba138 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinProjectionImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinProjectionImplementor.java
@@ -113,7 +113,7 @@ class JoinProjectionImplementor {
 
         InputGetter inputGetter = new BiFieldGetter(rowHandler, left, right, 
type, firstRowSize);
 
-        Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(builder, ctx, rowHandler).build(projections);
+        Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(ctx).build(projections);
 
         List<Expression> projects = 
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
                 builder, null, null, ctx, inputGetter, correlates);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/PredicateImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/PredicateImplementor.java
index 463c4700009..2c340be9ef3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/PredicateImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/PredicateImplementor.java
@@ -98,7 +98,7 @@ class PredicateImplementor {
 
         InputGetter inputGetter = new FieldGetter(rowHandler, row, type);
 
-        Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(builder, ctx, rowHandler)
+        Function1<String, InputGetter> correlates = new CorrelatesBuilder(ctx)
                 .build(List.of(predicateExpression));
 
         Expression condition = RexToLixTranslator.translateCondition(program, 
typeFactory, builder,
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ProjectionImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ProjectionImplementor.java
index 029e601f784..23f57503169 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ProjectionImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ProjectionImplementor.java
@@ -111,7 +111,7 @@ class ProjectionImplementor {
 
         InputGetter inputGetter = new FieldGetter(rowHandler, row, 
inputRowType);
 
-        Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(builder, ctx, rowHandler).build(projections);
+        Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(ctx).build(projections);
 
         List<Expression> projects = 
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
                 builder, null, null, ctx, inputGetter, correlates);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RowProviderImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RowProviderImplementor.java
index 47c7f723039..9680f7ec706 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RowProviderImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RowProviderImplementor.java
@@ -133,9 +133,7 @@ class RowProviderImplementor {
                 Expressions.declare(Modifier.FINAL, DataContext.ROOT, 
Expressions.convert_(ctx, DataContext.class))
         );
 
-        Expression rowHandler = builder.append("hnd", Expressions.call(ctx, 
IgniteMethod.CONTEXT_ROW_HANDLER.method()));
-
-        Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(builder, ctx, rowHandler).build(values);
+        Function1<String, InputGetter> correlates = new 
CorrelatesBuilder(ctx).build(values);
 
         List<Expression> projects = 
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
                 builder, null, null, ctx, NoOpFieldGetter.INSTANCE, 
correlates);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ScalarImplementor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ScalarImplementor.java
index 8ef8cd33c8b..8057d7f3732 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ScalarImplementor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ScalarImplementor.java
@@ -43,7 +43,6 @@ import org.apache.calcite.sql.validate.SqlConformance;
 import org.apache.ignite.internal.sql.engine.exec.SqlEvaluationContext;
 import 
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter;
 import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
 import org.apache.ignite.internal.sql.engine.util.Primitives;
 import org.apache.ignite.internal.sql.engine.util.RexUtils;
 import org.apache.ignite.internal.sql.engine.util.cache.Cache;
@@ -109,11 +108,9 @@ class ScalarImplementor {
 
         ParameterExpression ctx = 
Expressions.parameter(SqlEvaluationContext.class, "ctx");
 
-        Expression rowHandler = builder.append("hnd", Expressions.call(ctx, 
IgniteMethod.CONTEXT_ROW_HANDLER.method()));
-
         Function1<String, InputGetter> correlates = scalarValue instanceof 
RexDynamicParam
                 ? null
-                : new CorrelatesBuilder(builder, ctx, 
rowHandler).build(List.of(scalarValue));
+                : new CorrelatesBuilder(ctx).build(List.of(scalarValue));
 
         List<Expression> projects = 
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
                 builder, null, null, ctx, NoOpFieldGetter.INSTANCE, 
correlates);
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
index ecc5bbabe81..0d269190deb 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -28,10 +28,12 @@ import java.util.Set;
 import java.util.function.BiPredicate;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.lang.IgniteStringBuilder;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.exp.SqlJoinProjection;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 
 /**
  * CorrelatedNestedLoopJoinNode.
@@ -54,6 +56,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
 
     private final RowT rightEmptyRow;
 
+    private final ImmutableBitSet correlationColumns;
+
     private int requested;
 
     private int waitingLeft;
@@ -77,9 +81,10 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
     /**
      * Creates CorrelatedNestedLoopJoin node.
      *
-     * @param ctx  Execution context.
+     * @param ctx Execution context.
      * @param cond Join expression.
-     * @param correlationIds Set of collections ids.
+     * @param correlationIds Set of correlation ids.
+     * @param correlationColumns Set of columns that are used by correlation.
      * @param joinType Join rel type.
      * @param rightRowFactory Right row factory.
      * @param joinProjection Output row factory.
@@ -88,6 +93,7 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
             ExecutionContext<RowT> ctx,
             BiPredicate<RowT, RowT> cond,
             Set<CorrelationId> correlationIds,
+            ImmutableBitSet correlationColumns,
             JoinRelType joinType,
             RowFactory<RowT> rightRowFactory,
             SqlJoinProjection joinProjection
@@ -101,6 +107,7 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
         this.correlationIds = new ArrayList<>(correlationIds);
         this.joinType = joinType;
         this.joinProjection = joinProjection;
+        this.correlationColumns = correlationColumns;
 
         leftInBufferSize = correlationIds.size();
         rightInBufferSize = inBufSize;
@@ -485,7 +492,15 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends 
AbstractNode<RowT> {
     private void prepareCorrelations() {
         for (int i = 0; i < correlationIds.size(); i++) {
             RowT row = i < leftInBuf.size() ? leftInBuf.get(i) : 
first(leftInBuf);
-            context().correlatedVariable(row, correlationIds.get(i).getId());
+            int corrId = correlationIds.get(i).getId();
+
+            for (int fieldIndex = correlationColumns.nextSetBit(0); fieldIndex 
!= -1;
+                    fieldIndex = correlationColumns.nextSetBit(fieldIndex + 
1)) {
+                Object value = context().rowAccessor().get(fieldIndex, row);
+                long id = Commons.packIntsToLong(corrId, fieldIndex);
+
+                context().correlatedVariable(id, value);
+            }
         }
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
index be3376377d7..3bc84a213fc 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
@@ -303,8 +303,8 @@ public class SqlExpressionFactoryAdapter implements 
ExpressionFactory {
         }
 
         @Override
-        public @Nullable RowT correlatedVariable(int id) {
-            return null;
+        public @Nullable Object correlatedVariable(long id) {
+            throw new AssertionError("Should not get here");
         }
 
         @Override
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index 458e7fdb0eb..171c8772ee7 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -203,7 +203,13 @@ public class RelJsonReader {
         /** {@inheritDoc} */
         @Override
         public ImmutableBitSet getBitSet(String tag) {
-            return ImmutableBitSet.of(getIntegerList(tag));
+            var list = getIntegerList(tag);
+
+            if (list == null) {
+                return ImmutableBitSet.of();
+            }
+
+            return ImmutableBitSet.of(list);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/DecimalValueMessage.java
similarity index 56%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/DecimalValueMessage.java
index d224c4e50dd..11ea304cf55 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/DecimalValueMessage.java
@@ -17,23 +17,15 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import java.math.BigDecimal;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.network.message.value.SingleValueMessage;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a single {@link BigDecimal} value in a byte array 
representation.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.DECIMAL_VALUE_MESSAGE)
+public interface DecimalValueMessage extends SingleValueMessage<byte[]> {
+    @Override
+    byte[] value();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
index d224c4e50dd..2777e9bb196 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
@@ -17,9 +17,7 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -33,7 +31,6 @@ public interface QueryBatchRequestMessage extends 
ExecutionContextAwareMessage {
     /** Returns amount of batches to request. */
     int amountOfBatches();
 
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+    /** Returns a state that should be propagated to the target fragment. */
+    @Nullable SharedStateMessage sharedStateMessage();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
similarity index 56%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
index d224c4e50dd..53dd91f1b89 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
@@ -17,23 +17,14 @@
 
 package org.apache.ignite.internal.sql.engine.message;
 
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
 
 /**
- * A message to notify remote fragment (aka remote source) that more batches 
required to fulfil the result.
+ * A message that contains a map with correlations.
  */
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage 
{
-    /** Returns an identifier of the exchange to request batches from. */
-    long exchangeId();
-
-    /** Returns amount of batches to request. */
-    int amountOfBatches();
-
-    /** Returns a state that has should be propagated to the target fragment. 
*/
-    @Marshallable
-    @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.SHARED_STATE_MESSAGE)
+public interface SharedStateMessage extends NetworkMessage {
+    Map<Long, NetworkMessage> sharedState();
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
index ea0d1f1509a..85d24072c5b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
@@ -46,4 +46,10 @@ public final class SqlQueryMessageGroup {
 
     /** See {@link CancelOperationResponse} for the details. */
     public static final short OPERATION_CANCEL_RESPONSE = 7;
+
+    /** See {@link SharedStateMessage} for the details. */
+    public static final short SHARED_STATE_MESSAGE = 8;
+
+    /** See {@link DecimalValueMessage} for the details. */
+    public static final short DECIMAL_VALUE_MESSAGE = 9;
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
index 4a7246eba39..8d8b448fca3 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -30,11 +30,13 @@ import org.apache.calcite.rel.RelCollation;
 import org.apache.calcite.rel.RelCollations;
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
 import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
@@ -51,21 +53,26 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
 public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
     private static final String REL_TYPE_NAME = "CorrelatedNestedLoopJoin";
 
+    private final ImmutableBitSet correlationColumns;
+
     /**
      * Creates a Join.
      *
-     * @param cluster      Cluster
-     * @param traitSet     Trait set
-     * @param left         Left input
-     * @param right        Right input
-     * @param condition    Join condition
-     * @param joinType     Join type
-     * @param variablesSet Set variables that are set by the LHS and used by 
the RHS and are not available to nodes above this Join in the
-     *                     tree
+     * @param cluster Cluster
+     * @param traitSet Trait set
+     * @param left Left input
+     * @param right Right input
+     * @param condition Join condition
+     * @param variablesSet Set variables that are set by the LHS and used by 
the RHS and are not available to nodes above this Join
+     *         in the tree
+     * @param correlationColumns Set of columns that are used by correlation.
+     * @param joinType Join type
      */
     public IgniteCorrelatedNestedLoopJoin(RelOptCluster cluster, RelTraitSet 
traitSet, RelNode left, RelNode right,
-            RexNode condition, Set<CorrelationId> variablesSet, JoinRelType 
joinType) {
+            RexNode condition, Set<CorrelationId> variablesSet, 
ImmutableBitSet correlationColumns, JoinRelType joinType) {
         super(cluster, traitSet, left, right, condition, variablesSet, 
joinType);
+
+        this.correlationColumns = correlationColumns;
     }
 
     /**
@@ -79,14 +86,17 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin {
                 input.getInputs().get(1),
                 input.getExpression("condition"),
                 
Set.copyOf(Commons.transform(input.getIntegerList("variablesSet"), 
CorrelationId::new)),
-                input.getEnum("joinType", JoinRelType.class));
+                input.getBitSet("correlationColumns"),
+                input.getEnum("joinType", JoinRelType.class)
+        );
     }
 
     /** {@inheritDoc} */
     @Override
     public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left, 
RelNode right, JoinRelType joinType,
             boolean semiJoinDone) {
-        return new IgniteCorrelatedNestedLoopJoin(getCluster(), traitSet, 
left, right, condition, variablesSet, joinType);
+        return new IgniteCorrelatedNestedLoopJoin(getCluster(), traitSet, 
left, right, condition,
+                variablesSet, correlationColumns, joinType);
     }
 
     /** {@inheritDoc} */
@@ -148,6 +158,11 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin {
                 List.of(left.replace(RelCollations.EMPTY), right));
     }
 
+    @Override public RelWriter explainTerms(RelWriter pw) {
+        return super.explainTerms(pw)
+                .itemIf("correlationColumns", correlationColumns, 
!correlationColumns.isEmpty());
+    }
+
     /** {@inheritDoc} */
     @Override
     public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery 
mq) {
@@ -175,7 +190,11 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin {
     @Override
     public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
         return new IgniteCorrelatedNestedLoopJoin(cluster, getTraitSet(), 
inputs.get(0), inputs.get(1), getCondition(),
-                getVariablesSet(), getJoinType());
+                getVariablesSet(), getCorrelationColumns(), getJoinType());
+    }
+
+    public ImmutableBitSet getCorrelationColumns() {
+        return correlationColumns;
     }
 
     /** {@inheritDoc} */
@@ -193,6 +212,8 @@ public class IgniteCorrelatedNestedLoopJoin extends 
AbstractIgniteJoin {
 
     @Override
     public IgniteRelWriter explain(IgniteRelWriter writer) {
-        return super.explain(writer).addCorrelatedVariables(variablesSet);
+        return super.explain(writer)
+                .addCorrelatedVariables(variablesSet)
+                .addCorrelationFieldNames(correlationColumns, 
getLeft().getRowType());
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/IgniteRelWriter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/IgniteRelWriter.java
index e30e01d8abc..cd0d6b58970 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/IgniteRelWriter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/IgniteRelWriter.java
@@ -197,6 +197,15 @@ public interface IgniteRelWriter {
      */
     IgniteRelWriter addCorrelatedVariables(Set<CorrelationId> variablesSet);
 
+    /**
+     * Adds correlation field names which are set by {@link 
IgniteCorrelatedNestedLoopJoin}.
+     *
+     * @param correlationColumns Set of columns that are used by correlation.
+     * @param rowType The row type against which field indexes in the 
correlation columns should be resolved.
+     * @return This writer instance for chaining.
+     */
+    IgniteRelWriter addCorrelationFieldNames(ImmutableBitSet 
correlationColumns, RelDataType rowType);
+
     /**
      * Adds search bounds used for index look ups.
      *
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/RelTreeToTextWriter.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/RelTreeToTextWriter.java
index 547c6c77724..9a3cb6accea 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/RelTreeToTextWriter.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/RelTreeToTextWriter.java
@@ -134,6 +134,7 @@ class RelTreeToTextWriter {
         AGGREGATION("aggregation"),
         KEY_EXPRESSIONS("key"),
         CORRELATED_VARIABLES("correlates"),
+        CORRELATION_FIELD_NAMES("correlationFieldNames"),
         INVOCATION("invocation"),
         OFFSET("offset"),
         FETCH("fetch"),
@@ -291,6 +292,13 @@ class RelTreeToTextWriter {
             return this;
         }
 
+        @Override
+        public IgniteRelWriter addCorrelationFieldNames(ImmutableBitSet 
correlationColumns, RelDataType rowType) {
+            attributes.put(AttributeName.CORRELATION_FIELD_NAMES, 
beautifyBitSet(correlationColumns, rowType).toString());
+
+            return this;
+        }
+
         @Override
         public IgniteRelWriter addSearchBounds(List<SearchBounds> 
searchBounds) {
             attributes.put(AttributeName.SEARCH_BOUNDS, 
beautifySearchBounds(searchBounds));
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
index 004185cab65..32c773dd901 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
@@ -73,6 +73,7 @@ public class CorrelateToNestedLoopRule extends 
AbstractIgniteConverterRule<Logic
                 right,
                 cluster.getRexBuilder().makeLiteral(true),
                 correlationIds,
+                rel.getRequiredColumns(),
                 rel.getJoinType()
         );
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
index af9519ad1c9..b5ce777e7bf 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
@@ -39,6 +39,8 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexShuttle;
 import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableBitSet.Builder;
 import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
 import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 
@@ -73,6 +75,8 @@ public class CorrelatedNestedLoopJoinRule extends 
AbstractIgniteConverterRule<Lo
         final Set<CorrelationId> correlationIds = new HashSet<>();
         final ArrayList<RexNode> corrVar = new ArrayList<>();
 
+        Builder corrRequiredColumnsBuilder = ImmutableBitSet.builder();
+
         for (int i = 0; i < batchSize; i++) {
             CorrelationId correlationId = cluster.createCorrel();
             correlationIds.add(correlationId);
@@ -88,6 +92,8 @@ public class CorrelatedNestedLoopJoinRule extends 
AbstractIgniteConverterRule<Lo
                     return rexBuilder.makeInputRef(input.getType(), 
input.getIndex() - leftFieldCount);
                 }
 
+                corrRequiredColumnsBuilder.set(field);
+
                 return rexBuilder.makeFieldAccess(corrVar.get(0), field);
             }
         });
@@ -132,6 +138,7 @@ public class CorrelatedNestedLoopJoinRule extends 
AbstractIgniteConverterRule<Lo
                 right,
                 rel.getCondition(),
                 correlationIds,
+                corrRequiredColumnsBuilder.build(),
                 joinType
         );
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index c6cbcf97768..aa2f3ca9cc8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -857,4 +857,15 @@ public final class Commons {
                 offset
         );
     }
+
+    /**
+     * Packs two integers into a single long value.
+     *
+     * @param high The first integer to pack.
+     * @param low The second integer to pack.
+     * @return A long value containing both integers.
+     */
+    public static long packIntsToLong(int high, int low) {
+        return (((long) high) << 32) | (low & 0xffffffffL);
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
index b85dffbdb2a..18dc0d19f9a 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
@@ -50,8 +50,8 @@ public enum IgniteMethod {
     /** See {@link SqlEvaluationContext#rowAccessor()}. */
     CONTEXT_ROW_HANDLER(SqlEvaluationContext.class, "rowAccessor"),
 
-    /** See {@link SqlEvaluationContext#correlatedVariable(int)}. */
-    CONTEXT_GET_CORRELATED_VALUE(SqlEvaluationContext.class, 
"correlatedVariable", int.class),
+    /** See {@link SqlEvaluationContext#correlatedVariable(long)}. */
+    CONTEXT_GET_CORRELATED_VALUE(SqlEvaluationContext.class, 
"correlatedVariable", long.class),
 
     /** See {@link IgniteSqlDateTimeUtils#subtractTimeZoneOffset(long, 
TimeZone)}. **/
     SUBTRACT_TIMEZONE_OFFSET(IgniteSqlDateTimeUtils.class, 
"subtractTimeZoneOffset", long.class, TimeZone.class),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverterTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverterTest.java
new file mode 100644
index 00000000000..8148f595883
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.Period;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.ignite.internal.schema.SchemaTestUtils;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.sql.ColumnType;
+import org.hamcrest.Matchers;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for class {@link SharedStateMessageConverter}.
+ */
+public class SharedStateMessageConverterTest extends BaseIgniteAbstractTest {
+    private Random rnd;
+
+    /**
+     * Initialization.
+     */
+    @BeforeEach
+    public void initRandom() {
+        long seed = System.currentTimeMillis();
+
+        log.info("Using seed: " + seed + "L;");
+
+        rnd = new Random(seed);
+    }
+
+    @ParameterizedTest
+    @MethodSource("allTypes")
+    void singleValue(NativeType type) {
+        Object value = SchemaTestUtils.generateRandomValue(rnd, type);
+
+        SharedState state = new SharedState();
+        state.correlatedVariable(1, TypeUtils.toInternal(value, type.spec()));
+
+        SharedState converted = doConversions(state);
+
+        assertThat(converted.correlations(), equalTo(state.correlations()));
+    }
+
+    @Test
+    void multipleValuesWithAllTypes() {
+        SharedState state = new SharedState();
+
+        Set<ColumnType> simpleColumnTypes = 
EnumSet.complementOf(EnumSet.of(ColumnType.STRUCT, ColumnType.NULL));
+
+        int fieldIdx = 0;
+
+        for (ColumnType typeSpec : simpleColumnTypes) {
+            Object value;
+
+            if (typeSpec == ColumnType.PERIOD) {
+                value = Period.between(LocalDate.of(1, 1, 1), LocalDate.now());
+            } else if (typeSpec == ColumnType.DURATION) {
+                value = Duration.ofMillis(Instant.now().toEpochMilli());
+            } else {
+                value = SchemaTestUtils.generateRandomValue(rnd, 
SchemaTestUtils.specToType(typeSpec));
+            }
+
+            state.correlatedVariable(fieldIdx++, TypeUtils.toInternal(value, 
typeSpec));
+        }
+
+        SharedState converted = doConversions(state);
+
+        assertThat(converted.correlations(), equalTo(state.correlations()));
+    }
+
+    @Test
+    void nullValue() {
+        SharedState state = new SharedState();
+        state.correlatedVariable(1, null);
+
+        SharedState converted = doConversions(state);
+
+        assertThat(converted, is(notNullValue()));
+
+        assertThat(converted.correlations(), equalTo(state.correlations()));
+    }
+
+    @Test
+    void nullState() {
+        assertThat(doConversions(null), is(Matchers.nullValue()));
+    }
+
+    @Test
+    void emptyState() {
+        SharedState state = new SharedState();
+
+        SharedState converted = doConversions(state);
+
+        assertThat(converted, is(notNullValue()));
+
+        assertThat(converted.correlations(), is(state.correlations()));
+    }
+
+    private static @Nullable SharedState doConversions(@Nullable SharedState 
state) {
+        SharedStateMessage msg = SharedStateMessageConverter.toMessage(state);
+
+        return SharedStateMessageConverter.fromMessage(msg);
+    }
+
+    private static List<NativeType> allTypes() {
+        return SchemaTestUtils.ALL_TYPES;
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateTest.java
new file mode 100644
index 00000000000..d3ffdbb3744
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.Random;
+import org.apache.ignite.internal.schema.SchemaTestUtils;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.type.NativeType;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for class {@link SharedState}.
+ */
+public class SharedStateTest extends BaseIgniteAbstractTest {
+    private final SharedState state = new SharedState();
+
+    @ParameterizedTest
+    @MethodSource("allTypes")
+    void canSetValue(NativeType type) {
+        long seed = System.currentTimeMillis();
+
+        log.info("Using seed: " + seed);
+
+        Random rnd = new Random(seed);
+
+        Object value = SchemaTestUtils.generateRandomValue(rnd, type);
+        long corrId = rnd.nextLong();
+
+        state.correlatedVariable(corrId, value);
+
+        assertThat(state.correlatedVariable(corrId), is(value));
+    }
+
+    @Test
+    void canSetNullAsValue() {
+        state.correlatedVariable(1, null);
+
+        assertThat(state.correlatedVariable(1), is(Matchers.nullValue()));
+    }
+
+    @Test
+    void throwsExceptionNonExistingValue() {
+        //noinspection ThrowableNotThrown
+        IgniteTestUtils.assertThrows(
+                IllegalStateException.class,
+                () -> state.correlatedVariable(1),
+                "Correlated variable is not set [id=1]"
+        );
+    }
+
+    private static List<NativeType> allTypes() {
+        return SchemaTestUtils.ALL_TYPES;
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
index bae4ea2971a..0f89bb78900 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
@@ -35,6 +35,7 @@ import java.util.stream.StreamSupport;
 import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
@@ -44,14 +45,15 @@ import 
org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.TypeUtils;
 import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.type.StructNativeType;
 import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.EnumSource.Mode;
 
 /** Correlated nested loop join execution tests. */
 public class CorrelatedNestedLoopJoinExecutionTest extends 
AbstractExecutionTest<Object[]> {
-
     @ParameterizedTest
     @EnumSource(value = JoinRelType.class, mode = Mode.INCLUDE, names = 
{"INNER", "LEFT"})
     public void testCorrelatedNestedLoopJoin(JoinRelType joinType) {
@@ -80,6 +82,7 @@ public class CorrelatedNestedLoopJoinExecutionTest extends 
AbstractExecutionTest
                 ctx,
                 (r1, r2) -> Objects.equals(hnd.get(2, r1), hnd.get(0, r2)),
                 Set.of(new CorrelationId(0)),
+                ImmutableBitSet.of(),
                 joinType,
                 
ctx.rowFactoryFactory().create(convertStructuredType(rightType)),
                 identityProjection()
@@ -118,6 +121,78 @@ public class CorrelatedNestedLoopJoinExecutionTest extends 
AbstractExecutionTest
         }
     }
 
+    /**
+     * Tests that correlated variables are correctly set by the correlation 
preparation logic.
+     */
+    @Test
+    public void testCorrelatedVariables() {
+        ExecutionContext<Object[]> ctx = executionContext(true);
+
+        // Left side: T0 (id, name, val)
+        ScanNode<Object[]> left = new ScanNode<>(ctx, Arrays.asList(
+                new Object[]{11, "A", 1},
+                new Object[]{12, "B", 2},
+                new Object[]{13, "C", 3}
+        ));
+
+        // Right side: T1 (id, val)
+        List<Object[]> deps = Arrays.asList(
+                new Object[]{11, 1},
+                new Object[]{12, 1},
+                new Object[]{13, 4}
+        );
+
+        StructNativeType rightType = NativeTypes.structBuilder()
+                .addField("ID", NativeTypes.INT32, false)
+                .addField("VAL", NativeTypes.INT32, true)
+                .build();
+
+        CorrelationId correlationId = new CorrelationId(0);
+        // Correlated columns: T0.id, T0.val
+        List<Integer> correlationColumns = List.of(0, 2);
+
+        ScanNode<Object[]> rightScan = new ScanNode<>(ctx, deps);
+
+        FilterNode<Object[]> rightFilter = new FilterNode<>(ctx, row -> {
+            // Can ignore correlationId because it is 0.
+            Object corr1 = ctx.correlatedVariable(correlationColumns.get(0));
+            Object corr2 = ctx.correlatedVariable(correlationColumns.get(1));
+
+            Object rightVal1 = ctx.rowAccessor().get(0, row);
+            Object rightVal2 = ctx.rowAccessor().get(1, row);
+
+            // Filter: t0.id = t1.id AND t0.val <> t1.val
+            return Objects.equals(corr1, rightVal1) && !Objects.equals(corr2, 
rightVal2);
+        });
+
+        rightFilter.register(rightScan);
+
+        CorrelatedNestedLoopJoinNode<Object[]> join = new 
CorrelatedNestedLoopJoinNode<>(
+                ctx,
+                (r1, r2) -> true,
+                Set.of(correlationId),
+                ImmutableBitSet.of(correlationColumns),
+                INNER,
+                ctx.rowFactoryFactory().create(rightType),
+                identityProjection()
+        );
+
+        join.register(Arrays.asList(left, rightFilter));
+
+        RootNode<Object[]> root = new RootNode<>(ctx);
+        root.register(join);
+
+        Object[][] result = 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(root, 
Spliterator.ORDERED), false)
+                .toArray(Object[][]::new);
+
+        Object[][] expected = {
+                {12, "B", 2, 12, 1},
+                {13, "C", 3, 13, 4}
+        };
+
+        assert2DimArrayEquals(expected, result);
+    }
+
     private static void assert2DimArrayEquals(Object[][] expected, Object[][] 
actual) {
         assertEquals(expected.length, actual.length, "expected length: " + 
expected.length + ", actual length: " + actual.length);
 
@@ -236,6 +311,7 @@ public class CorrelatedNestedLoopJoinExecutionTest extends 
AbstractExecutionTest
                 ctx,
                 (r1, r2) -> Objects.equals(hnd.get(2, r1), hnd.get(0, r2)),
                 Set.of(new CorrelationId(0)),
+                ImmutableBitSet.of(),
                 joinType,
                 
ctx.rowFactoryFactory().create(convertStructuredType(rightType)),
                 identityProjection()
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
index bfd8e97e7f7..19179c5e1e0 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -24,12 +24,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Predicate;
 import java.util.function.UnaryOperator;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.util.ImmutableBitSet;
 import 
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
 import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
+import 
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
 import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
 import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
 import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
@@ -42,12 +45,12 @@ import org.junit.jupiter.api.Test;
  * CorrelatedNestedLoopJoinPlannerTest.
  * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
  */
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-21286";)
 public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
     /**
      * Check equi-join. CorrelatedNestedLoopJoinTest is applicable for it.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21286";)
     public void testValidIndexExpressions() throws Exception {
         IgniteSchema publicSchema = createSchemaFrom(
                 tableA("T0"),
@@ -84,6 +87,7 @@ public class CorrelatedNestedLoopJoinPlannerTest extends 
AbstractPlannerTest {
      * Check join with not equi condition. Current implementation of the 
CorrelatedNestedLoopJoinTest is not applicable for such case.
      */
     @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21286";)
     public void testInvalidIndexExpressions() throws Exception {
         IgniteSchema publicSchema = createSchemaFrom(
                 tableA("T0").andThen(addHashIndex("JID", "ID")),
@@ -102,12 +106,41 @@ public class CorrelatedNestedLoopJoinPlannerTest extends 
AbstractPlannerTest {
         );
     }
 
+    @Test
+    public void testSubqueryCorrelationColumns() throws Exception {
+        IgniteSchema publicSchema = createSchemaFrom(
+                tableA("T0"),
+                tableA("T1")
+        );
+
+        {
+            String sql = "SELECT /*+ disable_decorrelation */ * FROM t0 WHERE "
+                    + "EXISTS(SELECT * FROM t1 WHERE t0.id=t1.id AND 
t0.int_val<>t1.int_val) ";
+
+            Predicate<IgniteCorrelatedNestedLoopJoin> pred = 
isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                    .and(cnlj -> 
cnlj.getCorrelationColumns().equals(ImmutableBitSet.of(0, 3)));
+
+            assertPlan(sql, List.of(publicSchema), pred);
+        }
+
+        {
+            String sql = "SELECT /*+ disable_decorrelation */ * FROM t0 WHERE "
+                    + "EXISTS(SELECT * FROM t1 WHERE t0.jid=t1.jid AND 
t0.id<>t1.id) ";
+
+            Predicate<IgniteCorrelatedNestedLoopJoin> pred = 
isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+                    .and(cnlj -> 
cnlj.getCorrelationColumns().equals(ImmutableBitSet.of(0, 1)));
+
+            assertPlan(sql, List.of(publicSchema), pred);
+        }
+    }
+
     private static UnaryOperator<TableBuilder> tableA(String tableName) {
         return tableBuilder -> tableBuilder
                 .name(tableName)
                 .addColumn("ID", NativeTypes.INT32)
                 .addColumn("JID", NativeTypes.INT32)
                 .addColumn("VAL", NativeTypes.STRING)
+                .addColumn("INT_VAL", NativeTypes.INT32)
                 .distribution(IgniteDistributions.broadcast());
     }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
index 57edc31ec89..69ca1e92084 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -41,7 +42,9 @@ import 
org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 
 /**
@@ -183,6 +186,18 @@ public class CommonsTest extends BaseIgniteAbstractTest {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("packIntsToLongTestCases")
+    void packIntsToLong(int num1, int num2) {
+        long packed = Commons.packIntsToLong(num1, num2);
+
+        int unpackedNum1 = (int) (packed >>> 32);
+        int unpackedNum2 = (int) packed;
+
+        assertThat(unpackedNum1, is(num1));
+        assertThat(unpackedNum2, is(num2));
+    }
+
     private static void expectMapped(Mapping mapping, ImmutableBitSet bitSet, 
ImmutableBitSet expected) {
         assertEquals(expected, Mappings.apply(mapping, bitSet), "direct 
mapping");
 
@@ -196,6 +211,20 @@ public class CommonsTest extends BaseIgniteAbstractTest {
         assertEquals(expected, Mappings.apply(mapping, source));
     }
 
+    private static Stream<Arguments> packIntsToLongTestCases() {
+        return Stream.of(
+                Arguments.of(123, 456),
+                Arguments.of(0, 0),
+                Arguments.of(1, 0),
+                Arguments.of(0, 1),
+                Arguments.of(-1, -1),
+                Arguments.of(100, -200),
+                Arguments.of(-100, 200),
+                Arguments.of(Integer.MAX_VALUE, Integer.MIN_VALUE),
+                Arguments.of(Integer.MIN_VALUE, Integer.MAX_VALUE)
+        );
+    }
+
     /** For test purposes. */
     @FunctionalInterface
     public interface StringConcat {
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test 
b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 34d6d8038ae..70d95670e3e 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -324,6 +324,7 @@ Fragment#3
             predicate: true
             type: inner
             correlates: [$cor5]
+            correlationFieldNames: [C1, C2]
             est: (rows=50001)
           TableScan
               table: PUBLIC.CT_N1
@@ -370,6 +371,7 @@ Fragment#2 root
           predicate: true
           type: inner
           correlates: [$cor5]
+          correlationFieldNames: [C1, C2]
           est: (rows=1)
         Receiver
             fieldNames: [C1, C2]
@@ -428,6 +430,7 @@ Fragment#2 root
         predicate: true
         type: inner
         correlates: [$cor3]
+        correlationFieldNames: [ID]
         est: (rows=1)
       Receiver
           fieldNames: [ID, C1, C2]
@@ -514,6 +517,7 @@ Fragment#3 root
         predicate: true
         type: inner
         correlates: [$cor6]
+        correlationFieldNames: [ID]
         est: (rows=1)
       Receiver
           fieldNames: [ID, C1, C2]
@@ -535,6 +539,7 @@ Fragment#3 root
                 predicate: true
                 type: inner
                 correlates: [$cor7]
+                correlationFieldNames: [ID]
                 est: (rows=1)
               Receiver
                   fieldNames: [ID]
@@ -641,6 +646,7 @@ Fragment#3 root
         predicate: true
         type: inner
         correlates: [$cor8]
+        correlationFieldNames: [ID]
         est: (rows=1)
       Receiver
           fieldNames: [ID, C1, C2]
@@ -662,6 +668,7 @@ Fragment#3 root
                 predicate: true
                 type: inner
                 correlates: [$cor9]
+                correlationFieldNames: [ID]
                 est: (rows=1)
               Receiver
                   fieldNames: [ID]
diff --git 
a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test 
b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index a86fce7b2e1..7fd60a7cf7e 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -233,6 +233,7 @@ Fragment#2 root
         predicate: true
         type: inner
         correlates: [$cor3]
+        correlationFieldNames: [ID]
         est: (rows=1)
       Receiver
           fieldNames: [ID, C1, C2]
@@ -320,6 +321,7 @@ Fragment#2 root
         predicate: true
         type: inner
         correlates: [$cor3]
+        correlationFieldNames: [ID]
         est: (rows=1)
       Receiver
           fieldNames: [ID, C1, C2]

Reply via email to