This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 655eb285da0 IGNITE-20501 SQL Calcite: Fix memory leak in 
MailboxRegistryImpl#remotes - Fixes #10996.
655eb285da0 is described below

commit 655eb285da0dc13fe1bff16eb85b5d62eaee0a78
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Oct 20 16:48:08 2023 +0300

    IGNITE-20501 SQL Calcite: Fix memory leak in MailboxRegistryImpl#remotes - 
Fixes #10996.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../query/calcite/CalciteQueryProcessor.java       |  11 ++
 .../query/calcite/exec/ExchangeServiceImpl.java    |  43 +++++++-
 .../query/calcite/exec/LogicalRelImplementor.java  |  15 ++-
 .../query/calcite/exec/MailboxRegistry.java        |  14 +++
 .../query/calcite/exec/MailboxRegistryImpl.java    |  12 ++-
 .../query/calcite/exec/TimeoutService.java         |  28 +++++
 .../query/calcite/exec/TimeoutServiceImpl.java     |  42 ++++++++
 .../query/calcite/exec/rel/MergeJoinNode.java      | 118 +++++++++++++--------
 .../query/calcite/exec/rel/ModifyNode.java         |   4 +-
 .../calcite/exec/rel/AbstractExecutionTest.java    |   5 +
 .../query/calcite/exec/rel/ExecutionTest.java      |  95 +++++++++--------
 .../calcite/exec/rel/MergeJoinExecutionTest.java   |  18 +++-
 .../integration/AbstractBasicIntegrationTest.java  |  24 ++++-
 .../integration/IndexScanlIntegrationTest.java     |   2 +-
 .../integration/JoinRehashIntegrationTest.java     |  75 +++++++++++++
 .../ignite/testsuites/IntegrationTestSuite.java    |   6 +-
 16 files changed, 404 insertions(+), 108 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 3122d3c99d0..5e60cd44b17 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -74,6 +74,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
 import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
 import 
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
@@ -211,6 +213,9 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
     /** */
     private final PrepareServiceImpl prepareSvc;
 
+    /** */
+    private final TimeoutService timeoutSvc;
+
     /** */
     private final QueryRegistry qryReg;
 
@@ -241,6 +246,7 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
         mappingSvc = new MappingServiceImpl(ctx);
         exchangeSvc = new ExchangeServiceImpl(ctx);
         prepareSvc = new PrepareServiceImpl(ctx);
+        timeoutSvc = new TimeoutServiceImpl(ctx);
         qryReg = new QueryRegistryImpl(ctx);
 
         QueryEngineConfiguration[] qryEnginesCfg = 
ctx.config().getSqlConfiguration().getQueryEnginesConfiguration();
@@ -325,6 +331,11 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
         return prepareSvc;
     }
 
