This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 01f149382d7 IGNITE-24995 Optimize correlates serialization when
propagating to another node (#7471)
01f149382d7 is described below
commit 01f149382d71af5f5c48408fe0ec8a3e89bd6522
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu Feb 5 15:05:49 2026 +0300
IGNITE-24995 Optimize correlates serialization when propagating to another
node (#7471)
---
.../internal/network/NetworkMessageTypes.java | 71 ++++++++
.../message/value/BooleanValueMessage.java} | 23 +--
.../message/value/ByteArrayValueMessage.java} | 23 +--
.../network/message/value/ByteValueMessage.java} | 23 +--
.../network/message/value/DoubleValueMessage.java} | 23 +--
.../network/message/value/FloatValueMessage.java} | 23 +--
.../network/message/value/IntValueMessage.java} | 23 +--
.../network/message/value/LongValueMessage.java} | 23 +--
.../network/message/value/NullValueMessage.java} | 22 +--
.../network/message/value/ShortValueMessage.java} | 23 +--
.../network/message/value/SingleValueMessage.java} | 21 +--
.../network/message/value/StringValueMessage.java} | 23 +--
.../network/message/value/UuidValueMessage.java} | 24 +--
.../internal/sql/engine/ItSqlOperatorsTest.java | 2 +-
.../integrationTest/sql/group1/explain/join.test | 1 +
.../integrationTest/sql/group1/explain/scan.test | 2 +
.../sql/engine/exec/ExchangeServiceImpl.java | 7 +-
.../internal/sql/engine/exec/ExecutionContext.java | 9 +-
.../sql/engine/exec/LogicalRelImplementor.java | 2 +-
.../internal/sql/engine/exec/SharedState.java | 42 +++--
.../engine/exec/SharedStateMessageConverter.java | 188 +++++++++++++++++++++
.../sql/engine/exec/SqlEvaluationContext.java | 5 +-
.../sql/engine/exec/exp/CorrelatesBuilder.java | 21 +--
.../engine/exec/exp/CorrelationValueGetter.java | 45 +++++
.../engine/exec/exp/JoinPredicateImplementor.java | 2 +-
.../engine/exec/exp/JoinProjectionImplementor.java | 2 +-
.../sql/engine/exec/exp/PredicateImplementor.java | 2 +-
.../sql/engine/exec/exp/ProjectionImplementor.java | 2 +-
.../engine/exec/exp/RowProviderImplementor.java | 4 +-
.../sql/engine/exec/exp/ScalarImplementor.java | 5 +-
.../exec/rel/CorrelatedNestedLoopJoinNode.java | 21 ++-
.../expressions/SqlExpressionFactoryAdapter.java | 4 +-
.../sql/engine/externalize/RelJsonReader.java | 8 +-
...equestMessage.java => DecimalValueMessage.java} | 22 +--
.../engine/message/QueryBatchRequestMessage.java | 7 +-
...RequestMessage.java => SharedStateMessage.java} | 21 +--
.../sql/engine/message/SqlQueryMessageGroup.java | 6 +
.../engine/rel/IgniteCorrelatedNestedLoopJoin.java | 47 ++++--
.../sql/engine/rel/explain/IgniteRelWriter.java | 9 +
.../engine/rel/explain/RelTreeToTextWriter.java | 8 +
.../sql/engine/rule/CorrelateToNestedLoopRule.java | 1 +
.../engine/rule/CorrelatedNestedLoopJoinRule.java | 7 +
.../ignite/internal/sql/engine/util/Commons.java | 11 ++
.../internal/sql/engine/util/IgniteMethod.java | 4 +-
.../exec/SharedStateMessageConverterTest.java | 141 ++++++++++++++++
.../internal/sql/engine/exec/SharedStateTest.java | 77 +++++++++
.../rel/CorrelatedNestedLoopJoinExecutionTest.java | 78 ++++++++-
.../CorrelatedNestedLoopJoinPlannerTest.java | 35 +++-
.../internal/sql/engine/util/CommonsTest.java | 29 ++++
.../src/test/resources/mapping/correlated.test | 7 +
.../resources/mapping/test_partition_pruning.test | 2 +
51 files changed, 929 insertions(+), 302 deletions(-)
diff --git
a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
index 5ef73fe44ba..fbbd5b4964a 100644
---
a/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java
@@ -25,6 +25,17 @@ import
org.apache.ignite.internal.network.message.FieldDescriptorMessage;
import org.apache.ignite.internal.network.message.InvokeRequest;
import org.apache.ignite.internal.network.message.InvokeResponse;
import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.message.value.BooleanValueMessage;
+import org.apache.ignite.internal.network.message.value.ByteArrayValueMessage;
+import org.apache.ignite.internal.network.message.value.ByteValueMessage;
+import org.apache.ignite.internal.network.message.value.DoubleValueMessage;
+import org.apache.ignite.internal.network.message.value.FloatValueMessage;
+import org.apache.ignite.internal.network.message.value.IntValueMessage;
+import org.apache.ignite.internal.network.message.value.LongValueMessage;
+import org.apache.ignite.internal.network.message.value.NullValueMessage;
+import org.apache.ignite.internal.network.message.value.ShortValueMessage;
+import org.apache.ignite.internal.network.message.value.StringValueMessage;
+import org.apache.ignite.internal.network.message.value.UuidValueMessage;
import
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import
org.apache.ignite.internal.network.recovery.message.HandshakeRejectedMessage;
@@ -101,4 +112,64 @@ public class NetworkMessageTypes {
* Type for {@link ProbeMessage}.
*/
public static final short PROBE_MESSAGE = 12;
+
+ /**
+ * Message types that contain a single value of a certain type.
+ */
+ public interface SingleValueMessages {
+ /**
+ * Type for {@link BooleanValueMessage}.
+ */
+ short BOOLEAN_VALUE_MESSAGE = 101;
+
+ /**
+ * Type for {@link ByteValueMessage}.
+ */
+ short BYTE_VALUE_MESSAGE = 102;
+
+ /**
+ * Type for {@link ShortValueMessage}.
+ */
+ short SHORT_VALUE_MESSAGE = 103;
+
+ /**
+ * Type for {@link IntValueMessage}.
+ */
+ short INT_VALUE_MESSAGE = 104;
+
+ /**
+ * Type for {@link LongValueMessage}.
+ */
+ short LONG_VALUE_MESSAGE = 105;
+
+ /**
+ * Type for {@link FloatValueMessage}.
+ */
+ short FLOAT_VALUE_MESSAGE = 106;
+
+ /**
+ * Type for {@link DoubleValueMessage}.
+ */
+ short DOUBLE_VALUE_MESSAGE = 107;
+
+ /**
+ * Type for {@link UuidValueMessage}.
+ */
+ short UUID_VALUE_MESSAGE = 108;
+
+ /**
+ * Type for {@link StringValueMessage}.
+ */
+ short STRING_VALUE_MESSAGE = 109;
+
+ /**
+ * Type for {@link ByteArrayValueMessage}.
+ */
+ short BYTE_ARRAY_VALUE_MESSAGE = 110;
+
+ /**
+ * Type for {@link NullValueMessage}.
+ */
+ short NULL_VALUE_MESSAGE = 111;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/BooleanValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/BooleanValueMessage.java
index d224c4e50dd..24fa39537ba 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/BooleanValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link Boolean} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.BOOLEAN_VALUE_MESSAGE)
+public interface BooleanValueMessage extends SingleValueMessage<Boolean> {
+ @Override
+ Boolean value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteArrayValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteArrayValueMessage.java
index d224c4e50dd..a141b12e42b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteArrayValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@code byte[]} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.BYTE_ARRAY_VALUE_MESSAGE)
+public interface ByteArrayValueMessage extends SingleValueMessage<byte[]> {
+ @Override
+ byte[] value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteValueMessage.java
index d224c4e50dd..ec66655f6eb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ByteValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link Byte} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.BYTE_VALUE_MESSAGE)
+public interface ByteValueMessage extends SingleValueMessage<Byte> {
+ @Override
+ Byte value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/DoubleValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/DoubleValueMessage.java
index d224c4e50dd..2ade90598b6 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/DoubleValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link Double} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.DOUBLE_VALUE_MESSAGE)
+public interface DoubleValueMessage extends SingleValueMessage<Double> {
+ @Override
+ Double value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/FloatValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/FloatValueMessage.java
index d224c4e50dd..443e897c40a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/FloatValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link Float} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.FLOAT_VALUE_MESSAGE)
+public interface FloatValueMessage extends SingleValueMessage<Float> {
+ @Override
+ Float value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/IntValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/IntValueMessage.java
index d224c4e50dd..514d39f6121 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/IntValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link Integer} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.INT_VALUE_MESSAGE)
+public interface IntValueMessage extends SingleValueMessage<Integer> {
+ @Override
+ Integer value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/LongValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/LongValueMessage.java
index d224c4e50dd..4f3506c8be2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/LongValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link Long} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.LONG_VALUE_MESSAGE)
+public interface LongValueMessage extends SingleValueMessage<Long> {
+ @Override
+ Long value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/NullValueMessage.java
similarity index 55%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/NullValueMessage.java
index d224c4e50dd..b341c61551e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/NullValueMessage.java
@@ -15,25 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@code null} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.NULL_VALUE_MESSAGE)
+public interface NullValueMessage extends SingleValueMessage<Boolean> {
+ @Override
+ @Nullable Boolean value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ShortValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ShortValueMessage.java
index d224c4e50dd..3cfca1f9a5e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/ShortValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link Short} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.SHORT_VALUE_MESSAGE)
+public interface ShortValueMessage extends SingleValueMessage<Short> {
+ @Override
+ Short value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/SingleValueMessage.java
similarity index 51%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/SingleValueMessage.java
index d224c4e50dd..5776e024649 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/SingleValueMessage.java
@@ -15,25 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
-import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
+import org.apache.ignite.internal.network.NetworkMessage;
import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+public interface SingleValueMessage<T> extends NetworkMessage {
+ @Nullable T value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/StringValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/StringValueMessage.java
index d224c4e50dd..185d053f99d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/StringValueMessage.java
@@ -15,25 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link String} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.STRING_VALUE_MESSAGE)
+public interface StringValueMessage extends SingleValueMessage<String> {
+ @Override
+ String value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/UuidValueMessage.java
similarity index 52%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/network/src/main/java/org/apache/ignite/internal/network/message/value/UuidValueMessage.java
index d224c4e50dd..623d4a09ad1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/network/src/main/java/org/apache/ignite/internal/network/message/value/UuidValueMessage.java
@@ -15,25 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.message;
+package org.apache.ignite.internal.network.message.value;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import java.util.UUID;
+import
org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link UUID} value.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SingleValueMessages.UUID_VALUE_MESSAGE)
+public interface UuidValueMessage extends SingleValueMessage<UUID> {
+ @Override
+ UUID value();
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
index 3129d720b9f..ceaf1baed21 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlOperatorsTest.java
@@ -265,7 +265,7 @@ public class ItSqlOperatorsTest extends
BaseSqlIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-20162")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19332")
public void testCollections() {
assertExpression("MAP['a', 1, 'A', 2]").returns(Map.of("a", 1, "A",
2)).check();
assertExpression("ARRAY[1, 2, 3]").returns(List.of(1, 2, 3)).check();
diff --git
a/modules/sql-engine/src/integrationTest/sql/group1/explain/join.test
b/modules/sql-engine/src/integrationTest/sql/group1/explain/join.test
index ef0c072b2a7..aecb770b2fc 100644
--- a/modules/sql-engine/src/integrationTest/sql/group1/explain/join.test
+++ b/modules/sql-engine/src/integrationTest/sql/group1/explain/join.test
@@ -224,6 +224,7 @@ CorrelatedNestedLoopJoin
predicate: true
type: inner
correlates: [$cor1]
+ correlationFieldNames: [X]
est: (rows=100)
TableFunctionScan
fieldNames: [X]
diff --git
a/modules/sql-engine/src/integrationTest/sql/group1/explain/scan.test
b/modules/sql-engine/src/integrationTest/sql/group1/explain/scan.test
index 71639c28a33..33c50e3fb3e 100644
--- a/modules/sql-engine/src/integrationTest/sql/group1/explain/scan.test
+++ b/modules/sql-engine/src/integrationTest/sql/group1/explain/scan.test
@@ -216,6 +216,7 @@ Project
predicate: true
type: left
correlates: [$cor0]
+ correlationFieldNames: [RENAMED_C1]
est: (rows=1)
Exchange
distribution: single
@@ -256,6 +257,7 @@ Project
predicate: true
type: left
correlates: [$cor0]
+ correlationFieldNames: [DOUBLED_C1]
est: (rows=1)
Exchange
distribution: single
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index 674b8e37f16..2130c3c1368 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -112,7 +113,7 @@ public class ExchangeServiceImpl implements ExchangeService
{
.fragmentId(fragmentId)
.exchangeId(exchangeId)
.amountOfBatches(amountOfBatches)
- .sharedState(state)
+ .sharedStateMessage(SharedStateMessageConverter.toMessage(state))
.build()
);
}
@@ -153,7 +154,9 @@ public class ExchangeServiceImpl implements ExchangeService
{
Consumer<Outbox<?>> onRequestHandler = outbox -> {
try {
- SharedState state = msg.sharedState();
+ SharedStateMessage sharedStateMessage = msg.sharedStateMessage();
+ SharedState state = SharedStateMessageConverter.fromMessage(sharedStateMessage);
+
if (state != null) {
outbox.onRewindRequest(node.name(), state,
msg.amountOfBatches());
} else {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 5716c1fbaa4..d66109f15a9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
-import static org.apache.ignite.internal.sql.engine.util.Commons.cast;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.time.Clock;
@@ -359,17 +358,17 @@ public class ExecutionContext<RowT> implements
SqlEvaluationContext<RowT> {
}
@Override
- public RowT correlatedVariable(int id) {
- return cast(sharedState.correlatedVariable(id));
+ public @Nullable Object correlatedVariable(long id) {
+ return sharedState.correlatedVariable(id);
}
/**
* Sets correlated value.
*
- * @param id Correlation ID.
+ * @param id Composite identifier consisting of the correlated variable ID
and the field index.
* @param value Correlated value.
*/
- public void correlatedVariable(Object value, int id) {
+ public void correlatedVariable(long id, @Nullable Object value) {
sharedState.correlatedVariable(id, value);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index dfd117016ca..525f55e7f9b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -370,7 +370,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
RowFactory<RowT> rightRowFactory =
ctx.rowFactoryFactory().create(convertStructuredType(rightType));
Node<RowT> node = new CorrelatedNestedLoopJoinNode<>(ctx, cond,
rel.getVariablesSet(),
- rel.getJoinType(), rightRowFactory, joinProjection);
+ rel.getCorrelationColumns(), rel.getJoinType(),
rightRowFactory, joinProjection);
Node<RowT> leftInput = visit(rel.getLeft());
Node<RowT> rightInput = visit(rel.getRight());
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
index ccb3dbd8ff7..ad3ae378bc0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedState.java
@@ -17,41 +17,53 @@
package org.apache.ignite.internal.sql.engine.exec;
-import static org.apache.ignite.internal.sql.engine.util.Commons.checkRange;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.sql.engine.util.Commons;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.jetbrains.annotations.Nullable;
/**
* This class represents the volatile state that may be propagated from parent
to its children
* during rewind.
*/
-public class SharedState implements Serializable {
- private static final long serialVersionUID = 42L;
+public class SharedState {
+ private final Long2ObjectMap<Object> correlations;
+
+ public SharedState() {
+ this(new Long2ObjectOpenHashMap<>());
+ }
- private Object[] correlations = new Object[16];
+ SharedState(Long2ObjectMap<Object> correlations) {
+ this.correlations = correlations;
+ }
/**
* Gets correlated value.
*
- * @param id Correlation ID.
+ * @param id Composite identifier consisting of the correlated variable ID
and the field index.
* @return Correlated value.
*/
- public Object correlatedVariable(int id) {
- checkRange(correlations, id);
+ public @Nullable Object correlatedVariable(long id) {
+ Object value = correlations.getOrDefault(id, this);
+
+ if (value == this) {
+ throw new IllegalStateException("Correlated variable is not set [id=" + id + "]");
+ }
- return correlations[id];
+ return value;
}
/**
* Sets correlated value.
*
- * @param id Correlation ID.
+ * @param id Composite identifier consisting of the correlated variable ID
and the field index.
* @param value Correlated value.
*/
- public void correlatedVariable(int id, Object value) {
- correlations = Commons.ensureCapacity(correlations, id + 1);
+ public void correlatedVariable(long id, @Nullable Object value) {
+ correlations.put(id, value);
+ }
- correlations[id] = value;
+ Long2ObjectMap<Object> correlations() {
+ return Long2ObjectMaps.unmodifiable(correlations);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
new file mode 100644
index 00000000000..e31c3499d6c
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverter.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.NetworkMessageTypes.SingleValueMessages;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.value.SingleValueMessage;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
+import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Contract;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Converter between {@link SharedState} and {@link SharedStateMessage}.
+ */
+public class SharedStateMessageConverter {
+ /** Message factory. */
+ private static final SqlQueryMessagesFactory SQL_MESSAGE_FACTORY = new
SqlQueryMessagesFactory();
+
+ private static final NetworkMessagesFactory NETOWRK_MESSAGE_FACTORY = new
NetworkMessagesFactory();
+
+ @Contract("null -> null; !null -> !null")
+ static @Nullable SharedStateMessage toMessage(@Nullable SharedState state)
{
+ if (state == null) {
+ return null;
+ }
+
+ Long2ObjectMap<Object> correlations = state.correlations();
+ Map<Long, NetworkMessage> result =
IgniteUtils.newHashMap(correlations.size());
+
+ for (Long2ObjectMap.Entry<Object> entry :
correlations.long2ObjectEntrySet()) {
+ SingleValueMessage<?> msg = toSingleValueMessage(entry.getValue());
+
+ result.put(entry.getLongKey(), msg);
+ }
+
+ return SQL_MESSAGE_FACTORY.sharedStateMessage()
+ .sharedState(result)
+ .build();
+ }
+
+ @Contract("null -> null; !null -> !null")
+ static @Nullable SharedState fromMessage(@Nullable SharedStateMessage
sharedStateMessage) {
+ if (sharedStateMessage == null) {
+ return null;
+ }
+
+ int size = sharedStateMessage.sharedState().size();
+ Long2ObjectMap<Object> correlations = new
Long2ObjectOpenHashMap<>(size);
+
+ for (Map.Entry<Long, NetworkMessage> e :
sharedStateMessage.sharedState().entrySet()) {
+ NetworkMessage networkMessage = e.getValue();
+
+ if (!(networkMessage instanceof SingleValueMessage)) {
+ throw new IllegalArgumentException("Unexpected message type "
+ + "[type=" + networkMessage.messageType() + ", class=" + networkMessage.getClass() + ']');
+ }
+
+ SingleValueMessage<Object> singleFieldMessage =
((SingleValueMessage<Object>) networkMessage);
+
+ correlations.put(e.getKey().longValue(),
extractFieldValue(singleFieldMessage));
+ }
+
+ return new SharedState(correlations);
+ }
+
+ private static @Nullable Object extractFieldValue(SingleValueMessage<Object> msg) {
+ Object value = msg.value();
+
+ if (value == null) {
+ return null;
+ }
+
+ if (msg.groupType() == SqlQueryMessageGroup.GROUP_TYPE) {
+ assert msg.messageType() ==
SqlQueryMessageGroup.DECIMAL_VALUE_MESSAGE : msg.messageType();
+
+ return decimalFromBytes((byte[]) value);
+ }
+
+ switch (msg.messageType()) {
+ case SingleValueMessages.BYTE_ARRAY_VALUE_MESSAGE:
+ return new ByteString((byte[]) value);
+
+ case SingleValueMessages.NULL_VALUE_MESSAGE:
+ return null;
+
+ default:
+ return value;
+ }
+ }
+
+ private static BigDecimal decimalFromBytes(byte[] value) {
+ ByteBuffer buffer =
ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN);
+
+ short valScale = buffer.getShort();
+
+ BigInteger integer = new BigInteger(value, Short.BYTES, value.length -
Short.BYTES);
+
+ return new BigDecimal(integer, valScale);
+ }
+
+ private static byte[] decimalToBytes(BigDecimal value) {
+ if (value.scale() > Short.MAX_VALUE || value.scale() <
Short.MIN_VALUE) {
+ throw new UnsupportedOperationException("Decimal scale is out of range: " + value.scale());
+ }
+
+ byte[] unscaledBytes = value.unscaledValue().toByteArray();
+
+ ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES +
unscaledBytes.length)
+ .order(ByteOrder.LITTLE_ENDIAN);
+
+ buffer.putShort((short) value.scale());
+ buffer.put(unscaledBytes);
+
+ return buffer.array();
+ }
+
+ private static SingleValueMessage<?> toSingleValueMessage(Object value) {
+ if (value == null) {
+ return NETOWRK_MESSAGE_FACTORY.nullValueMessage().build();
+ }
+
+ if (value instanceof Boolean) {
+ return NETOWRK_MESSAGE_FACTORY.booleanValueMessage().value((Boolean) value).build();
+ }
+ if (value instanceof Byte) {
+ return NETOWRK_MESSAGE_FACTORY.byteValueMessage().value((Byte)
value).build();
+ }
+ if (value instanceof Short) {
+ return NETOWRK_MESSAGE_FACTORY.shortValueMessage().value((Short)
value).build();
+ }
+ if (value instanceof Integer) {
+ return NETOWRK_MESSAGE_FACTORY.intValueMessage().value((Integer)
value).build();
+ }
+ if (value instanceof Long) {
+ return NETOWRK_MESSAGE_FACTORY.longValueMessage().value((Long)
value).build();
+ }
+ if (value instanceof Float) {
+ return NETOWRK_MESSAGE_FACTORY.floatValueMessage().value((Float)
value).build();
+ }
+ if (value instanceof Double) {
+ return NETOWRK_MESSAGE_FACTORY.doubleValueMessage().value((Double)
value).build();
+ }
+ if (value instanceof BigDecimal) {
+ return SQL_MESSAGE_FACTORY.decimalValueMessage().value(decimalToBytes((BigDecimal) value)).build();
+ }
+ if (value instanceof UUID) {
+ return NETOWRK_MESSAGE_FACTORY.uuidValueMessage().value((UUID)
value).build();
+ }
+ if (value instanceof String) {
+ return NETOWRK_MESSAGE_FACTORY.stringValueMessage().value((String)
value).build();
+ }
+ if (value instanceof ByteString) {
+ ByteString byteString = (ByteString) value;
+
+ return NETOWRK_MESSAGE_FACTORY.byteArrayValueMessage().value(byteString.getBytes()).build();
+ }
+
+ throw new IllegalArgumentException("Unsupported type: " + value.getClass());
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
index bd7aa510c17..dd7bf8321f4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/SqlEvaluationContext.java
@@ -21,6 +21,7 @@ import org.apache.calcite.DataContext;
import org.apache.ignite.internal.sql.engine.api.expressions.RowAccessor;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
+import org.jetbrains.annotations.Nullable;
/**
* Provides the contextual environment required for evaluating SQL expressions.
@@ -38,6 +39,6 @@ public interface SqlEvaluationContext<RowT> extends
DataContext {
/** Returns a factory capable of producing {@link RowFactory} instances
for the {@code RowT} row representation. */
RowFactoryFactory<RowT> rowFactoryFactory();
- /** Returns row representing correlation source by given correlation id. */
- RowT correlatedVariable(int id);
+ /** Returns the value of a correlated variable identified by the given
correlation id. */
+ @Nullable Object correlatedVariable(long corrId);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
index 0db383a5e92..3be34a6b8b0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelatesBuilder.java
@@ -20,27 +20,18 @@ package org.apache.ignite.internal.sql.engine.exec.exp;
import java.util.HashMap;
import java.util.Map;
import org.apache.calcite.linq4j.function.Function1;
-import org.apache.calcite.linq4j.tree.BlockBuilder;
import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.rex.RexCorrelVariable;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter;
-import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
class CorrelatesBuilder extends RexShuttle {
- private final BlockBuilder builder;
-
private final Expression ctx;
- private final Expression hnd;
-
- private Map<String, FieldGetter> correlates;
+ private Map<String, InputGetter> correlates;
- CorrelatesBuilder(BlockBuilder builder, Expression ctx, Expression hnd) {
- this.builder = builder;
- this.hnd = hnd;
+ CorrelatesBuilder(Expression ctx) {
this.ctx = ctx;
}
@@ -61,15 +52,13 @@ class CorrelatesBuilder extends RexShuttle {
/** {@inheritDoc} */
@Override
public RexNode visitCorrelVariable(RexCorrelVariable variable) {
- Expression corr = builder.append("corr",
- Expressions.call(ctx,
IgniteMethod.CONTEXT_GET_CORRELATED_VALUE.method(),
- Expressions.constant(variable.id.getId())));
-
if (correlates == null) {
correlates = new HashMap<>();
}
- correlates.put(variable.getName(), new FieldGetter(hnd, corr,
variable.getType()));
+ InputGetter inputGetter = new CorrelationValueGetter(ctx, variable);
+
+ correlates.put(variable.getName(), inputGetter);
return variable;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelationValueGetter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelationValueGetter.java
new file mode 100644
index 00000000000..7989fa98fb8
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/CorrelationValueGetter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp;
+
+import org.apache.calcite.linq4j.tree.BlockBuilder;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.linq4j.tree.Expressions;
+import org.apache.calcite.rex.RexCorrelVariable;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
+
+class CorrelationValueGetter extends CommonFieldGetter {
+ private final Expression ctx;
+ private final int correlationId;
+
+ CorrelationValueGetter(Expression ctx, RexCorrelVariable variable) {
+ super(null, null, variable.getType());
+
+ this.ctx = ctx;
+ this.correlationId = variable.id.getId();
+ }
+
+ @Override
+ protected Expression fillExpressions(BlockBuilder list, int index) {
+ long id = Commons.packIntsToLong(correlationId, index);
+
+ return list.append("corr",
+ Expressions.call(ctx,
IgniteMethod.CONTEXT_GET_CORRELATED_VALUE.method(), Expressions.constant(id)));
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinPredicateImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinPredicateImplementor.java
index b923210f6f2..42f1ac8941f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinPredicateImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinPredicateImplementor.java
@@ -100,7 +100,7 @@ class JoinPredicateImplementor {
InputGetter inputGetter = new BiFieldGetter(rowHandler, left, right,
type, firstRowSize);
- Function1<String, InputGetter> correlates = new
CorrelatesBuilder(builder, ctx, rowHandler)
+ Function1<String, InputGetter> correlates = new CorrelatesBuilder(ctx)
.build(List.of(predicateExpression));
Expression condition = RexToLixTranslator.translateCondition(program,
typeFactory, builder,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinProjectionImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinProjectionImplementor.java
index 1060c59b9b5..1e285eba138 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinProjectionImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/JoinProjectionImplementor.java
@@ -113,7 +113,7 @@ class JoinProjectionImplementor {
InputGetter inputGetter = new BiFieldGetter(rowHandler, left, right,
type, firstRowSize);
- Function1<String, InputGetter> correlates = new
CorrelatesBuilder(builder, ctx, rowHandler).build(projections);
+ Function1<String, InputGetter> correlates = new
CorrelatesBuilder(ctx).build(projections);
List<Expression> projects =
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
builder, null, null, ctx, inputGetter, correlates);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/PredicateImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/PredicateImplementor.java
index 463c4700009..2c340be9ef3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/PredicateImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/PredicateImplementor.java
@@ -98,7 +98,7 @@ class PredicateImplementor {
InputGetter inputGetter = new FieldGetter(rowHandler, row, type);
- Function1<String, InputGetter> correlates = new
CorrelatesBuilder(builder, ctx, rowHandler)
+ Function1<String, InputGetter> correlates = new CorrelatesBuilder(ctx)
.build(List.of(predicateExpression));
Expression condition = RexToLixTranslator.translateCondition(program,
typeFactory, builder,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ProjectionImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ProjectionImplementor.java
index 029e601f784..23f57503169 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ProjectionImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ProjectionImplementor.java
@@ -111,7 +111,7 @@ class ProjectionImplementor {
InputGetter inputGetter = new FieldGetter(rowHandler, row,
inputRowType);
- Function1<String, InputGetter> correlates = new
CorrelatesBuilder(builder, ctx, rowHandler).build(projections);
+ Function1<String, InputGetter> correlates = new
CorrelatesBuilder(ctx).build(projections);
List<Expression> projects =
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
builder, null, null, ctx, inputGetter, correlates);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RowProviderImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RowProviderImplementor.java
index 47c7f723039..9680f7ec706 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RowProviderImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/RowProviderImplementor.java
@@ -133,9 +133,7 @@ class RowProviderImplementor {
Expressions.declare(Modifier.FINAL, DataContext.ROOT,
Expressions.convert_(ctx, DataContext.class))
);
- Expression rowHandler = builder.append("hnd", Expressions.call(ctx,
IgniteMethod.CONTEXT_ROW_HANDLER.method()));
-
- Function1<String, InputGetter> correlates = new
CorrelatesBuilder(builder, ctx, rowHandler).build(values);
+ Function1<String, InputGetter> correlates = new
CorrelatesBuilder(ctx).build(values);
List<Expression> projects =
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
builder, null, null, ctx, NoOpFieldGetter.INSTANCE,
correlates);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ScalarImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ScalarImplementor.java
index 8ef8cd33c8b..8057d7f3732 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ScalarImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/ScalarImplementor.java
@@ -43,7 +43,6 @@ import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.ignite.internal.sql.engine.exec.SqlEvaluationContext;
import
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator.InputGetter;
import org.apache.ignite.internal.sql.engine.util.Commons;
-import org.apache.ignite.internal.sql.engine.util.IgniteMethod;
import org.apache.ignite.internal.sql.engine.util.Primitives;
import org.apache.ignite.internal.sql.engine.util.RexUtils;
import org.apache.ignite.internal.sql.engine.util.cache.Cache;
@@ -109,11 +108,9 @@ class ScalarImplementor {
ParameterExpression ctx =
Expressions.parameter(SqlEvaluationContext.class, "ctx");
- Expression rowHandler = builder.append("hnd", Expressions.call(ctx,
IgniteMethod.CONTEXT_ROW_HANDLER.method()));
-
Function1<String, InputGetter> correlates = scalarValue instanceof
RexDynamicParam
? null
- : new CorrelatesBuilder(builder, ctx,
rowHandler).build(List.of(scalarValue));
+ : new CorrelatesBuilder(ctx).build(List.of(scalarValue));
List<Expression> projects =
RexToLixTranslator.translateProjects(program, typeFactory, conformance,
builder, null, null, ctx, NoOpFieldGetter.INSTANCE,
correlates);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
index ecc5bbabe81..0d269190deb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinNode.java
@@ -28,10 +28,12 @@ import java.util.Set;
import java.util.function.BiPredicate;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.lang.IgniteStringBuilder;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.exp.SqlJoinProjection;
+import org.apache.ignite.internal.sql.engine.util.Commons;
/**
* CorrelatedNestedLoopJoinNode.
@@ -54,6 +56,8 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
private final RowT rightEmptyRow;
+ private final ImmutableBitSet correlationColumns;
+
private int requested;
private int waitingLeft;
@@ -77,9 +81,10 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
/**
* Creates CorrelatedNestedLoopJoin node.
*
- * @param ctx Execution context.
+ * @param ctx Execution context.
* @param cond Join expression.
- * @param correlationIds Set of collections ids.
+ * @param correlationIds Set of correlation ids.
+ * @param correlationColumns Set of columns that are used by correlation.
* @param joinType Join rel type.
* @param rightRowFactory Right row factory.
* @param joinProjection Output row factory.
@@ -88,6 +93,7 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
ExecutionContext<RowT> ctx,
BiPredicate<RowT, RowT> cond,
Set<CorrelationId> correlationIds,
+ ImmutableBitSet correlationColumns,
JoinRelType joinType,
RowFactory<RowT> rightRowFactory,
SqlJoinProjection joinProjection
@@ -101,6 +107,7 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
this.correlationIds = new ArrayList<>(correlationIds);
this.joinType = joinType;
this.joinProjection = joinProjection;
+ this.correlationColumns = correlationColumns;
leftInBufferSize = correlationIds.size();
rightInBufferSize = inBufSize;
@@ -485,7 +492,15 @@ public class CorrelatedNestedLoopJoinNode<RowT> extends
AbstractNode<RowT> {
private void prepareCorrelations() {
for (int i = 0; i < correlationIds.size(); i++) {
RowT row = i < leftInBuf.size() ? leftInBuf.get(i) :
first(leftInBuf);
- context().correlatedVariable(row, correlationIds.get(i).getId());
+ int corrId = correlationIds.get(i).getId();
+
+ for (int fieldIndex = correlationColumns.nextSetBit(0); fieldIndex != -1;
+ fieldIndex = correlationColumns.nextSetBit(fieldIndex + 1)) {
+ Object value = context().rowAccessor().get(fieldIndex, row);
+ long id = Commons.packIntsToLong(corrId, fieldIndex);
+
+ context().correlatedVariable(id, value);
+ }
}
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
index be3376377d7..3bc84a213fc 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/expressions/SqlExpressionFactoryAdapter.java
@@ -303,8 +303,8 @@ public class SqlExpressionFactoryAdapter implements
ExpressionFactory {
}
@Override
- public @Nullable RowT correlatedVariable(int id) {
- return null;
+ public @Nullable Object correlatedVariable(long id) {
+ throw new AssertionError("Should not get here");
}
@Override
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
index 458e7fdb0eb..171c8772ee7 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/externalize/RelJsonReader.java
@@ -203,7 +203,13 @@ public class RelJsonReader {
/** {@inheritDoc} */
@Override
public ImmutableBitSet getBitSet(String tag) {
- return ImmutableBitSet.of(getIntegerList(tag));
+ var list = getIntegerList(tag);
+
+ if (list == null) {
+ return ImmutableBitSet.of();
+ }
+
+ return ImmutableBitSet.of(list);
}
/** {@inheritDoc} */
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/DecimalValueMessage.java
similarity index 56%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/DecimalValueMessage.java
index d224c4e50dd..11ea304cf55 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/DecimalValueMessage.java
@@ -17,23 +17,15 @@
package org.apache.ignite.internal.sql.engine.message;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import java.math.BigDecimal;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.internal.network.message.value.SingleValueMessage;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a single {@link BigDecimal} value in a byte array
representation.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.DECIMAL_VALUE_MESSAGE)
+public interface DecimalValueMessage extends SingleValueMessage<byte[]> {
+ @Override
+ byte[] value();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
index d224c4e50dd..2777e9bb196 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
@@ -17,9 +17,7 @@
package org.apache.ignite.internal.sql.engine.message;
-import org.apache.ignite.internal.network.annotations.Marshallable;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
import org.jetbrains.annotations.Nullable;
/**
@@ -33,7 +31,6 @@ public interface QueryBatchRequestMessage extends
ExecutionContextAwareMessage {
/** Returns amount of batches to request. */
int amountOfBatches();
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+ /** Returns a state that should be propagated to the target fragment. */
+ @Nullable SharedStateMessage sharedStateMessage();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
similarity index 56%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
index d224c4e50dd..53dd91f1b89 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SharedStateMessage.java
@@ -17,23 +17,14 @@
package org.apache.ignite.internal.sql.engine.message;
-import org.apache.ignite.internal.network.annotations.Marshallable;
+import java.util.Map;
+import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.annotations.Transferable;
-import org.apache.ignite.internal.sql.engine.exec.SharedState;
-import org.jetbrains.annotations.Nullable;
/**
- * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
+ * A message that contains a map with correlations.
*/
-@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
-public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
- /** Returns an identifier of the exchange to request batches from. */
- long exchangeId();
-
- /** Returns amount of batches to request. */
- int amountOfBatches();
-
- /** Returns a state that has should be propagated to the target fragment.
*/
- @Marshallable
- @Nullable SharedState sharedState();
+@Transferable(SqlQueryMessageGroup.SHARED_STATE_MESSAGE)
+public interface SharedStateMessage extends NetworkMessage {
+ Map<Long, NetworkMessage> sharedState();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
index ea0d1f1509a..85d24072c5b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
@@ -46,4 +46,10 @@ public final class SqlQueryMessageGroup {
/** See {@link CancelOperationResponse} for the details. */
public static final short OPERATION_CANCEL_RESPONSE = 7;
+
+ /** See {@link SharedStateMessage} for the details. */
+ public static final short SHARED_STATE_MESSAGE = 8;
+
+ /** See {@link DecimalValueMessage} for the details. */
+ public static final short DECIMAL_VALUE_MESSAGE = 9;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
index 4a7246eba39..8d8b448fca3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/IgniteCorrelatedNestedLoopJoin.java
@@ -30,11 +30,13 @@ import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCost;
import org.apache.ignite.internal.sql.engine.metadata.cost.IgniteCostFactory;
@@ -51,21 +53,26 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
public class IgniteCorrelatedNestedLoopJoin extends AbstractIgniteJoin {
private static final String REL_TYPE_NAME = "CorrelatedNestedLoopJoin";
+ private final ImmutableBitSet correlationColumns;
+
/**
* Creates a Join.
*
- * @param cluster Cluster
- * @param traitSet Trait set
- * @param left Left input
- * @param right Right input
- * @param condition Join condition
- * @param joinType Join type
- * @param variablesSet Set variables that are set by the LHS and used by
the RHS and are not available to nodes above this Join in the
- * tree
+ * @param cluster Cluster
+ * @param traitSet Trait set
+ * @param left Left input
+ * @param right Right input
+ * @param condition Join condition
+ * @param variablesSet Set of variables that are set by the LHS and used by
the RHS and are not available to nodes above this Join
+ * in the tree
+ * @param correlationColumns Set of left input columns that are used by correlation.
+ * @param joinType Join type
*/
public IgniteCorrelatedNestedLoopJoin(RelOptCluster cluster, RelTraitSet
traitSet, RelNode left, RelNode right,
- RexNode condition, Set<CorrelationId> variablesSet, JoinRelType
joinType) {
+ RexNode condition, Set<CorrelationId> variablesSet,
ImmutableBitSet correlationColumns, JoinRelType joinType) {
super(cluster, traitSet, left, right, condition, variablesSet,
joinType);
+
+ this.correlationColumns = correlationColumns;
}
/**
@@ -79,14 +86,17 @@ public class IgniteCorrelatedNestedLoopJoin extends
AbstractIgniteJoin {
input.getInputs().get(1),
input.getExpression("condition"),
Set.copyOf(Commons.transform(input.getIntegerList("variablesSet"),
CorrelationId::new)),
- input.getEnum("joinType", JoinRelType.class));
+ input.getBitSet("correlationColumns"),
+ input.getEnum("joinType", JoinRelType.class)
+ );
}
/** {@inheritDoc} */
@Override
public Join copy(RelTraitSet traitSet, RexNode condition, RelNode left,
RelNode right, JoinRelType joinType,
boolean semiJoinDone) {
- return new IgniteCorrelatedNestedLoopJoin(getCluster(), traitSet,
left, right, condition, variablesSet, joinType);
+ return new IgniteCorrelatedNestedLoopJoin(getCluster(), traitSet,
left, right, condition,
+ variablesSet, correlationColumns, joinType);
}
/** {@inheritDoc} */
@@ -148,6 +158,11 @@ public class IgniteCorrelatedNestedLoopJoin extends
AbstractIgniteJoin {
List.of(left.replace(RelCollations.EMPTY), right));
}
+ @Override public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw)
+ .itemIf("correlationColumns", correlationColumns,
!correlationColumns.isEmpty());
+ }
+
/** {@inheritDoc} */
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery
mq) {
@@ -175,7 +190,11 @@ public class IgniteCorrelatedNestedLoopJoin extends
AbstractIgniteJoin {
@Override
public IgniteRel clone(RelOptCluster cluster, List<IgniteRel> inputs) {
return new IgniteCorrelatedNestedLoopJoin(cluster, getTraitSet(),
inputs.get(0), inputs.get(1), getCondition(),
- getVariablesSet(), getJoinType());
+ getVariablesSet(), getCorrelationColumns(), getJoinType());
+ }
+
+ public ImmutableBitSet getCorrelationColumns() {
+ return correlationColumns;
}
/** {@inheritDoc} */
@@ -193,6 +212,8 @@ public class IgniteCorrelatedNestedLoopJoin extends
AbstractIgniteJoin {
@Override
public IgniteRelWriter explain(IgniteRelWriter writer) {
- return super.explain(writer).addCorrelatedVariables(variablesSet);
+ return super.explain(writer)
+ .addCorrelatedVariables(variablesSet)
+ .addCorrelationFieldNames(correlationColumns,
getLeft().getRowType());
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/IgniteRelWriter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/IgniteRelWriter.java
index e30e01d8abc..cd0d6b58970 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/IgniteRelWriter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/IgniteRelWriter.java
@@ -197,6 +197,15 @@ public interface IgniteRelWriter {
*/
IgniteRelWriter addCorrelatedVariables(Set<CorrelationId> variablesSet);
+ /**
+ * Adds correlation field names which are set by {@link
IgniteCorrelatedNestedLoopJoin}.
+ *
+ * @param correlationColumns Set of columns that are used by correlation.
+ * @param rowType The row type against which field indexes in the
correlation columns should be resolved.
+ * @return This writer instance for chaining.
+ */
+ IgniteRelWriter addCorrelationFieldNames(ImmutableBitSet
correlationColumns, RelDataType rowType);
+
/**
* Adds search bounds used for index look ups.
*
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/RelTreeToTextWriter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/RelTreeToTextWriter.java
index 547c6c77724..9a3cb6accea 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/RelTreeToTextWriter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rel/explain/RelTreeToTextWriter.java
@@ -134,6 +134,7 @@ class RelTreeToTextWriter {
AGGREGATION("aggregation"),
KEY_EXPRESSIONS("key"),
CORRELATED_VARIABLES("correlates"),
+ CORRELATION_FIELD_NAMES("correlationFieldNames"),
INVOCATION("invocation"),
OFFSET("offset"),
FETCH("fetch"),
@@ -291,6 +292,13 @@ class RelTreeToTextWriter {
return this;
}
+ @Override
+ public IgniteRelWriter addCorrelationFieldNames(ImmutableBitSet
correlationColumns, RelDataType rowType) {
+ attributes.put(AttributeName.CORRELATION_FIELD_NAMES,
beautifyBitSet(correlationColumns, rowType).toString());
+
+ return this;
+ }
+
@Override
public IgniteRelWriter addSearchBounds(List<SearchBounds>
searchBounds) {
attributes.put(AttributeName.SEARCH_BOUNDS,
beautifySearchBounds(searchBounds));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
index 004185cab65..32c773dd901 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelateToNestedLoopRule.java
@@ -73,6 +73,7 @@ public class CorrelateToNestedLoopRule extends
AbstractIgniteConverterRule<Logic
right,
cluster.getRexBuilder().makeLiteral(true),
correlationIds,
+ rel.getRequiredColumns(),
rel.getJoinType()
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
index af9519ad1c9..b5ce777e7bf 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/CorrelatedNestedLoopJoinRule.java
@@ -39,6 +39,8 @@ import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableBitSet.Builder;
import org.apache.ignite.internal.sql.engine.rel.IgniteConvention;
import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
@@ -73,6 +75,8 @@ public class CorrelatedNestedLoopJoinRule extends
AbstractIgniteConverterRule<Lo
final Set<CorrelationId> correlationIds = new HashSet<>();
final ArrayList<RexNode> corrVar = new ArrayList<>();
+ Builder corrRequiredColumnsBuilder = ImmutableBitSet.builder();
+
for (int i = 0; i < batchSize; i++) {
CorrelationId correlationId = cluster.createCorrel();
correlationIds.add(correlationId);
@@ -88,6 +92,8 @@ public class CorrelatedNestedLoopJoinRule extends
AbstractIgniteConverterRule<Lo
return rexBuilder.makeInputRef(input.getType(),
input.getIndex() - leftFieldCount);
}
+ corrRequiredColumnsBuilder.set(field);
+
return rexBuilder.makeFieldAccess(corrVar.get(0), field);
}
});
@@ -132,6 +138,7 @@ public class CorrelatedNestedLoopJoinRule extends
AbstractIgniteConverterRule<Lo
right,
rel.getCondition(),
correlationIds,
+ corrRequiredColumnsBuilder.build(),
joinType
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index c6cbcf97768..aa2f3ca9cc8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -857,4 +857,15 @@ public final class Commons {
offset
);
}
+
+ /**
+ * Packs two integers into a single long value.
+ *
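+ * @param high The integer to place in the upper 32 bits of the result.
+ * @param low The integer to place in the lower 32 bits of the result.
+ * @return A long value with {@code high} in the upper and {@code low} in the lower 32 bits.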
+ */
+ public static long packIntsToLong(int high, int low) {
+ return (((long) high) << 32) | (low & 0xffffffffL);
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
index b85dffbdb2a..18dc0d19f9a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IgniteMethod.java
@@ -50,8 +50,8 @@ public enum IgniteMethod {
/** See {@link SqlEvaluationContext#rowAccessor()}. */
CONTEXT_ROW_HANDLER(SqlEvaluationContext.class, "rowAccessor"),
- /** See {@link SqlEvaluationContext#correlatedVariable(int)}. */
- CONTEXT_GET_CORRELATED_VALUE(SqlEvaluationContext.class,
"correlatedVariable", int.class),
+ /** See {@link SqlEvaluationContext#correlatedVariable(long)}. */
+ CONTEXT_GET_CORRELATED_VALUE(SqlEvaluationContext.class,
"correlatedVariable", long.class),
/** See {@link IgniteSqlDateTimeUtils#subtractTimeZoneOffset(long,
TimeZone)}. **/
SUBTRACT_TIMEZONE_OFFSET(IgniteSqlDateTimeUtils.class,
"subtractTimeZoneOffset", long.class, TimeZone.class),
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverterTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverterTest.java
new file mode 100644
index 00000000000..8148f595883
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateMessageConverterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.Period;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.ignite.internal.schema.SchemaTestUtils;
+import org.apache.ignite.internal.sql.engine.message.SharedStateMessage;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.sql.ColumnType;
+import org.hamcrest.Matchers;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for class {@link SharedStateMessageConverter}.
+ */
+public class SharedStateMessageConverterTest extends BaseIgniteAbstractTest {
+ private Random rnd;
+
+ /**
+ * Initialization.
+ */
+ @BeforeEach
+ public void initRandom() {
+ long seed = System.currentTimeMillis();
+
+ log.info("Using seed: " + seed + "L;");
+
+ rnd = new Random(seed);
+ }
+
+ @ParameterizedTest
+ @MethodSource("allTypes")
+ void singleValue(NativeType type) {
+ Object value = SchemaTestUtils.generateRandomValue(rnd, type);
+
+ SharedState state = new SharedState();
+ state.correlatedVariable(1, TypeUtils.toInternal(value, type.spec()));
+
+ SharedState converted = doConversions(state);
+
+ assertThat(converted.correlations(), equalTo(state.correlations()));
+ }
+
+ @Test
+ void multipleValuesWithAllTypes() {
+ SharedState state = new SharedState();
+
+ Set<ColumnType> simpleColumnTypes =
EnumSet.complementOf(EnumSet.of(ColumnType.STRUCT, ColumnType.NULL));
+
+ int fieldIdx = 0;
+
+ for (ColumnType typeSpec : simpleColumnTypes) {
+ Object value;
+
+ if (typeSpec == ColumnType.PERIOD) {
+ value = Period.between(LocalDate.of(1, 1, 1), LocalDate.now());
+ } else if (typeSpec == ColumnType.DURATION) {
+ value = Duration.ofMillis(Instant.now().toEpochMilli());
+ } else {
+ value = SchemaTestUtils.generateRandomValue(rnd,
SchemaTestUtils.specToType(typeSpec));
+ }
+
+ state.correlatedVariable(fieldIdx++, TypeUtils.toInternal(value,
typeSpec));
+ }
+
+ SharedState converted = doConversions(state);
+
+ assertThat(converted.correlations(), equalTo(state.correlations()));
+ }
+
+ @Test
+ void nullValue() {
+ SharedState state = new SharedState();
+ state.correlatedVariable(1, null);
+
+ SharedState converted = doConversions(state);
+
+ assertThat(converted, is(notNullValue()));
+
+ assertThat(converted.correlations(), equalTo(state.correlations()));
+ }
+
+ @Test
+ void nullState() {
+ assertThat(doConversions(null), is(Matchers.nullValue()));
+ }
+
+ @Test
+ void emptyState() {
+ SharedState state = new SharedState();
+
+ SharedState converted = doConversions(state);
+
+ assertThat(converted, is(notNullValue()));
+
+ assertThat(converted.correlations(), is(state.correlations()));
+ }
+
+ private static @Nullable SharedState doConversions(@Nullable SharedState
state) {
+ SharedStateMessage msg = SharedStateMessageConverter.toMessage(state);
+
+ return SharedStateMessageConverter.fromMessage(msg);
+ }
+
+ private static List<NativeType> allTypes() {
+ return SchemaTestUtils.ALL_TYPES;
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateTest.java
new file mode 100644
index 00000000000..d3ffdbb3744
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/SharedStateTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.List;
+import java.util.Random;
+import org.apache.ignite.internal.schema.SchemaTestUtils;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.type.NativeType;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for class {@link SharedState}.
+ */
+public class SharedStateTest extends BaseIgniteAbstractTest {
+ private final SharedState state = new SharedState();
+
+ @ParameterizedTest
+ @MethodSource("allTypes")
+ void canSetValue(NativeType type) {
+ long seed = System.currentTimeMillis();
+
+ log.info("Using seed: " + seed);
+
+ Random rnd = new Random(seed);
+
+ Object value = SchemaTestUtils.generateRandomValue(rnd, type);
+ long corrId = rnd.nextLong();
+
+ state.correlatedVariable(corrId, value);
+
+ assertThat(state.correlatedVariable(corrId), is(value));
+ }
+
+ @Test
+ void canSetNullAsValue() {
+ state.correlatedVariable(1, null);
+
+ assertThat(state.correlatedVariable(1), is(Matchers.nullValue()));
+ }
+
+ @Test
+ void throwsExceptionNonExistingValue() {
+ //noinspection ThrowableNotThrown
+ IgniteTestUtils.assertThrows(
+ IllegalStateException.class,
+ () -> state.correlatedVariable(1),
+ "Correlated variable is not set [id=1]"
+ );
+ }
+
+ private static List<NativeType> allTypes() {
+ return SchemaTestUtils.ALL_TYPES;
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
index bae4ea2971a..0f89bb78900 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/CorrelatedNestedLoopJoinExecutionTest.java
@@ -35,6 +35,7 @@ import java.util.stream.StreamSupport;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.sql.engine.api.expressions.RowFactoryFactory;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
@@ -44,14 +45,15 @@ import
org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.type.NativeTypes;
+import org.apache.ignite.internal.type.StructNativeType;
import org.junit.jupiter.api.Assumptions;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.EnumSource.Mode;
/** Correlated nested loop join execution tests. */
public class CorrelatedNestedLoopJoinExecutionTest extends
AbstractExecutionTest<Object[]> {
-
@ParameterizedTest
@EnumSource(value = JoinRelType.class, mode = Mode.INCLUDE, names =
{"INNER", "LEFT"})
public void testCorrelatedNestedLoopJoin(JoinRelType joinType) {
@@ -80,6 +82,7 @@ public class CorrelatedNestedLoopJoinExecutionTest extends
AbstractExecutionTest
ctx,
(r1, r2) -> Objects.equals(hnd.get(2, r1), hnd.get(0, r2)),
Set.of(new CorrelationId(0)),
+ ImmutableBitSet.of(),
joinType,
ctx.rowFactoryFactory().create(convertStructuredType(rightType)),
identityProjection()
@@ -118,6 +121,78 @@ public class CorrelatedNestedLoopJoinExecutionTest extends
AbstractExecutionTest
}
}
+ /**
+ * Tests that correlated variables are correctly set by the correlation
preparation logic.
+ */
+ @Test
+ public void testCorrelatedVariables() {
+ ExecutionContext<Object[]> ctx = executionContext(true);
+
+ // Left side: T0 (id, name, val)
+ ScanNode<Object[]> left = new ScanNode<>(ctx, Arrays.asList(
+ new Object[]{11, "A", 1},
+ new Object[]{12, "B", 2},
+ new Object[]{13, "C", 3}
+ ));
+
+ // Right side: T1 (id, val)
+ List<Object[]> deps = Arrays.asList(
+ new Object[]{11, 1},
+ new Object[]{12, 1},
+ new Object[]{13, 4}
+ );
+
+ StructNativeType rightType = NativeTypes.structBuilder()
+ .addField("ID", NativeTypes.INT32, false)
+ .addField("VAL", NativeTypes.INT32, true)
+ .build();
+
+ CorrelationId correlationId = new CorrelationId(0);
+ // Correlated columns: T0.id, T0.val
+ List<Integer> correlationColumns = List.of(0, 2);
+
+ ScanNode<Object[]> rightScan = new ScanNode<>(ctx, deps);
+
+ FilterNode<Object[]> rightFilter = new FilterNode<>(ctx, row -> {
+ // Can ignore correlationId because it is 0.
+ Object corr1 = ctx.correlatedVariable(correlationColumns.get(0));
+ Object corr2 = ctx.correlatedVariable(correlationColumns.get(1));
+
+ Object rightVal1 = ctx.rowAccessor().get(0, row);
+ Object rightVal2 = ctx.rowAccessor().get(1, row);
+
+ // Filter: t0.id = t1.id AND t0.val <> t1.val
+ return Objects.equals(corr1, rightVal1) && !Objects.equals(corr2,
rightVal2);
+ });
+
+ rightFilter.register(rightScan);
+
+ CorrelatedNestedLoopJoinNode<Object[]> join = new
CorrelatedNestedLoopJoinNode<>(
+ ctx,
+ (r1, r2) -> true,
+ Set.of(correlationId),
+ ImmutableBitSet.of(correlationColumns),
+ INNER,
+ ctx.rowFactoryFactory().create(rightType),
+ identityProjection()
+ );
+
+ join.register(Arrays.asList(left, rightFilter));
+
+ RootNode<Object[]> root = new RootNode<>(ctx);
+ root.register(join);
+
+ Object[][] result =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(root,
Spliterator.ORDERED), false)
+ .toArray(Object[][]::new);
+
+ Object[][] expected = {
+ {12, "B", 2, 12, 1},
+ {13, "C", 3, 13, 4}
+ };
+
+ assert2DimArrayEquals(expected, result);
+ }
+
private static void assert2DimArrayEquals(Object[][] expected, Object[][]
actual) {
assertEquals(expected.length, actual.length, "expected length: " +
expected.length + ", actual length: " + actual.length);
@@ -236,6 +311,7 @@ public class CorrelatedNestedLoopJoinExecutionTest extends
AbstractExecutionTest
ctx,
(r1, r2) -> Objects.equals(hnd.get(2, r1), hnd.get(0, r2)),
Set.of(new CorrelationId(0)),
+ ImmutableBitSet.of(),
joinType,
ctx.rowFactoryFactory().create(convertStructuredType(rightType)),
identityProjection()
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
index bfd8e97e7f7..19179c5e1e0 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -24,12 +24,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
import java.util.Objects;
+import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.util.ImmutableBitSet;
import
org.apache.ignite.internal.sql.engine.framework.TestBuilders.TableBuilder;
import org.apache.ignite.internal.sql.engine.prepare.bounds.ExactBounds;
import org.apache.ignite.internal.sql.engine.prepare.bounds.SearchBounds;
+import
org.apache.ignite.internal.sql.engine.rel.IgniteCorrelatedNestedLoopJoin;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
@@ -42,12 +45,12 @@ import org.junit.jupiter.api.Test;
* CorrelatedNestedLoopJoinPlannerTest.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
-@Disabled("https://issues.apache.org/jira/browse/IGNITE-21286")
public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
/**
* Check equi-join. CorrelatedNestedLoopJoinTest is applicable for it.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21286")
public void testValidIndexExpressions() throws Exception {
IgniteSchema publicSchema = createSchemaFrom(
tableA("T0"),
@@ -84,6 +87,7 @@ public class CorrelatedNestedLoopJoinPlannerTest extends
AbstractPlannerTest {
* Check join with not equi condition. Current implementation of the
CorrelatedNestedLoopJoinTest is not applicable for such case.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21286")
public void testInvalidIndexExpressions() throws Exception {
IgniteSchema publicSchema = createSchemaFrom(
tableA("T0").andThen(addHashIndex("JID", "ID")),
@@ -102,12 +106,41 @@ public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
);
}
+ @Test
+ public void testSubqueryCorrelationColumns() throws Exception {
+ IgniteSchema publicSchema = createSchemaFrom(
+ tableA("T0"),
+ tableA("T1")
+ );
+
+ {
+ String sql = "SELECT /*+ disable_decorrelation */ * FROM t0 WHERE "
+ + "EXISTS(SELECT * FROM t1 WHERE t0.id=t1.id AND t0.int_val<>t1.int_val) ";
+
+ Predicate<IgniteCorrelatedNestedLoopJoin> pred = isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(cnlj -> cnlj.getCorrelationColumns().equals(ImmutableBitSet.of(0, 3)));
+
+ assertPlan(sql, List.of(publicSchema), pred);
+ }
+
+ {
+ String sql = "SELECT /*+ disable_decorrelation */ * FROM t0 WHERE "
+ + "EXISTS(SELECT * FROM t1 WHERE t0.jid=t1.jid AND t0.id<>t1.id) ";
+
+ Predicate<IgniteCorrelatedNestedLoopJoin> pred = isInstanceOf(IgniteCorrelatedNestedLoopJoin.class)
+ .and(cnlj -> cnlj.getCorrelationColumns().equals(ImmutableBitSet.of(0, 1)));
+
+ assertPlan(sql, List.of(publicSchema), pred);
+ }
+ }
+
private static UnaryOperator<TableBuilder> tableA(String tableName) {
return tableBuilder -> tableBuilder
.name(tableName)
.addColumn("ID", NativeTypes.INT32)
.addColumn("JID", NativeTypes.INT32)
.addColumn("VAL", NativeTypes.STRING)
+ .addColumn("INT_VAL", NativeTypes.INT32)
.distribution(IgniteDistributions.broadcast());
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
index 57edc31ec89..69ca1e92084 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/CommonsTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -41,7 +42,9 @@ import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
/**
@@ -183,6 +186,18 @@ public class CommonsTest extends BaseIgniteAbstractTest {
}
}
+ @ParameterizedTest
+ @MethodSource("packIntsToLongTestCases")
+ void packIntsToLong(int num1, int num2) {
+ long packed = Commons.packIntsToLong(num1, num2);
+
+ int unpackedNum1 = (int) (packed >>> 32);
+ int unpackedNum2 = (int) packed;
+
+ assertThat(unpackedNum1, is(num1));
+ assertThat(unpackedNum2, is(num2));
+ }
+
private static void expectMapped(Mapping mapping, ImmutableBitSet bitSet, ImmutableBitSet expected) {
assertEquals(expected, Mappings.apply(mapping, bitSet), "direct mapping");
@@ -196,6 +211,20 @@ public class CommonsTest extends BaseIgniteAbstractTest {
assertEquals(expected, Mappings.apply(mapping, source));
}
+ private static Stream<Arguments> packIntsToLongTestCases() {
+ return Stream.of(
+ Arguments.of(123, 456),
+ Arguments.of(0, 0),
+ Arguments.of(1, 0),
+ Arguments.of(0, 1),
+ Arguments.of(-1, -1),
+ Arguments.of(100, -200),
+ Arguments.of(-100, 200),
+ Arguments.of(Integer.MAX_VALUE, Integer.MIN_VALUE),
+ Arguments.of(Integer.MIN_VALUE, Integer.MAX_VALUE)
+ );
+ }
+
/** For test purposes. */
@FunctionalInterface
public interface StringConcat {
diff --git a/modules/sql-engine/src/test/resources/mapping/correlated.test b/modules/sql-engine/src/test/resources/mapping/correlated.test
index 34d6d8038ae..70d95670e3e 100644
--- a/modules/sql-engine/src/test/resources/mapping/correlated.test
+++ b/modules/sql-engine/src/test/resources/mapping/correlated.test
@@ -324,6 +324,7 @@ Fragment#3
predicate: true
type: inner
correlates: [$cor5]
+ correlationFieldNames: [C1, C2]
est: (rows=50001)
TableScan
table: PUBLIC.CT_N1
@@ -370,6 +371,7 @@ Fragment#2 root
predicate: true
type: inner
correlates: [$cor5]
+ correlationFieldNames: [C1, C2]
est: (rows=1)
Receiver
fieldNames: [C1, C2]
@@ -428,6 +430,7 @@ Fragment#2 root
predicate: true
type: inner
correlates: [$cor3]
+ correlationFieldNames: [ID]
est: (rows=1)
Receiver
fieldNames: [ID, C1, C2]
@@ -514,6 +517,7 @@ Fragment#3 root
predicate: true
type: inner
correlates: [$cor6]
+ correlationFieldNames: [ID]
est: (rows=1)
Receiver
fieldNames: [ID, C1, C2]
@@ -535,6 +539,7 @@ Fragment#3 root
predicate: true
type: inner
correlates: [$cor7]
+ correlationFieldNames: [ID]
est: (rows=1)
Receiver
fieldNames: [ID]
@@ -641,6 +646,7 @@ Fragment#3 root
predicate: true
type: inner
correlates: [$cor8]
+ correlationFieldNames: [ID]
est: (rows=1)
Receiver
fieldNames: [ID, C1, C2]
@@ -662,6 +668,7 @@ Fragment#3 root
predicate: true
type: inner
correlates: [$cor9]
+ correlationFieldNames: [ID]
est: (rows=1)
Receiver
fieldNames: [ID]
diff --git a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
index a86fce7b2e1..7fd60a7cf7e 100644
--- a/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
+++ b/modules/sql-engine/src/test/resources/mapping/test_partition_pruning.test
@@ -233,6 +233,7 @@ Fragment#2 root
predicate: true
type: inner
correlates: [$cor3]
+ correlationFieldNames: [ID]
est: (rows=1)
Receiver
fieldNames: [ID, C1, C2]
@@ -320,6 +321,7 @@ Fragment#2 root
predicate: true
type: inner
correlates: [$cor3]
+ correlationFieldNames: [ID]
est: (rows=1)
Receiver
fieldNames: [ID, C1, C2]