This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 655eb285da0 IGNITE-20501 SQL Calcite: Fix memory leak in
MailboxRegistryImpl#remotes - Fixes #10996.
655eb285da0 is described below
commit 655eb285da0dc13fe1bff16eb85b5d62eaee0a78
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Oct 20 16:48:08 2023 +0300
IGNITE-20501 SQL Calcite: Fix memory leak in MailboxRegistryImpl#remotes -
Fixes #10996.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../query/calcite/CalciteQueryProcessor.java | 11 ++
.../query/calcite/exec/ExchangeServiceImpl.java | 43 +++++++-
.../query/calcite/exec/LogicalRelImplementor.java | 15 ++-
.../query/calcite/exec/MailboxRegistry.java | 14 +++
.../query/calcite/exec/MailboxRegistryImpl.java | 12 ++-
.../query/calcite/exec/TimeoutService.java | 28 +++++
.../query/calcite/exec/TimeoutServiceImpl.java | 42 ++++++++
.../query/calcite/exec/rel/MergeJoinNode.java | 118 +++++++++++++--------
.../query/calcite/exec/rel/ModifyNode.java | 4 +-
.../calcite/exec/rel/AbstractExecutionTest.java | 5 +
.../query/calcite/exec/rel/ExecutionTest.java | 95 +++++++++--------
.../calcite/exec/rel/MergeJoinExecutionTest.java | 18 +++-
.../integration/AbstractBasicIntegrationTest.java | 24 ++++-
.../integration/IndexScanlIntegrationTest.java | 2 +-
.../integration/JoinRehashIntegrationTest.java | 75 +++++++++++++
.../ignite/testsuites/IntegrationTestSuite.java | 6 +-
16 files changed, 404 insertions(+), 108 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 3122d3c99d0..5e60cd44b17 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -74,6 +74,8 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
import
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
+import
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
import
org.apache.ignite.internal.processors.query.calcite.message.MessageService;
@@ -211,6 +213,9 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
/** */
private final PrepareServiceImpl prepareSvc;
+ /** */
+ private final TimeoutService timeoutSvc;
+
/** */
private final QueryRegistry qryReg;
@@ -241,6 +246,7 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
mappingSvc = new MappingServiceImpl(ctx);
exchangeSvc = new ExchangeServiceImpl(ctx);
prepareSvc = new PrepareServiceImpl(ctx);
+ timeoutSvc = new TimeoutServiceImpl(ctx);
qryReg = new QueryRegistryImpl(ctx);
QueryEngineConfiguration[] qryEnginesCfg =
ctx.config().getSqlConfiguration().getQueryEnginesConfiguration();
@@ -325,6 +331,11 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
return prepareSvc;
}
+ /** */
+ public TimeoutService timeoutService() {
+ return timeoutSvc;
+ }
+
/** */
public ExecutionService<Object[]> executionService() {
return executionSvc;
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
index 4105896c403..b10b69f2213 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExchangeServiceImpl.java
@@ -21,7 +21,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
-
import com.google.common.collect.ImmutableMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -50,6 +49,9 @@ import org.apache.ignite.internal.util.typedef.F;
*
*/
public class ExchangeServiceImpl extends AbstractService implements
ExchangeService {
+ /** */
+ public static final long INBOX_INITIALIZATION_TIMEOUT = 1_000L;
+
/** */
private final UUID locaNodeId;
@@ -62,6 +64,9 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
/** */
private MessageService msgSvc;
+ /** */
+ private TimeoutService timeoutSvc;
+
/** */
private QueryRegistry qryRegistry;
@@ -116,6 +121,20 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
return msgSvc;
}
+ /**
+ * @param timeoutSvc Timeout service.
+ */
+ public void timeoutService(TimeoutService timeoutSvc) {
+ this.timeoutSvc = timeoutSvc;
+ }
+
+ /**
+ * @return Timeout service.
+ */
+ public TimeoutService timeoutService() {
+ return timeoutSvc;
+ }
+
/** */
public void queryRegistry(QueryRegistry qryRegistry) {
this.qryRegistry = qryRegistry;
@@ -163,6 +182,7 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
taskExecutor(proc.taskExecutor());
mailboxRegistry(proc.mailboxRegistry());
messageService(proc.messageService());
+ timeoutService(proc.timeoutService());
queryRegistry(proc.queryRegistry());
init();
@@ -226,6 +246,9 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
if (qry != null)
qry.cancel();
else {
+ for (Inbox<?> inbox : mailboxRegistry().inboxes(msg.queryId(), -1,
-1))
+ mailboxRegistry().unregister(inbox);
+
if (log.isDebugEnabled()) {
log.debug("Stale query close message received: [" +
"nodeId=" + nodeId +
@@ -269,6 +292,24 @@ public class ExchangeServiceImpl extends AbstractService
implements ExchangeServ
this, mailboxRegistry(), msg.exchangeId(), msg.exchangeId());
inbox = mailboxRegistry().register(newInbox);
+
+ if (inbox == newInbox) {
+ // New inbox for query batch message can be registered in the
following cases:
+ // 1. Race between messages (when first batch arrived to node
before query start request). In this case
+ // query start request eventually will be delivered and query
execution context will be initialized.
+ // Inbox will be closed by standard query execution workflow.
+ // 2. Stale first message (query already has been closed by
some event). In this case query execution
+ // workflow already completed and inbox can leak. To prevent
leakage, schedule task to check that
+ // query context is initialized within reasonable time (assume
that race between messages can't be more
+ // than INBOX_INITIALIZATION_TIMEOUT milliseconds).
+ timeoutService().schedule(() -> {
+ Inbox<?> timeoutInbox =
mailboxRegistry().inbox(msg.queryId(), msg.exchangeId());
+
+ // Inbox is not unregistered and still not initialized.
+ if (timeoutInbox != null &&
timeoutInbox.context().topologyVersion() == null)
+ taskExecutor().execute(msg.queryId(),
msg.fragmentId(), timeoutInbox::close);
+ }, INBOX_INITIALIZATION_TIMEOUT);
+ }
}
if (inbox != null) {
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
index cb816373109..62e8799e2e4 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/LogicalRelImplementor.java
@@ -280,7 +280,7 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
rel.rightCollation().getFieldCollations().subList(0, pairsCnt)
);
- Node<Row> node = MergeJoinNode.create(ctx, outType, leftType,
rightType, joinType, comp);
+ Node<Row> node = MergeJoinNode.create(ctx, outType, leftType,
rightType, joinType, comp, hasExchange(rel));
Node<Row> leftInput = visit(rel.getLeft());
Node<Row> rightInput = visit(rel.getRight());
@@ -290,6 +290,19 @@ public class LogicalRelImplementor<Row> implements
IgniteRelVisitor<Node<Row>> {
return node;
}
+ /** */
+ private boolean hasExchange(RelNode rel) {
+ if (rel instanceof IgniteReceiver)
+ return true;
+
+ for (RelNode in : rel.getInputs()) {
+ if (hasExchange(in))
+ return true;
+ }
+
+ return false;
+ }
+
/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteIndexScan rel) {
RexNode condition = rel.condition();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
index 8c7c2d4fa4a..93add75dabe 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistry.java
@@ -97,4 +97,18 @@ public interface MailboxRegistry extends Service {
* @return Registered outboxes.
*/
Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long fragmentId, long
exchangeId);
+
+ /**
+ * Returns all registered inboxes.
+ *
+ * @return Registered inboxes.
+ */
+ Collection<Inbox<?>> inboxes();
+
+ /**
+ * Returns all registered outboxes.
+ *
+ * @return Registered outboxes.
+ */
+ Collection<Outbox<?>> outboxes();
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
index 538e7b049e7..c9c25518495 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MailboxRegistryImpl.java
@@ -18,13 +18,13 @@
package org.apache.ignite.internal.processors.query.calcite.exec;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
-
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
@@ -135,6 +135,16 @@ public class MailboxRegistryImpl extends AbstractService
implements MailboxRegis
.collect(Collectors.toList());
}
+ /** {@inheritDoc} */
+ @Override public Collection<Inbox<?>> inboxes() {
+ return Collections.unmodifiableCollection(remotes.values());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Outbox<?>> outboxes() {
+ return Collections.unmodifiableCollection(locals.values());
+ }
+
/**
* @param evtMgr Event manager.
*/
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutService.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutService.java
new file mode 100644
index 00000000000..3ff81c1910f
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutService.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import org.apache.ignite.internal.processors.query.calcite.util.Service;
+
+/**
+ * Service to run tasks with the given timeout.
+ */
+public interface TimeoutService extends Service {
+ /** */
+ public void schedule(Runnable task, long timeout);
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutServiceImpl.java
new file mode 100644
index 00000000000..92f611fe729
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/TimeoutServiceImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import org.apache.ignite.internal.GridKernalContext;
+import
org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+
+/**
+ * Timeout service implementation.
+ */
+public class TimeoutServiceImpl extends AbstractService implements
TimeoutService {
+ /** */
+ private final GridTimeoutProcessor proc;
+
+ /** */
+ public TimeoutServiceImpl(GridKernalContext ctx) {
+ super(ctx);
+
+ proc = ctx.timeout();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void schedule(Runnable task, long timeout) {
+ proc.schedule(task, timeout, -1);
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
index 7a66ccaf361..35f31e8b931 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinNode.java
@@ -59,14 +59,22 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
/** */
protected boolean inLoop;
+ /**
+ * Flag indicating that at least one of the inputs has exchange
underneath. In this case we can't prematurely end
+ * downstream if one of the inputs is drained, we need to wait for both
inputs, since async message from remote
+ * node can reopen closed inbox, which can cause memory leaks.
+ */
+ protected final boolean distributed;
+
/**
* @param ctx Execution context.
* @param comp Join expression.
*/
- private MergeJoinNode(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp) {
+ private MergeJoinNode(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp, boolean distributed) {
super(ctx, rowType);
this.comp = comp;
+ this.distributed = distributed;
handler = ctx.rowHandler();
}
@@ -205,37 +213,45 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
/** */
protected abstract void join() throws Exception;
+ /** */
+ protected void checkJoinFinished() throws Exception {
+ if (!distributed || (waitingLeft == NOT_WAITING && waitingRight ==
NOT_WAITING)) {
+ requested = 0;
+ downstream().end();
+ }
+ }
+
/** */
@NotNull public static <Row> MergeJoinNode<Row>
create(ExecutionContext<Row> ctx, RelDataType outputRowType, RelDataType
leftRowType,
- RelDataType rightRowType, JoinRelType joinType, Comparator<Row> comp) {
+ RelDataType rightRowType, JoinRelType joinType, Comparator<Row> comp,
boolean distributed) {
switch (joinType) {
case INNER:
- return new InnerJoin<>(ctx, outputRowType, comp);
+ return new InnerJoin<>(ctx, outputRowType, comp, distributed);
case LEFT: {
RowHandler.RowFactory<Row> rightRowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rightRowType);
- return new LeftJoin<>(ctx, outputRowType, comp,
rightRowFactory);
+ return new LeftJoin<>(ctx, outputRowType, comp, distributed,
rightRowFactory);
}
case RIGHT: {
RowHandler.RowFactory<Row> leftRowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), leftRowType);
- return new RightJoin<>(ctx, outputRowType, comp,
leftRowFactory);
+ return new RightJoin<>(ctx, outputRowType, comp, distributed,
leftRowFactory);
}
case FULL: {
RowHandler.RowFactory<Row> leftRowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), leftRowType);
RowHandler.RowFactory<Row> rightRowFactory =
ctx.rowHandler().factory(ctx.getTypeFactory(), rightRowType);
- return new FullOuterJoin<>(ctx, outputRowType, comp,
leftRowFactory, rightRowFactory);
+ return new FullOuterJoin<>(ctx, outputRowType, comp,
distributed, leftRowFactory, rightRowFactory);
}
case SEMI:
- return new SemiJoin<>(ctx, outputRowType, comp);
+ return new SemiJoin<>(ctx, outputRowType, comp, distributed);
case ANTI:
- return new AntiJoin<>(ctx, outputRowType, comp);
+ return new AntiJoin<>(ctx, outputRowType, comp, distributed);
default:
throw new IllegalStateException("Join type \"" + joinType +
"\" is not supported yet");
@@ -263,9 +279,10 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param rowType Row type.
* @param comp Join expression comparator.
+ * @param distributed If one of the inputs has exchange underneath.
*/
- public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp) {
- super(ctx, rowType, comp);
+ public InnerJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp, boolean distributed) {
+ super(ctx, rowType, comp, distributed);
}
/** {@inheritDoc} */
@@ -383,10 +400,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null
&& leftInBuf.isEmpty())
|| (waitingRight == NOT_WAITING && right == null &&
rightInBuf.isEmpty() && rightMaterialization == null))
- ) {
- requested = 0;
- downstream().end();
- }
+ )
+ checkJoinFinished();
}
}
@@ -417,10 +432,17 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param rowType Row type.
* @param comp Join expression comparator.
+ * @param distributed If one of the inputs has exchange underneath.
* @param rightRowFactory Right row factory.
*/
- public LeftJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp, RowHandler.RowFactory<Row> rightRowFactory) {
- super(ctx, rowType, comp);
+ public LeftJoin(
+ ExecutionContext<Row> ctx,
+ RelDataType rowType,
+ Comparator<Row> comp,
+ boolean distributed,
+ RowHandler.RowFactory<Row> rightRowFactory
+ ) {
+ super(ctx, rowType, comp, distributed);
this.rightRowFactory = rightRowFactory;
}
@@ -561,10 +583,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
if (waitingLeft == 0)
leftSource().request(waitingLeft = IN_BUFFER_SIZE);
- if (requested > 0 && waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty()) {
- requested = 0;
- downstream().end();
- }
+ if (requested > 0 && waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty())
+ checkJoinFinished();
}
}
@@ -595,10 +615,17 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param rowType Row type.
* @param comp Join expression comparator.
+ * @param distributed If one of the inputs has exchange underneath.
* @param leftRowFactory Left row factory.
*/
- public RightJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp, RowHandler.RowFactory<Row> leftRowFactory) {
- super(ctx, rowType, comp);
+ public RightJoin(
+ ExecutionContext<Row> ctx,
+ RelDataType rowType,
+ Comparator<Row> comp,
+ boolean distributed,
+ RowHandler.RowFactory<Row> leftRowFactory
+ ) {
+ super(ctx, rowType, comp, distributed);
this.leftRowFactory = leftRowFactory;
}
@@ -748,10 +775,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
if (waitingLeft == 0)
leftSource().request(waitingLeft = IN_BUFFER_SIZE);
- if (requested > 0 && waitingRight == NOT_WAITING && right == null
&& rightInBuf.isEmpty() && rightMaterialization == null) {
- requested = 0;
- downstream().end();
- }
+ if (requested > 0 && waitingRight == NOT_WAITING && right == null
&& rightInBuf.isEmpty() && rightMaterialization == null)
+ checkJoinFinished();
}
}
@@ -788,12 +813,19 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param rowType Row type.
* @param comp Join expression comparator.
+ * @param distributed If one of the inputs has exchange underneath.
* @param leftRowFactory Left row factory.
* @param rightRowFactory Right row factory.
*/
- public FullOuterJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp,
- RowHandler.RowFactory<Row> leftRowFactory,
RowHandler.RowFactory<Row> rightRowFactory) {
- super(ctx, rowType, comp);
+ public FullOuterJoin(
+ ExecutionContext<Row> ctx,
+ RelDataType rowType,
+ Comparator<Row> comp,
+ boolean distributed,
+ RowHandler.RowFactory<Row> leftRowFactory,
+ RowHandler.RowFactory<Row> rightRowFactory
+ ) {
+ super(ctx, rowType, comp, distributed);
this.leftRowFactory = leftRowFactory;
this.rightRowFactory = rightRowFactory;
@@ -976,10 +1008,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
if (requested > 0 && waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty()
&& waitingRight == NOT_WAITING && right == null &&
rightInBuf.isEmpty() && rightMaterialization == null
- ) {
- requested = 0;
- downstream().end();
- }
+ )
+ checkJoinFinished();
}
}
@@ -995,9 +1025,10 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param rowType Row type.
* @param comp Join expression comparator.
+ * @param distributed If one of the inputs has exchange underneath.
*/
- public SemiJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp) {
- super(ctx, rowType, comp);
+ public SemiJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp, boolean distributed) {
+ super(ctx, rowType, comp, distributed);
}
/** {@inheritDoc} */
@@ -1052,10 +1083,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
if (requested > 0 && ((waitingLeft == NOT_WAITING && left == null
&& leftInBuf.isEmpty()
|| (waitingRight == NOT_WAITING && right == null &&
rightInBuf.isEmpty())))
- ) {
- requested = 0;
- downstream().end();
- }
+ )
+ checkJoinFinished();
}
}
@@ -1071,9 +1100,10 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
* @param ctx Execution context.
* @param rowType Row type.
* @param comp Join expression comparator.
+ * @param distributed If one of the inputs has exchange underneath.
*/
- public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp) {
- super(ctx, rowType, comp);
+ public AntiJoin(ExecutionContext<Row> ctx, RelDataType rowType,
Comparator<Row> comp, boolean distributed) {
+ super(ctx, rowType, comp, distributed);
}
/** {@inheritDoc} */
@@ -1129,10 +1159,8 @@ public abstract class MergeJoinNode<Row> extends
AbstractNode<Row> {
if (waitingLeft == 0)
leftSource().request(waitingLeft = IN_BUFFER_SIZE);
- if (requested > 0 && waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty()) {
- requested = 0;
- downstream().end();
- }
+ if (requested > 0 && waitingLeft == NOT_WAITING && left == null &&
leftInBuf.isEmpty())
+ checkJoinFinished();
}
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
index a52b0996f99..4491028d0be 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ModifyNode.java
@@ -216,11 +216,11 @@ public class ModifyNode<Row> extends AbstractNode<Row>
implements SingleNode<Row
private IgniteSQLException conflictKeysException(List<Object>
conflictKeys) {
if (op == TableModify.Operation.INSERT) {
return new IgniteSQLException("Failed to INSERT some keys because
they are already in cache. " +
- "[keys=" + conflictKeys + ']', DUPLICATE_KEY);
+ "[cache=" + desc.cacheContext().name() + ", keys=" +
conflictKeys + ']', DUPLICATE_KEY);
}
else {
return new IgniteSQLException("Failed to MERGE some keys due to
keys conflict or concurrent updates. " +
- "[keys=" + conflictKeys + ']', CONCURRENT_UPDATE);
+ "[cache=" + desc.cacheContext().name() + ", keys=" +
conflictKeys + ']', CONCURRENT_UPDATE);
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
index 8903956bf98..3bad2abfc7f 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractExecutionTest.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
import
org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
+import
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpIoTracker;
import
org.apache.ignite.internal.processors.query.calcite.exec.tracker.NoOpMemoryTracker;
import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
@@ -55,6 +56,7 @@ import
org.apache.ignite.internal.processors.query.calcite.message.MessageServic
import
org.apache.ignite.internal.processors.query.calcite.message.TestIoManager;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import
org.apache.ignite.internal.processors.query.calcite.prepare.BaseQueryContext;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
@@ -159,6 +161,8 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
for (UUID uuid : nodes) {
GridTestKernalContext kernal = newContext();
+ kernal.add(new GridTimeoutProcessor(kernal));
+
QueryTaskExecutorImpl taskExecutor = new
QueryTaskExecutorImpl(kernal);
taskExecutor.stripedThreadPoolExecutor(new
IgniteTestStripedThreadPoolExecutor(
execStgy,
@@ -185,6 +189,7 @@ public class AbstractExecutionTest extends
GridCommonAbstractTest {
exchangeSvc.messageService(msgSvc);
exchangeSvc.mailboxRegistry(mailboxRegistry);
exchangeSvc.queryRegistry(new QueryRegistryImpl(kernal));
+ exchangeSvc.timeoutService(new TimeoutServiceImpl(kernal));
exchangeSvc.init();
exchangeServices.put(uuid, exchangeSvc);
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
index c669a58de38..e201c5cf2f4 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/ExecutionTest.java
@@ -549,55 +549,58 @@ public class ExecutionTest extends AbstractExecutionTest {
for (int leftSize : sizes) {
for (int rightSize : sizes) {
- log.info("Check: leftSize=" + leftSize + ", rightSize=" +
rightSize);
-
- ScanNode<Object[]> left = new ScanNode<>(ctx, rowType, new
TestTable(leftSize, rowType));
- ScanNode<Object[]> right = new ScanNode<>(ctx, rowType, new
TestTable(rightSize, rowType));
-
- RelDataType joinRowType = TypeUtils.createRowType(
- tf,
- int.class, String.class, int.class,
- int.class, String.class, int.class);
-
- MergeJoinNode<Object[]> join = MergeJoinNode.create(
- ctx,
- joinRowType,
- null,
- null,
- INNER,
- (r1, r2) -> {
- Object o1 = r1[0];
- Object o2 = r2[0];
-
- if (o1 == null || o2 == null) {
- if (o1 != null)
- return 1;
- else if (o2 != null)
- return -1;
- else
- return 0;
- }
-
- return Integer.compare((Integer)o1, (Integer)o2);
+ for (boolean distr : new boolean[] {false, true}) {
+ log.info("Check: leftSize=" + leftSize + ", rightSize=" +
rightSize + ", distributed=" + distr);
+
+ ScanNode<Object[]> left = new ScanNode<>(ctx, rowType, new
TestTable(leftSize, rowType));
+ ScanNode<Object[]> right = new ScanNode<>(ctx, rowType,
new TestTable(rightSize, rowType));
+
+ RelDataType joinRowType = TypeUtils.createRowType(
+ tf,
+ int.class, String.class, int.class,
+ int.class, String.class, int.class);
+
+ MergeJoinNode<Object[]> join = MergeJoinNode.create(
+ ctx,
+ joinRowType,
+ null,
+ null,
+ INNER,
+ (r1, r2) -> {
+ Object o1 = r1[0];
+ Object o2 = r2[0];
+
+ if (o1 == null || o2 == null) {
+ if (o1 != null)
+ return 1;
+ else if (o2 != null)
+ return -1;
+ else
+ return 0;
+ }
+
+ return Integer.compare((Integer)o1, (Integer)o2);
+ },
+ distr
+ );
+
+ join.register(Arrays.asList(left, right));
+
+ RootNode<Object[]> root = new RootNode<>(ctx, joinRowType);
+ root.register(join);
+
+ int cnt = 0;
+ while (root.hasNext()) {
+ root.next();
+
+ cnt++;
}
- );
-
- join.register(Arrays.asList(left, right));
- RootNode<Object[]> root = new RootNode<>(ctx, joinRowType);
- root.register(join);
-
- int cnt = 0;
- while (root.hasNext()) {
- root.next();
-
- cnt++;
+ assertEquals(
+ "Invalid result size. [left=" + leftSize + ", right="
+ rightSize + ", results=" + cnt,
+ min(leftSize, rightSize),
+ cnt);
}
-
- assertEquals(
- "Invalid result size. [left=" + leftSize + ", right=" +
rightSize + ", results=" + cnt,
- min(leftSize, rightSize),
- cnt);
}
}
}
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
index 5f49702b468..c7304838909 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MergeJoinExecutionTest.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -42,7 +41,6 @@ import static org.hamcrest.core.IsEqual.equalTo;
/** */
@SuppressWarnings("TypeMayBeWeakened")
-@WithSystemProperty(key = "calcite.debug", value = "true")
public class MergeJoinExecutionTest extends AbstractExecutionTest {
/** */
public static final Object[][] EMPTY = new Object[0][];
@@ -385,6 +383,20 @@ public class MergeJoinExecutionTest extends AbstractExecutionTest {
* @param expRes Expected result.
*/
private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes) {
+ verifyJoin(left, right, joinType, expRes, false);
+ verifyJoin(left, right, joinType, expRes, true);
+ }
+
+ /**
+ * Creates execution tree and executes it. Then compares the result of the execution with the given one.
+ *
+ * @param left Data for left table.
+ * @param right Data for right table.
+ * @param joinType Join type.
+ * @param expRes Expected result.
+ * @param distr Distributed.
+ */
+ private void verifyJoin(Object[][] left, Object[][] right, JoinRelType joinType, Object[][] expRes, boolean distr) {
ExecutionContext<Object[]> ctx = executionContext(F.first(nodes()), UUID.randomUUID(), 0);
RelDataType leftType = TypeUtils.createRowType(ctx.getTypeFactory(), int.class, String.class, Integer.class);
@@ -413,7 +425,7 @@ public class MergeJoinExecutionTest extends AbstractExecutionTest {
}
return Integer.compare((Integer)o1, (Integer)o2);
- });
+ }, distr);
join.register(F.asList(leftNode, rightNode));
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
index 111416f0da8..eed1ae56e85 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AbstractBasicIntegrationTest.java
@@ -47,17 +47,16 @@ import org.apache.ignite.internal.processors.query.calcite.schema.IgniteTable;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
-import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.processors.query.calcite.exec.ExchangeServiceImpl.INBOX_INITIALIZATION_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/**
*
*/
-@WithSystemProperty(key = "calcite.debug", value = "false")
public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
/** */
protected static final Object[] NULL_RESULT = new Object[] { null };
@@ -83,11 +82,20 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
assertTrue("Not finished queries found on client", waitForCondition(
() -> queryProcessor(client).queryRegistry().runningQueries().isEmpty(), 1_000L));
+ waitForCondition(() -> {
+ for (Ignite ign : G.allGrids()) {
+ if (!queryProcessor(ign).mailboxRegistry().inboxes().isEmpty())
+ return false;
+ }
+
+ return true;
+ }, INBOX_INITIALIZATION_TIMEOUT * 2);
+
for (Ignite ign : G.allGrids()) {
for (String cacheName : ign.cacheNames())
ign.destroyCache(cacheName);
- CalciteQueryProcessor qryProc = queryProcessor(((IgniteEx)ign));
+ CalciteQueryProcessor qryProc = queryProcessor(ign);
assertEquals("Not finished queries found [ignite=" + ign.name() + ']',
0, qryProc.queryRegistry().runningQueries().size());
@@ -95,6 +103,12 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
ExecutionServiceImpl<Object[]> execSvc = (ExecutionServiceImpl<Object[]>)qryProc.executionService();
assertEquals("Tracked memory must be 0 after test [ignite=" + ign.name() + ']',
0, execSvc.memoryTracker().allocated());
+
+ assertEquals("Count of inboxes must be 0 after test [ignite=" + ign.name() + ']',
+ 0, qryProc.mailboxRegistry().inboxes().size());
+
+ assertEquals("Count of outboxes must be 0 after test [ignite=" + ign.name() + ']',
+ 0, qryProc.mailboxRegistry().outboxes().size());
}
awaitPartitionMapExchange();
@@ -189,8 +203,8 @@ public class AbstractBasicIntegrationTest extends GridCommonAbstractTest {
}
/** */
- protected CalciteQueryProcessor queryProcessor(IgniteEx ignite) {
- return Commons.lookupComponent(ignite.context(), CalciteQueryProcessor.class);
+ protected CalciteQueryProcessor queryProcessor(Ignite ignite) {
+ return Commons.lookupComponent(((IgniteEx)ignite).context(), CalciteQueryProcessor.class);
}
/** */
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
index f66c495149f..093acb259fc 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/IndexScanlIntegrationTest.java
@@ -419,7 +419,7 @@ public class IndexScanlIntegrationTest extends AbstractBasicIntegrationTest {
RowCountingIndex idx = null;
for (Ignite ignite : G.allGrids()) {
- IgniteTable tbl = (IgniteTable)queryProcessor((IgniteEx)ignite).schemaHolder().schema("PUBLIC").getTable(tableName);
+ IgniteTable tbl = (IgniteTable)queryProcessor(ignite).schemaHolder().schema("PUBLIC").getTable(tableName);
if (ignite == node) {
idx = new RowCountingIndex(tbl.getIndex(idxName));
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
new file mode 100644
index 00000000000..ad7fc45434f
--- /dev/null
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/JoinRehashIntegrationTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.math.BigDecimal;
+import org.apache.ignite.internal.processors.query.calcite.QueryChecker;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/** */
+public class JoinRehashIntegrationTest extends AbstractBasicIntegrationTest {
+ /** {@inheritDoc} */
+ @Override protected int nodeCount() {
+ return 3;
+ }
+
+ /** Test that resources (in particular inboxes) are cleaned up after executing join with rehashing. */
+ @Test
+ public void testResourceCleanup() throws Exception {
+ sql("CREATE TABLE order_items (\n" +
+ " id varchar,\n" +
+ " orderId int,\n" +
+ " price decimal,\n" +
+ " amount int,\n" +
+ " PRIMARY KEY (id))\n" +
+ " WITH \"cache_name=order_items,backups=1\"");
+
+ sql("CREATE TABLE orders (\n" +
+ " id int,\n" +
+ " region varchar,\n" +
+ " PRIMARY KEY (id))\n" +
+ " WITH \"cache_name=orders,backups=1\"");
+
+ sql("CREATE INDEX order_items_orderId ON order_items (orderId ASC)");
+ sql("CREATE INDEX orders_region ON orders (region ASC)");
+
+ for (int i = 0; i < 30; i++) {
+ sql("INSERT INTO orders VALUES(?, ?)", i, "region" + i % 10);
+ for (int j = 0; j < 20; j++)
+ sql("INSERT INTO order_items VALUES(?, ?, ?, ?)", i + "_" + j, i, i / 10.0, j % 10);
+ }
+
+ String sql = "SELECT sum(i.price * i.amount)" +
+ " FROM order_items i JOIN orders o ON o.id=i.orderId" +
+ " WHERE o.region = ?";
+
+ assertQuery(sql)
+ .withParams("region0")
+ .matches(QueryChecker.containsSubPlan("IgniteMergeJoin"))
+ .returns(BigDecimal.valueOf(270))
+ .check();
+
+ // Here we only start queries and wait for result, actual resource clean up is checked by
+ // AbstractBasicIntegrationTest.afterTest method.
+ GridTestUtils.runMultiThreaded(() -> {
+ for (int i = 0; i < 100; i++)
+ sql(sql, i % 10);
+ }, 10, "query_starter");
+ }
+}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index c71493ce606..f6fc3c82b69 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.query.calcite.integration.IndexScan
import org.apache.ignite.internal.processors.query.calcite.integration.IndexSpoolIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.IntervalTest;
import org.apache.ignite.internal.processors.query.calcite.integration.JoinIntegrationTest;
+import org.apache.ignite.internal.processors.query.calcite.integration.JoinRehashIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.KeepBinaryIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.KillCommandDdlIntegrationTest;
import org.apache.ignite.internal.processors.query.calcite.integration.KillQueryCommandDdlIntegrationTest;
@@ -126,9 +127,8 @@ import org.junit.runners.Suite;
DynamicParametersIntegrationTest.class,
ExpiredEntriesIntegrationTest.class,
TimeoutIntegrationTest.class,
-
- // Partition pruning
- PartitionPruneTest.class
+ PartitionPruneTest.class,
+ JoinRehashIntegrationTest.class,
})
public class IntegrationTestSuite {
}