+    /** */
+    public TimeoutService timeoutService() {
+        return timeoutSvc;
+    }
+
     /** */
     public ExecutionService<Object[]> executionService() {
         return executionSvc;
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 4105896c403..b10b69f2213 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
-
 import com.google.common.collect.ImmutableMap;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -50,6 +49,9 @@ import org.apache.ignite.internal.util.typedef.F;
  *
  */
 public class ExchangeServiceImpl extends AbstractService implements 
ExchangeService {
+    /** */
+    public static final long INBOX_INITIALIZATION_TIMEOUT = 1_000L;
+
     /** */
     private final UUID locaNodeId;
 
@@ -62,6 +64,9 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
     /** */
     private MessageService msgSvc;
 
+    /** */
+    private TimeoutService timeoutSvc;
+
     /** */
     private QueryRegistry qryRegistry;
 
@@ -116,6 +121,20 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
         return msgSvc;
     }
 
+    /**
+     * @param timeoutSvc Timeout service.
+     */
+    public void timeoutService(TimeoutService timeoutSvc) {
+        this.timeoutSvc = timeoutSvc;
+    }
+
+    /**
+     * @return Timeout service.
+     */
+    public TimeoutService timeoutService() {
+        return timeoutSvc;
+    }
+
     /** */
     public void queryRegistry(QueryRegistry qryRegistry) {
         this.qryRegistry = qryRegistry;
@@ -163,6 +182,7 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
         taskExecutor(proc.taskExecutor());
         mailboxRegistry(proc.mailboxRegistry());
         messageService(proc.messageService());
+        timeoutService(proc.timeoutService());
         queryRegistry(proc.queryRegistry());
 
         init();
@@ -226,6 +246,9 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
         if (qry != null)
             qry.cancel();
         else {
+            for (Inbox<?> inbox : mailboxRegistry().inboxes(msg.queryId(), -1, 
-1))
+                mailboxRegistry().unregister(inbox);
+
             if (log.isDebugEnabled()) {
                 log.debug("Stale query close message received: [" +
                     "nodeId=" + nodeId +
@@ -269,6 +292,24 @@ public class ExchangeServiceImpl extends AbstractService 
implements ExchangeServ
                 this, mailboxRegistry(), msg.exchangeId(), msg.exchangeId());
 
             inbox = mailboxRegistry().register(newInbox);
+
+            if (inbox == newInbox) {
+                // New inbox for query batch message can be registered in the 
following cases:
+                // 1. Race between messages (when first batch arrived to node 
before query start request). In this case
+                // query start request eventually will be delivered and query 
execution context will be initialized.
+                // Inbox will be closed by standard query execution workflow.
+                // 2. Stale first message (query already has been closed by 
some event). In this case query execution
+                // workflow already completed and inbox can leak. To prevent 
leakage, schedule task to check that
+                // query context is initialized within reasonable time (assume 
that race between messages can't be more
+                // than INBOX_INITIALIZATION_TIMEOUT milliseconds).
+                timeoutService().schedule(() -> {
+                    Inbox<?> timeoutInbox = 
mailboxRegistry().inbox(msg.queryId(), msg.exchangeId());
+
+                    // Inbox is not unregistered and still not initialized.
+                    if (timeoutInbox != null && 
timeoutInbox.context().topologyVersion() == null)
+                        taskExecutor().execute(msg.queryId(), 
msg.fragmentId(), timeoutInbox::close);
+                }, INBOX_INITIALIZATION_TIMEOUT);
+            }
         }
 
         if (inbox != null) {
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index cb816373109..62e8799e2e4 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -280,7 +280,7 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
             rel.rightCollation().getFieldCollations().subList(0, pairsCnt)
         );
 
-        Node<Row> node = MergeJoinNode.create(ctx, outType, leftType, 
rightType, joinType, comp);
+        Node<Row> node = MergeJoinNode.create(ctx, outType, leftType, 
rightType, joinType, comp, hasExchange(rel));
 
         Node<Row> leftInput = visit(rel.getLeft());
         Node<Row> rightInput = visit(rel.getRight());
@@ -290,6 +290,19 @@ public class LogicalRelImplementor<Row> implements 
IgniteRelVisitor<Node<Row>> {
         return node;
     }
 
+    /** */
+    private boolean hasExchange(RelNode rel) {
+        if (rel instanceof IgniteReceiver)
+            return true;
+
+        for (RelNode in : rel.getInputs()) {
+            if (hasExchange(in))
+                return true;
+        }
+
+        return false;
+    }
+
     /** {@inheritDoc} */
     @Override public Node<Row> visit(IgniteIndexScan rel) {
         RexNode condition = rel.condition();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
index 8c7c2d4fa4a..93add75dabe 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
@@ -97,4 +97,18 @@ public interface MailboxRegistry extends Service {
      * @return Registered outboxes.
      */
     Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long fragmentId, long 
exchangeId);
+
+    /**
+     * Returns all registered inboxes.
+
+     * @return Registered inboxes.
+     */
+    Collection<Inbox<?>> inboxes();
+
+    /**
+     * Returns all registered outboxes.
+     *
+     * @return Registered outboxes.
+     */
+    Collection<Outbox<?>> outboxes();
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index 538e7b049e7..c9c25518495 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -18,13 +18,13 @@
 package org.apache.ignite.internal.processors.query.calcite.exec;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
@@ -135,6 +135,16 @@ public class MailboxRegistryImpl extends AbstractService 
implements MailboxRegis
             .collect(Collectors.toList());
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<Inbox<?>> inboxes() {
+        return Collections.unmodifiableCollection(remotes.values());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Outbox<?>> outboxes() {
+        return Collections.unmodifiableCollection(locals.values());
+    }
+
     /**
      * @param evtMgr Event manager.
      */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutService.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutService.java
new file mode 100644
index 00000000000..3ff81c1910f
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/**
+ * Service to run tasks with the given timeout.
+ */
+public interface TimeoutService extends Service {
+    /** */
+    public void schedule(Runnable task, long timeout);
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutServiceImpl.java
new file mode 100644
index 00000000000..92f611fe729
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutServiceImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import org.apache.ignite.internal.GridKernalContext;
+import 
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+
+/**
+ * Timeout service implementation.
+ */
+public class TimeoutServiceImpl extends AbstractService implements 
TimeoutService {
+    /** */
+    private final GridTimeoutProcessor proc;
+
+    /** */
+    public TimeoutServiceImpl(GridKernalContext ctx) {
+        super(ctx);
+
+        proc = ctx.timeout();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void schedule(Runnable task, long timeout) {
+        proc.schedule(task, timeout, -1);
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index 7a66ccaf361..35f31e8b931 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -59,14 +59,22 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
     /** */
     protected boolean inLoop;
 
+    /**
+     * Flag indicating that at least one of the inputs has exchange 
underneath. In this case we can't prematurely end
+     * downstream if one of the inputs is drained, we need to wait for both 
inputs, since async message from remote
+     * node can reopen closed inbox, which can cause memory leaks.
+     */
+    protected final boolean distributed;
+
     /**
      * @param ctx Execution context.
      * @param comp Join expression.
      */
-    private MergeJoinNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp) {
+    private MergeJoinNode(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp, boolean distributed) {
         super(ctx, rowType);
 
         this.comp = comp;
+        this.distributed = distributed;
         handler = ctx.rowHandler();
     }
 
@@ -205,37 +213,45 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
     /** */
     protected abstract void join() throws Exception;
 
+    /** */
+    protected void checkJoinFinished() throws Exception {
+        if (!distributed || (waitingLeft == NOT_WAITING && waitingRight == 
NOT_WAITING)) {
+            requested = 0;
+            downstream().end();
+        }
+    }
+
     /** */
     @NotNull public static <Row> MergeJoinNode<Row> 
create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType 
leftRowType,
-        RelDataType rightRowType, JoinRelType joinType, Comparator<Row> comp) {
+        RelDataType rightRowType, JoinRelType joinType, Comparator<Row> comp, 
boolean distributed) {
         switch (joinType) {
             case INNER:
-                return new InnerJoin<>(ctx, outputRowType, comp);
+                return new InnerJoin<>(ctx, outputRowType, comp, distributed);
 
             case LEFT: {
                 RowHandler.RowFactory<Row> rightRowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rightRowType);
 
-                return new LeftJoin<>(ctx, outputRowType, comp, 
rightRowFactory);
+                return new LeftJoin<>(ctx, outputRowType, comp, distributed, 
rightRowFactory);
             }
 
             case RIGHT: {
                 RowHandler.RowFactory<Row> leftRowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), leftRowType);
 
-                return new RightJoin<>(ctx, outputRowType, comp, 
leftRowFactory);
+                return new RightJoin<>(ctx, outputRowType, comp, distributed, 
leftRowFactory);
             }
 
             case FULL: {
                 RowHandler.RowFactory<Row> leftRowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), leftRowType);
                 RowHandler.RowFactory<Row> rightRowFactory = 
ctx.rowHandler().factory(ctx.getTypeFactory(), rightRowType);
 
-                return new FullOuterJoin<>(ctx, outputRowType, comp, 
leftRowFactory, rightRowFactory);
+                return new FullOuterJoin<>(ctx, outputRowType, comp, 
distributed, leftRowFactory, rightRowFactory);
             }
 
             case SEMI:
-                return new SemiJoin<>(ctx, outputRowType, comp);
+                return new SemiJoin<>(ctx, outputRowType, comp, distributed);
 
             case ANTI:
-                return new AntiJoin<>(ctx, outputRowType, comp);
+                return new AntiJoin<>(ctx, outputRowType, comp, distributed);
 
             default:
                 throw new IllegalStateException("Join type \"" + joinType + 
"\" is not supported yet");
@@ -263,9 +279,10 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param rowType Row type.
          * @param comp Join expression comparator.
+         * @param distributed If one of the inputs has exchange underneath.
          */
-        public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp) {
-            super(ctx, rowType, comp);
+        public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp, boolean distributed) {
+            super(ctx, rowType, comp, distributed);
         }
 
         /** {@inheritDoc} */
@@ -383,10 +400,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
 
             if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null 
&& leftInBuf.isEmpty())
                 || (waitingRight == NOT_WAITING && right == null && 
rightInBuf.isEmpty() && rightMaterialization == null))
-            ) {
-                requested = 0;
-                downstream().end();
-            }
+            )
+                checkJoinFinished();
         }
     }
 
@@ -417,10 +432,17 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param rowType Row type.
          * @param comp Join expression comparator.
+         * @param distributed If one of the inputs has exchange underneath.
          * @param rightRowFactory Right row factory.
          */
-        public LeftJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp, RowHandler.RowFactory<Row> rightRowFactory) {
-            super(ctx, rowType, comp);
+        public LeftJoin(
+            ExecutionContext<Row> ctx,
+            RelDataType rowType,
+            Comparator<Row> comp,
+            boolean distributed,
+            RowHandler.RowFactory<Row> rightRowFactory
+        ) {
+            super(ctx, rowType, comp, distributed);
 
             this.rightRowFactory = rightRowFactory;
         }
@@ -561,10 +583,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             if (waitingLeft == 0)
                 leftSource().request(waitingLeft = IN_BUFFER_SIZE);
 
-            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty()) {
-                requested = 0;
-                downstream().end();
-            }
+            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty())
+                checkJoinFinished();
         }
     }
 
@@ -595,10 +615,17 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param rowType Row type.
          * @param comp Join expression comparator.
+         * @param distributed If one of the inputs has exchange underneath.
          * @param leftRowFactory Left row factory.
          */
-        public RightJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp, RowHandler.RowFactory<Row> leftRowFactory) {
-            super(ctx, rowType, comp);
+        public RightJoin(
+            ExecutionContext<Row> ctx,
+            RelDataType rowType,
+            Comparator<Row> comp,
+            boolean distributed,
+            RowHandler.RowFactory<Row> leftRowFactory
+        ) {
+            super(ctx, rowType, comp, distributed);
 
             this.leftRowFactory = leftRowFactory;
         }
@@ -748,10 +775,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             if (waitingLeft == 0)
                 leftSource().request(waitingLeft = IN_BUFFER_SIZE);
 
-            if (requested > 0 && waitingRight == NOT_WAITING && right == null 
&& rightInBuf.isEmpty() && rightMaterialization == null) {
-                requested = 0;
-                downstream().end();
-            }
+            if (requested > 0 && waitingRight == NOT_WAITING && right == null 
&& rightInBuf.isEmpty() && rightMaterialization == null)
+                checkJoinFinished();
         }
     }
 
@@ -788,12 +813,19 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param rowType Row type.
          * @param comp Join expression comparator.
+         * @param distributed If one of the inputs has exchange underneath.
          * @param leftRowFactory Left row factory.
          * @param rightRowFactory Right row factory.
          */
-        public FullOuterJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp,
-            RowHandler.RowFactory<Row> leftRowFactory, 
RowHandler.RowFactory<Row> rightRowFactory) {
-            super(ctx, rowType, comp);
+        public FullOuterJoin(
+            ExecutionContext<Row> ctx,
+            RelDataType rowType,
+            Comparator<Row> comp,
+            boolean distributed,
+            RowHandler.RowFactory<Row> leftRowFactory,
+            RowHandler.RowFactory<Row> rightRowFactory
+        ) {
+            super(ctx, rowType, comp, distributed);
 
             this.leftRowFactory = leftRowFactory;
             this.rightRowFactory = rightRowFactory;
@@ -976,10 +1008,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
 
             if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty()
                 && waitingRight == NOT_WAITING && right == null && 
rightInBuf.isEmpty() && rightMaterialization == null
-            ) {
-                requested = 0;
-                downstream().end();
-            }
+            )
+                checkJoinFinished();
         }
     }
 
@@ -995,9 +1025,10 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param rowType Row type.
          * @param comp Join expression comparator.
+         * @param distributed If one of the inputs has exchange underneath.
          */
-        public SemiJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp) {
-            super(ctx, rowType, comp);
+        public SemiJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp, boolean distributed) {
+            super(ctx, rowType, comp, distributed);
         }
 
         /** {@inheritDoc} */
@@ -1052,10 +1083,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
 
             if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null 
&& leftInBuf.isEmpty()
                 || (waitingRight == NOT_WAITING && right == null && 
rightInBuf.isEmpty())))
-            ) {
-                requested = 0;
-                downstream().end();
-            }
+            )
+                checkJoinFinished();
         }
     }
 
@@ -1071,9 +1100,10 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
          * @param ctx Execution context.
          * @param rowType Row type.
          * @param comp Join expression comparator.
+         * @param distributed If one of the inputs has exchange underneath.
          */
-        public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp) {
-            super(ctx, rowType, comp);
+        public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType, 
Comparator<Row> comp, boolean distributed) {
+            super(ctx, rowType, comp, distributed);
         }
 
         /** {@inheritDoc} */
@@ -1129,10 +1159,8 @@ public abstract class MergeJoinNode<Row> extends 
AbstractNode<Row> {
             if (waitingLeft == 0)
                 leftSource().request(waitingLeft = IN_BUFFER_SIZE);
 
-            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty()) {
-                requested = 0;
-                downstream().end();
-            }
+            if (requested > 0 && waitingLeft == NOT_WAITING && left == null && 
leftInBuf.isEmpty())
+                checkJoinFinished();
         }
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index a52b0996f99..4491028d0be 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -216,11 +216,11 @@ public class ModifyNode<Row> extends AbstractNode<Row> 
implements SingleNode<Row
     private IgniteSQLException conflictKeysException(List<Object> 
conflictKeys) {
         if (op == TableModify.Operation.INSERT) {
             return new IgniteSQLException("Failed to INSERT some keys because 
they are already in cache. " +
-                "[keys=" + conflictKeys + ']', DUPLICATE_KEY);
+                "[cache=" + desc.cacheContext().name() + ", keys=" + 
conflictKeys + ']', DUPLICATE_KEY);
         }
         else {
             return new IgniteSQLException("Failed to MERGE some keys due to 
keys conflict or concurrent updates. " +
-                "[keys=" + conflictKeys + ']', CONCURRENT_UPDATE);
+                "[cache=" + desc.cacheContext().name() + ", keys=" + 
conflictKeys + ']', CONCURRENT_UPDATE);
         }
     }
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 8903956bf98..3bad2abfc7f 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
 import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
@@ -55,6 +56,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.message.MessageServic
 import 
org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
 import 
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -159,6 +161,8 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
         for (UUID uuid : nodes) {
             GridTestKernalContext kernal = newContext();
 
+            kernal.add(new GridTimeoutProcessor(kernal));
+
             QueryTaskExecutorImpl taskExecutor = new 
QueryTaskExecutorImpl(kernal);
             taskExecutor.stripedThreadPoolExecutor(new 
IgniteTestStripedThreadPoolExecutor(
                 execStgy,
@@ -185,6 +189,7 @@ public class AbstractExecutionTest extends 
GridCommonAbstractTest {
             exchangeSvc.messageService(msgSvc);
             exchangeSvc.mailboxRegistry(mailboxRegistry);
             exchangeSvc.queryRegistry(new QueryRegistryImpl(kernal));
+            exchangeSvc.timeoutService(new TimeoutServiceImpl(kernal));
             exchangeSvc.init();
 
             exchangeServices.put(uuid, exchangeSvc);
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index c669a58de38..e201c5cf2f4 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -549,55 +549,58 @@ public class ExecutionTest extends AbstractExecutionTest {
 
         for (int leftSize : sizes) {
             for (int rightSize : sizes) {
-                log.info("Check: leftSize=" + leftSize + ", rightSize=" + 
rightSize);
-
-                ScanNode<Object[]> left = new ScanNode<>(ctx, rowType, new 
TestTable(leftSize, rowType));
-                ScanNode<Object[]> right = new ScanNode<>(ctx, rowType, new 
TestTable(rightSize, rowType));
-
-                RelDataType joinRowType = TypeUtils.createRowType(
-                    tf,
-                    int.class, String.class, int.class,
-                    int.class, String.class, int.class);
-
-                MergeJoinNode<Object[]> join = MergeJoinNode.create(
-                    ctx,
-                    joinRowType,
-                    null,
-                    null,
-                    INNER,
-                    (r1, r2) -> {
-                        Object o1 = r1[0];
-                        Object o2 = r2[0];
-
-                        if (o1 == null || o2 == null) {
-                            if (o1 != null)
-                                return 1;
-                            else if (o2 != null)
-                                return -1;
-                            else
-                                return 0;
-                        }
-
-                        return Integer.compare((Integer)o1, (Integer)o2);
+                for (boolean distr : new boolean[] {false, true}) {
+                    log.info("Check: leftSize=" + leftSize + ", rightSize=" + 
rightSize + ", distributed=" + distr);
+
+                    ScanNode<Object[]> left = new ScanNode<>(ctx, rowType, new 
TestTable(leftSize, rowType));
+                    ScanNode<Object[]> right = new ScanNode<>(ctx, rowType, 
new TestTable(rightSize, rowType));
+
+                    RelDataType joinRowType = TypeUtils.createRowType(
+                        tf,
+                        int.class, String.class, int.class,
+                        int.class, String.class, int.class);
+
+                    MergeJoinNode<Object[]> join = MergeJoinNode.create(
+                        ctx,
+                        joinRowType,
+                        null,
+                        null,
+                        INNER,
+                        (r1, r2) -> {
+                            Object o1 = r1[0];
+                            Object o2 = r2[0];
+
+                            if (o1 == null || o2 == null) {
+                                if (o1 != null)
+                                    return 1;
+                                else if (o2 != null)
+                                    return -1;
+                                else
+                                    return 0;
+                            }
+
+                            return Integer.compare((Integer)o1, (Integer)o2);
+                        },
+                        distr
+                    );
+
+                    join.register(Arrays.asList(left, right));
+
+                    RootNode<Object[]> root = new RootNode<>(ctx, joinRowType);
+                    root.register(join);
+
+                    int cnt = 0;
+                    while (root.hasNext()) {
+                        root.next();
+
+                        cnt++;
                     }
-                );
-
-                join.register(Arrays.asList(left, right));
 
-                RootNode<Object[]> root = new RootNode<>(ctx, joinRowType);
-                root.register(join);
-
-                int cnt = 0;
-                while (root.hasNext()) {
-                    root.next();
-
-                    cnt++;
+                    assertEquals(
+                        "Invalid result size. [left=" + leftSize + ", right=" 
+ rightSize + ", results=" + cnt,
+                        min(leftSize, rightSize),
+                        cnt);
                 }
-
-                assertEquals(
-                    "Invalid result size. [left=" + leftSize + ", right=" + 
rightSize + ", results=" + cnt,
-                    min(leftSize, rightSize),
-                    cnt);
             }
         }
     }
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
index 5f49702b468..c7304838909 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -42,7 +41,6 @@ import static org.hamcrest.core.IsEqual.equalTo;
 
 /** */
 @SuppressWarnings("TypeMayBeWeakened")
-@WithSystemProperty(key = "calcite.debug", value = "true")
 public class MergeJoinExecutionTest extends AbstractExecutionTest {
     /** */
     public static final Object[][] EMPTY = new Object[0][];
@@ -385,6 +383,20 @@ public class MergeJoinExecutionTest extends 
AbstractExecutionTest {
      * @param expRes Expected result.
      */
     private void verifyJoin(Object[][] left, Object[][] right, JoinRelType 
joinType, Object[][] expRes) {
+        verifyJoin(left, right, joinType, expRes, false);
+        verifyJoin(left, right, joinType, expRes, true);
+    }
+
+    /**
+     * Creates execution tree and executes it. Then compares the result of the 
execution with the given one.
+     *
+     * @param left Data for left table.
+     * @param right Data for right table.
+     * @param joinType Join type.
+     * @param expRes Expected result.
+     * @param distr Distributed.
+     */
+    private void verifyJoin(Object[][] left, Object[][] right, JoinRelType 
joinType, Object[][] expRes, boolean distr) {
         ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), 
UUID.randomUUID(), 0);
 
         RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), 
int.class, String.class, Integer.class);
@@ -413,7 +425,7 @@ public class MergeJoinExecutionTest extends 
AbstractExecutionTest {
             }
 
             return Integer.compare((Integer)o1, (Integer)o2);
-        });
+        }, distr);
 
         join.register(F.asList(leftNode, rightNode));
 
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 111416f0da8..eed1ae56e85 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -47,17 +47,16 @@ import 
org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl.INBOX_INITIALIZATION_TIMEOUT;
 import static 
org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
 /**
  *
  */
-@WithSystemProperty(key = "calcite.debug", value = "false")
 public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
     /** */
     protected static final Object[] NULL_RESULT = new Object[] { null };
@@ -83,11 +82,20 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
         assertTrue("Not finished queries found on client", waitForCondition(
             () -> 
queryProcessor(client).queryRegistry().runningQueries().isEmpty(), 1_000L));
 
+        waitForCondition(() -> {
+            for (Ignite ign : G.allGrids()) {
+                if (!queryProcessor(ign).mailboxRegistry().inboxes().isEmpty())
+                    return false;
+            }
+
+            return true;
+        }, INBOX_INITIALIZATION_TIMEOUT * 2);
+
         for (Ignite ign : G.allGrids()) {
             for (String cacheName : ign.cacheNames())
                 ign.destroyCache(cacheName);
 
-            CalciteQueryProcessor qryProc = queryProcessor(((IgniteEx)ign));
+            CalciteQueryProcessor qryProc = queryProcessor(ign);
 
             assertEquals("Not finished queries found [ignite=" + ign.name() + 
']',
                 0, qryProc.queryRegistry().runningQueries().size());
@@ -95,6 +103,12 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
             ExecutionServiceImpl<Object[]> execSvc = 
(ExecutionServiceImpl<Object[]>)qryProc.executionService();
             assertEquals("Tracked memory must be 0 after test [ignite=" + 
ign.name() + ']',
                 0, execSvc.memoryTracker().allocated());
+
+            assertEquals("Count of inboxes must be 0 after test [ignite=" + 
ign.name() + ']',
+                0, qryProc.mailboxRegistry().inboxes().size());
+
+            assertEquals("Count of outboxes must be 0 after test [ignite=" + 
ign.name() + ']',
+                0, qryProc.mailboxRegistry().outboxes().size());
         }
 
         awaitPartitionMapExchange();
@@ -189,8 +203,8 @@ public class AbstractBasicIntegrationTest extends 
GridCommonAbstractTest {
     }
 
     /** */
-    protected CalciteQueryProcessor queryProcessor(IgniteEx ignite) {
-        return Commons.lookupComponent(ignite.context(), 
CalciteQueryProcessor.class);
+    protected CalciteQueryProcessor queryProcessor(Ignite ignite) {
+        return Commons.lookupComponent(((IgniteEx)ignite).context(), 
CalciteQueryProcessor.class);
     }
 
     /** */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
index f66c495149f..093acb259fc 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
@@ -419,7 +419,7 @@ public class IndexScanlIntegrationTest extends 
AbstractBasicIntegrationTest {
         RowCountingIndex idx = null;
 
         for (Ignite ignite : G.allGrids()) {
-            IgniteTable tbl = 
(IgniteTable)queryProcessor((IgniteEx)ignite).schemaHolder().schema("PUBLIC").getTable(tableName);
+            IgniteTable tbl = 
(IgniteTable)queryProcessor(ignite).schemaHolder().schema("PUBLIC").getTable(tableName);
 
             if (ignite == node) {
                 idx = new RowCountingIndex(tbl.getIndex(idxName));
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
new file mode 100644
index 00000000000..ad7fc45434f
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.math.BigDecimal;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/** */
+public class JoinRehashIntegrationTest extends AbstractBasicIntegrationTest {
+    /** {@inheritDoc} */
+    @Override protected int nodeCount() {
+        return 3;
+    }
+
+    /** Test that resources (in particular inboxes) are cleaned up after 
executing join with rehashing. */
+    @Test
+    public void testResourceCleanup() throws Exception {
+        sql("CREATE TABLE order_items (\n" +
+            "    id varchar,\n" +
+            "    orderId int,\n" +
+            "    price decimal,\n" +
+            "    amount int,\n" +
+            "    PRIMARY KEY (id))\n" +
+            "    WITH \"cache_name=order_items,backups=1\"");
+
+        sql("CREATE TABLE orders (\n" +
+            "    id int,\n" +
+            "    region varchar,\n" +
+            "    PRIMARY KEY (id))\n" +
+            "    WITH \"cache_name=orders,backups=1\"");
+
+        sql("CREATE INDEX order_items_orderId ON order_items (orderId ASC)");
+        sql("CREATE INDEX orders_region ON orders (region ASC)");
+
+        for (int i = 0; i < 30; i++) {
+            sql("INSERT INTO orders VALUES(?, ?)", i, "region" + i % 10);
+            for (int j = 0; j < 20; j++)
+                sql("INSERT INTO order_items VALUES(?, ?, ?, ?)", i + "_" + j, 
i, i / 10.0, j % 10);
+        }
+
+        String sql = "SELECT sum(i.price * i.amount)" +
+            " FROM order_items i JOIN orders o ON o.id=i.orderId" +
+            " WHERE o.region = ?";
+
+        assertQuery(sql)
+            .withParams("region0")
+            .matches(QueryChecker.containsSubPlan("IgniteMergeJoin"))
+            .returns(BigDecimal.valueOf(270))
+            .check();
+
+        // Here we only start queries and wait for result, actual resource 
clean up is checked by
+        // AbstractBasicIntegrationTest.afterTest method.
+        GridTestUtils.runMultiThreaded(() -> {
+            for (int i = 0; i < 100; i++)
+                sql(sql, i % 10);
+        }, 10, "query_starter");
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index c71493ce606..f6fc3c82b69 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -37,6 +37,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.integration.IndexScan
 import 
org.apache.ignite.internal.processors.query.calcite.integration.IndexSpoolIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.IntervalTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.JoinIntegrationTest;
+import 
org.apache.ignite.internal.processors.query.calcite.integration.JoinRehashIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.KeepBinaryIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.KillCommandDdlIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.KillQueryCommandDdlIntegrationTest;
@@ -126,9 +127,8 @@ import org.junit.runners.Suite;
     DynamicParametersIntegrationTest.class,
     ExpiredEntriesIntegrationTest.class,
     TimeoutIntegrationTest.class,
-
-    // Partition pruning
-    PartitionPruneTest.class
+    PartitionPruneTest.class,
+    JoinRehashIntegrationTest.class,
 })
 public class IntegrationTestSuite {
 }

Reply via email to