This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4c8ad63012 IGNITE-18580 Sql. Redesign the Exchange to use a pull-based
approach (#1553)
4c8ad63012 is described below
commit 4c8ad6301258610849566cf384fd48e4d6eede02
Author: korlov42 <[email protected]>
AuthorDate: Tue Jan 24 16:42:31 2023 +0200
IGNITE-18580 Sql. Redesign the Exchange to use a pull-based approach (#1553)
---
.../internal/sql/engine/SqlQueryProcessor.java | 2 -
.../internal/sql/engine/exec/ExchangeService.java | 15 +-
.../sql/engine/exec/ExchangeServiceImpl.java | 108 +++----
.../sql/engine/exec/ExecutionServiceImpl.java | 4 +-
.../sql/engine/exec/LogicalRelImplementor.java | 9 +-
.../internal/sql/engine/exec/MailboxRegistry.java | 20 +-
.../sql/engine/exec/MailboxRegistryImpl.java | 40 +--
.../internal/sql/engine/exec/rel/AbstractNode.java | 8 +-
.../sql/engine/exec/rel/AsyncRootNode.java | 120 +++-----
.../ignite/internal/sql/engine/exec/rel/Inbox.java | 340 ++++++++++++---------
.../internal/sql/engine/exec/rel/Outbox.java | 289 +++++++++++-------
...eMessage.java => QueryBatchRequestMessage.java} | 17 +-
.../sql/engine/message/SqlQueryMessageGroup.java | 3 +-
.../ignite/internal/sql/engine/util/Commons.java | 3 +
.../sql/engine/benchmarks/SqlBenchmark.java | 43 +--
.../sql/engine/exec/ExecutionServiceImplTest.java | 4 +-
.../sql/engine/exec/rel/AbstractExecutionTest.java | 5 +-
.../sql/engine/exec/rel/ExchangeExecutionTest.java | 238 +++++++++++++++
...sterService.java => ClusterServiceFactory.java} | 12 +-
.../sql/engine/framework/DataProvider.java | 42 +++
.../sql/engine/framework/TestBuilders.java | 107 ++++++-
.../internal/sql/engine/framework/TestNode.java | 2 +-
22 files changed, 916 insertions(+), 515 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index eecaf29519..2e383ec123 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -199,8 +199,6 @@ public class SqlQueryProcessor implements QueryProcessor {
));
var exchangeService = registerService(new ExchangeServiceImpl(
- clusterSrvc.topologyService().localMember(),
- taskExecutor,
mailboxRegistry,
msgSrvc
));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index 3724784dae..12b58de40f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -41,15 +41,16 @@ public interface ExchangeService extends LifecycleAware {
List<RowT> rows) throws IgniteInternalCheckedException;
/**
- * Acknowledges a batch with given ID is processed.
+ * Requests batches from remote source.
*
- * @param nodeName Node consistent ID to notify.
- * @param qryId Query ID.
- * @param fragmentId Target fragment ID.
- * @param exchangeId Exchange ID.
- * @param batchId Batch ID.
+ * @param nodeName A consistent identifier of the node to request from.
+ * @param queryId An identifier of the query.
+ * @param fragmentId An identifier of the fragment to request from.
+ * @param exchangeId An identifier of the exchange to request from.
+ * @param amountOfBatches A count of batches to request.
*/
- void acknowledge(String nodeName, UUID qryId, long fragmentId, long
exchangeId, int batchId) throws IgniteInternalCheckedException;
+ void request(String nodeName, UUID queryId, long fragmentId, long
exchangeId, int amountOfBatches)
+ throws IgniteInternalCheckedException;
/**
* Sends cancel request.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index 5f26aa2585..39c7f248a3 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -20,72 +20,64 @@ package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.message.InboxCloseMessage;
import org.apache.ignite.internal.sql.engine.message.MessageService;
-import
org.apache.ignite.internal.sql.engine.message.QueryBatchAcknowledgeMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
+import org.apache.ignite.internal.sql.engine.message.QueryBatchRequestMessage;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessageGroup;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
-import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
-import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.network.ClusterNode;
/**
- * ExchangeServiceImpl. TODO Documentation
https://issues.apache.org/jira/browse/IGNITE-15859
+ * Message-based implementation of {@link ExchangeService} interface.
+ *
+ * <p>Provides simple methods of interaction with the mailbox, hiding all the
machinery to send and receive messages.
*/
public class ExchangeServiceImpl implements ExchangeService {
private static final IgniteLogger LOG =
Loggers.forClass(ExchangeServiceImpl.class);
-
private static final SqlQueryMessagesFactory FACTORY = new
SqlQueryMessagesFactory();
- private final ClusterNode localNode;
-
- private final QueryTaskExecutor taskExecutor;
-
private final MailboxRegistry mailboxRegistry;
-
- private final MessageService msgSrvc;
+ private final MessageService messageService;
/**
- * Constructor. TODO Documentation
https://issues.apache.org/jira/browse/IGNITE-15859
+ * Creates the object.
+ *
+ * @param mailboxRegistry A registry of mailboxes created on the node.
+ * @param messageService A messaging service to exchange messages between
mailboxes.
*/
public ExchangeServiceImpl(
- ClusterNode localNode,
- QueryTaskExecutor taskExecutor,
MailboxRegistry mailboxRegistry,
- MessageService msgSrvc
+ MessageService messageService
) {
- this.localNode = localNode;
- this.taskExecutor = taskExecutor;
this.mailboxRegistry = mailboxRegistry;
- this.msgSrvc = msgSrvc;
+ this.messageService = messageService;
}
/** {@inheritDoc} */
@Override
public void start() {
- msgSrvc.register((n, m) -> onMessage(n, (InboxCloseMessage) m),
SqlQueryMessageGroup.INBOX_CLOSE_MESSAGE);
- msgSrvc.register((n, m) -> onMessage(n, (QueryBatchAcknowledgeMessage)
m), SqlQueryMessageGroup.QUERY_BATCH_ACK);
- msgSrvc.register((n, m) -> onMessage(n, (QueryBatchMessage) m),
SqlQueryMessageGroup.QUERY_BATCH_MESSAGE);
+ messageService.register((n, m) -> onMessage(n, (InboxCloseMessage) m),
SqlQueryMessageGroup.INBOX_CLOSE_MESSAGE);
+ messageService.register((n, m) -> onMessage(n,
(QueryBatchRequestMessage) m), SqlQueryMessageGroup.QUERY_BATCH_REQUEST);
+ messageService.register((n, m) -> onMessage(n, (QueryBatchMessage) m),
SqlQueryMessageGroup.QUERY_BATCH_MESSAGE);
}
/** {@inheritDoc} */
@Override
public <RowT> void sendBatch(String nodeName, UUID qryId, long fragmentId,
long exchangeId, int batchId,
boolean last, List<RowT> rows) throws
IgniteInternalCheckedException {
- msgSrvc.send(
+ messageService.send(
nodeName,
FACTORY.queryBatchMessage()
.queryId(qryId)
@@ -100,15 +92,15 @@ public class ExchangeServiceImpl implements
ExchangeService {
/** {@inheritDoc} */
@Override
- public void acknowledge(String nodeName, UUID qryId, long fragmentId, long
exchangeId, int batchId)
+ public void request(String nodeName, UUID queryId, long fragmentId, long
exchangeId, int amountOfBatches)
throws IgniteInternalCheckedException {
- msgSrvc.send(
+ messageService.send(
nodeName,
- FACTORY.queryBatchAcknowledgeMessage()
- .queryId(qryId)
+ FACTORY.queryBatchRequestMessage()
+ .queryId(queryId)
.fragmentId(fragmentId)
.exchangeId(exchangeId)
- .batchId(batchId)
+ .amountOfBatches(amountOfBatches)
.build()
);
}
@@ -116,7 +108,7 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
public void closeQuery(String nodeName, UUID qryId) throws
IgniteInternalCheckedException {
- msgSrvc.send(
+ messageService.send(
nodeName,
FACTORY.queryCloseMessage()
.queryId(qryId)
@@ -127,7 +119,7 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
public void closeInbox(String nodeName, UUID qryId, long fragmentId, long
exchangeId) throws IgniteInternalCheckedException {
- msgSrvc.send(
+ messageService.send(
nodeName,
FACTORY.inboxCloseMessage()
.queryId(qryId)
@@ -140,7 +132,7 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
public void sendError(String nodeName, UUID qryId, long fragmentId,
Throwable err) throws IgniteInternalCheckedException {
- msgSrvc.send(
+ messageService.send(
nodeName,
FACTORY.errorMessage()
.queryId(qryId)
@@ -153,7 +145,7 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
public boolean alive(String nodeName) {
- return msgSrvc.alive(nodeName);
+ return messageService.alive(nodeName);
}
private void onMessage(String nodeName, InboxCloseMessage msg) {
@@ -169,35 +161,29 @@ public class ExchangeServiceImpl implements
ExchangeService {
}
}
- private void onMessage(String nodeName, QueryBatchAcknowledgeMessage msg) {
- Outbox<?> outbox = mailboxRegistry.outbox(msg.queryId(),
msg.exchangeId());
+ private void onMessage(String nodeName, QueryBatchRequestMessage msg) {
+ CompletableFuture<Outbox<?>> outboxFut =
mailboxRegistry.outbox(msg.queryId(), msg.exchangeId());
- if (outbox != null) {
+ Consumer<Outbox<?>> onRequestHandler = outbox -> {
try {
- outbox.onAcknowledge(nodeName, msg.batchId());
+ outbox.onRequest(nodeName, msg.amountOfBatches());
} catch (Throwable e) {
outbox.onError(e);
throw new IgniteInternalException(UNEXPECTED_ERR, "Unexpected
exception", e);
}
- } else if (LOG.isDebugEnabled()) {
- LOG.debug("Stale acknowledge message received: [nodeName={},
queryId={}, fragmentId={}, exchangeId={}, batchId={}]",
- nodeName, msg.queryId(), msg.fragmentId(),
msg.exchangeId(), msg.batchId());
+ };
+
+ if (outboxFut.isDone()) {
+ onRequestHandler.accept(outboxFut.join());
+ } else {
+ outboxFut.thenAccept(onRequestHandler);
}
}
private void onMessage(String nodeName, QueryBatchMessage msg) {
Inbox<?> inbox = mailboxRegistry.inbox(msg.queryId(),
msg.exchangeId());
- if (inbox == null && msg.batchId() == 0) {
- // first message sent before a fragment is built
- // note that an inbox source fragment id is also used as an
exchange id
- Inbox<?> newInbox = new Inbox<>(baseInboxContext(nodeName,
msg.queryId(), msg.fragmentId()),
- this, mailboxRegistry, msg.exchangeId(), msg.exchangeId());
-
- inbox = mailboxRegistry.register(newInbox);
- }
-
if (inbox != null) {
try {
inbox.onBatchReceived(nodeName, msg.batchId(), msg.last(),
Commons.cast(msg.rows()));
@@ -212,28 +198,6 @@ public class ExchangeServiceImpl implements
ExchangeService {
}
}
- /**
- * Get minimal execution context to meet Inbox needs.
- */
- private ExecutionContext<?> baseInboxContext(String nodeName, UUID qryId,
long fragmentId) {
- return new ExecutionContext<>(
- BaseQueryContext.builder()
- .logger(LOG)
- .build(),
- taskExecutor,
- qryId,
- localNode,
- nodeName,
- new FragmentDescription(
- fragmentId,
- null,
- null,
- Long2ObjectMaps.emptyMap()),
- null,
- Map.of(),
- null);
- }
-
/** {@inheritDoc} */
@Override
public void stop() {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index ff96768d28..bc792f0c04 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -511,6 +511,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
});
node.onRegister(rootNode);
+ rootNode.prefetch();
+
root.complete(rootNode);
}
@@ -527,7 +529,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
if (node instanceof Outbox) {
- ((Outbox<?>) node).init();
+ ((Outbox<?>) node).prefetch();
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index 18cdd49fe7..81ad1df7aa 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -587,12 +587,11 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
/** {@inheritDoc} */
@Override
public Node<RowT> visit(IgniteReceiver rel) {
- Inbox<RowT> inbox = mailboxRegistry.register(
- new Inbox<>(ctx, exchangeSvc, mailboxRegistry,
rel.exchangeId(), rel.sourceFragmentId()));
+ Inbox<RowT> inbox = new Inbox<>(ctx, exchangeSvc, mailboxRegistry,
+ ctx.remotes(rel.exchangeId()),
expressionFactory.comparator(rel.collation()),
+ rel.exchangeId(), rel.sourceFragmentId());
- // here may be an already created (to consume rows from remote nodes)
inbox
- // without proper context, we need to init it with a right one.
- inbox.init(ctx, ctx.remotes(rel.exchangeId()),
expressionFactory.comparator(rel.collation()));
+ mailboxRegistry.register(inbox);
return inbox;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
index f0ec828de3..eed6a5d0ba 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistry.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.exec;
import java.util.Collection;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.jetbrains.annotations.Nullable;
@@ -29,12 +30,11 @@ import org.jetbrains.annotations.Nullable;
*/
public interface MailboxRegistry extends LifecycleAware {
/**
- * Tries to register and inbox node and returns it if success or returns
previously registered inbox otherwise.
+ * Registers an inbox.
*
- * @param inbox Inbox.
- * @return Registered inbox.
+ * @param inbox Inbox to register.
*/
- <T> Inbox<T> register(Inbox<T> inbox);
+ void register(Inbox<?> inbox);
/**
* Registers an outbox.
@@ -64,7 +64,7 @@ public interface MailboxRegistry extends LifecycleAware {
* @param exchangeId Exchange ID.
* @return Registered outbox. May be {@code null} if execution was
cancelled.
*/
- Outbox<?> outbox(UUID qryId, long exchangeId);
+ CompletableFuture<Outbox<?>> outbox(UUID qryId, long exchangeId);
/**
* Returns a registered inbox by provided query ID, exchange ID pair.
@@ -84,14 +84,4 @@ public interface MailboxRegistry extends LifecycleAware {
* @return Registered inboxes.
*/
Collection<Inbox<?>> inboxes(@Nullable UUID qryId, long fragmentId, long
exchangeId);
-
- /**
- * Returns all registered outboxes for provided query ID.
- *
- * @param qryId Query ID. {@code null} means return outboxes with any
query id.
- * @param fragmentId Fragment Id. {@code -1} means return outboxes with
any fragment id.
- * @param exchangeId Exchange Id. {@code -1} means return outboxes with
any exchange id.
- * @return Registered outboxes.
- */
- Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long fragmentId, long
exchangeId);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
index 2f00be6d6b..aa939b9769 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -43,7 +44,7 @@ public class MailboxRegistryImpl implements MailboxRegistry,
TopologyEventHandle
private static final Predicate<Mailbox<?>> ALWAYS_TRUE = o -> true;
- private final Map<MailboxKey, Outbox<?>> locals;
+ private final Map<MailboxKey, CompletableFuture<Outbox<?>>> locals;
private final Map<MailboxKey, Inbox<?>> remotes;
@@ -64,30 +65,29 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
/** {@inheritDoc} */
@Override
- public <T> Inbox<T> register(Inbox<T> inbox) {
- Inbox<T> old = (Inbox<T>) remotes.putIfAbsent(new
MailboxKey(inbox.queryId(), inbox.exchangeId()), inbox);
+ public void register(Inbox<?> inbox) {
+ Inbox<?> res = remotes.putIfAbsent(new MailboxKey(inbox.queryId(),
inbox.exchangeId()), inbox);
if (LOG.isTraceEnabled()) {
- if (old != null) {
- LOG.trace("Inbox already registered [qryId={},
fragmentId={}]", inbox.queryId(), inbox.fragmentId());
- } else {
- LOG.trace("Inbox registered [qryId={}, fragmentId={}]",
inbox.queryId(), inbox.fragmentId());
- }
+ LOG.trace("Inbox registered [qryId={}, fragmentId={}]",
inbox.queryId(), inbox.fragmentId());
}
- return old != null ? old : inbox;
+ assert res == null : res;
}
/** {@inheritDoc} */
@Override
public void register(Outbox<?> outbox) {
- Outbox<?> res = locals.put(new MailboxKey(outbox.queryId(),
outbox.exchangeId()), outbox);
+ CompletableFuture<Outbox<?>> res = locals.computeIfAbsent(new
MailboxKey(outbox.queryId(), outbox.exchangeId()),
+ k -> new CompletableFuture<>());
+
+ assert !res.isDone();
+
+ res.complete(outbox);
if (LOG.isTraceEnabled()) {
LOG.trace("Outbox registered [qryId={}, fragmentId={}]",
outbox.queryId(), outbox.fragmentId());
}
-
- assert res == null : res;
}
/** {@inheritDoc} */
@@ -104,7 +104,7 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
/** {@inheritDoc} */
@Override
public void unregister(Outbox<?> outbox) {
- boolean removed = locals.remove(new MailboxKey(outbox.queryId(),
outbox.exchangeId()), outbox);
+ boolean removed = locals.remove(new MailboxKey(outbox.queryId(),
outbox.exchangeId())) != null;
if (LOG.isTraceEnabled()) {
LOG.trace("Outbox {} unregistered [qryId={}, fragmentId={}]",
removed ? "was" : "wasn't",
@@ -114,8 +114,8 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
/** {@inheritDoc} */
@Override
- public Outbox<?> outbox(UUID qryId, long exchangeId) {
- return locals.get(new MailboxKey(qryId, exchangeId));
+ public CompletableFuture<Outbox<?>> outbox(UUID qryId, long exchangeId) {
+ return locals.computeIfAbsent(new MailboxKey(qryId, exchangeId), k ->
new CompletableFuture<>());
}
/** {@inheritDoc} */
@@ -132,14 +132,6 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
.collect(Collectors.toList());
}
- /** {@inheritDoc} */
- @Override
- public Collection<Outbox<?>> outboxes(@Nullable UUID qryId, long
fragmentId, long exchangeId) {
- return locals.values().stream()
- .filter(makeFilter(qryId, fragmentId, exchangeId))
- .collect(Collectors.toList());
- }
-
private static Predicate<Mailbox<?>> makeFilter(@Nullable UUID qryId, long
fragmentId, long exchangeId) {
Predicate<Mailbox<?>> filter = ALWAYS_TRUE;
if (qryId != null) {
@@ -177,7 +169,7 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
/** {@inheritDoc} */
@Override
public void onDisappeared(ClusterNode member) {
- locals.values().forEach(n -> n.onNodeLeft(member.name()));
+ locals.values().forEach(fut -> fut.thenAccept(n ->
n.onNodeLeft(member.name())));
remotes.values().forEach(n -> n.onNodeLeft(member.name()));
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
index 811095d789..e7c7bd5a5a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractNode.java
@@ -34,13 +34,13 @@ import
org.apache.ignite.lang.IgniteInternalCheckedException;
* Abstract node of execution tree.
*/
public abstract class AbstractNode<RowT> implements Node<RowT> {
- public static final int MODIFY_BATCH_SIZE = 100;
//IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_BATCH_SIZE", 100);
+ public static final int MODIFY_BATCH_SIZE = 100;
- protected static final int IO_BATCH_SIZE = 256;
//IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_SIZE", 256);
+ protected static final int IO_BATCH_SIZE = Commons.IO_BATCH_SIZE;
- protected static final int IO_BATCH_CNT = 4;
//IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IO_BATCH_CNT", 4);
+ protected static final int IO_BATCH_CNT = Commons.IO_BATCH_COUNT;
- protected final int inBufSize = Commons.IN_BUFFER_SIZE;
//IgniteSystemProperties.getInteger("IGNITE_CALCITE_EXEC_IN_BUFFER_SIZE", 2);
+ protected final int inBufSize = Commons.IN_BUFFER_SIZE;
/** For debug purpose. */
private volatile Thread thread;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
index e159e4a79a..f3d0423a32 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/AsyncRootNode.java
@@ -17,18 +17,20 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
+import static
org.apache.ignite.internal.sql.engine.util.Commons.IN_BUFFER_SIZE;
+
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.ignite.internal.sql.engine.AsyncCursor;
import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
import org.apache.ignite.sql.CursorClosedException;
-import org.jetbrains.annotations.Nullable;
/**
* An async iterator over the execution tree.
@@ -48,9 +50,11 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
private final Function<InRowT, OutRowT> converter;
+ private final Queue<OutRowT> buff = new ArrayDeque<>(IN_BUFFER_SIZE);
+
private final AtomicBoolean taskScheduled = new AtomicBoolean();
- private final Queue<PendingRequest<OutRowT>> pendingRequests = new
LinkedBlockingQueue<>();
+ private final Queue<PendingRequest<OutRowT>> pendingRequests = new
ConcurrentLinkedQueue<>();
private volatile boolean closed = false;
@@ -61,26 +65,6 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
*/
private int waiting;
- /**
- * Last row that pushed to this downstream.
- *
- * <p>{@link
org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult} requires
information about whether there are more
- * rows available or not. To meet this requirement, we request an extra
row from source at the very first requested batch. Thus,
- * we have an extra row which should be the very first row of the next
batch.
- *
- * <p>Note: this variable should be accessed from an execution task only.
- */
- private @Nullable OutRowT lastRow;
-
- /**
- * Whether the very first batch was already requested or not. See {@link
#lastRow} for details.
- *
- * <p>Note: this variable should be accessed from an execution task only.
- *
- * @see #lastRow
- */
- private boolean firstRequest = true;
-
/**
* Constructor.
*
@@ -97,19 +81,9 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
public void push(InRowT row) throws Exception {
assert waiting > 0;
- waiting--;
-
- var currentReq = pendingRequests.peek();
-
- assert currentReq != null;
-
- if (currentReq.buff.size() < currentReq.requested) {
- currentReq.buff.add(converter.apply(row));
- } else {
- assert waiting == 0;
-
- lastRow = converter.apply(row);
+ buff.add(converter.apply(row));
+ if (--waiting == 0) {
flush();
}
}
@@ -121,16 +95,6 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
waiting = -1;
- var currentReq = pendingRequests.peek();
-
- assert currentReq != null;
-
- if (currentReq.buff.size() < currentReq.requested && lastRow != null) {
- currentReq.buff.add(lastRow);
-
- lastRow = null;
- }
-
flush();
}
@@ -214,20 +178,48 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
return cancelFut.thenApply(Function.identity());
}
- private void flush() {
- var currentReq = pendingRequests.remove();
+ /**
+ * Starts the execution of the fragment and keeps the result in the
intermediate buffer.
+ *
+ * <p>Note: this method must be called by the same thread that will
execute the whole fragment.
+ */
+ public void prefetch() {
+ if (waiting == 0) {
+ try {
+ source.request(waiting = IN_BUFFER_SIZE);
+ } catch (Exception ex) {
+ onError(ex);
+ }
+ }
+ }
+
+ private void flush() throws Exception {
+ // flush may be triggered by prefetching, so let's do nothing in this
case
+ if (pendingRequests.isEmpty()) {
+ return;
+ }
+
+ PendingRequest<OutRowT> currentReq = pendingRequests.peek();
assert currentReq != null;
taskScheduled.set(false);
- currentReq.fut.complete(new BatchedResult<>(currentReq.buff, waiting
!= -1 || lastRow != null));
+ while (!buff.isEmpty() && currentReq.buff.size() <
currentReq.requested) {
+ currentReq.buff.add(buff.remove());
+ }
- boolean hasMoreRow = waiting != -1 || lastRow != null;
+ boolean hasMoreRows = waiting != -1 || !buff.isEmpty();
- if (hasMoreRow) {
- scheduleTask();
- } else {
+ if (currentReq.buff.size() == currentReq.requested || !hasMoreRows) {
+ pendingRequests.remove();
+
+ currentReq.fut.complete(new BatchedResult<>(currentReq.buff,
hasMoreRows));
+ }
+
+ if (waiting == 0) {
+ source.request(waiting = IN_BUFFER_SIZE);
+ } else if (waiting == -1 && buff.isEmpty()) {
closeAsync();
}
}
@@ -237,27 +229,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
*/
private void scheduleTask() {
if (!pendingRequests.isEmpty() && taskScheduled.compareAndSet(false,
true)) {
- var nextTask = pendingRequests.peek();
-
- assert nextTask != null;
-
- source.context().execute(() -> {
- // for the very first request we need to request one extra row
- // to be able to determine whether there is more rows or not
- if (firstRequest) {
- waiting = nextTask.requested + 1;
- firstRequest = false;
- } else {
- waiting = nextTask.requested;
-
- assert lastRow != null;
-
- nextTask.buff.add(lastRow);
- lastRow = null;
- }
-
- source.request(waiting);
- }, source::onError);
+ source.context().execute(this::flush, source::onError);
}
}
@@ -277,7 +249,7 @@ public class AsyncRootNode<InRowT, OutRowT> implements
Downstream<InRowT>, Async
*/
private final List<OutRowT> buff;
- public PendingRequest(int requested,
CompletableFuture<BatchedResult<OutRowT>> fut) {
+ private PendingRequest(int requested,
CompletableFuture<BatchedResult<OutRowT>> fut) {
this.requested = requested;
this.fut = fut;
this.buff = new ArrayList<>(requested);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index c519c9a019..d67264c21e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -20,25 +20,24 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.calcite.util.Util.unexpected;
import static org.apache.ignite.lang.ErrorGroups.Sql.NODE_LEFT_ERR;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
-import java.util.stream.Collectors;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.rel.Inbox.RemoteSource.State;
import org.apache.ignite.lang.IgniteInternalCheckedException;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
- * A part of exchange.
+ * A part of exchange which receives batches from remote sources.
*/
public class Inbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>,
SingleNode<RowT> {
private final ExchangeService exchange;
@@ -49,13 +48,13 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
private final long srcFragmentId;
- private final Map<String, Buffer> perNodeBuffers;
+ private final Map<String, RemoteSource<RowT>> perNodeBuffers;
- private volatile Collection<String> srcNodeNames;
+ private final Collection<String> srcNodeNames;
- private Comparator<RowT> comp;
+ private final @Nullable Comparator<RowT> comp;
- private List<Buffer> buffers;
+ private List<RemoteSource<RowT>> remoteSources;
private int requested;
@@ -74,12 +73,16 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
ExecutionContext<RowT> ctx,
ExchangeService exchange,
MailboxRegistry registry,
+ Collection<String> srcNodeNames,
+ @Nullable Comparator<RowT> comp,
long exchangeId,
long srcFragmentId
) {
super(ctx);
this.exchange = exchange;
this.registry = registry;
+ this.srcNodeNames = srcNodeNames;
+ this.comp = comp;
this.srcFragmentId = srcFragmentId;
this.exchangeId = exchangeId;
@@ -93,35 +96,9 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
return exchangeId;
}
- /**
- * Inits this Inbox.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
- *
- * @param ctx Execution context.
- * @param srcNodeNames Source nodes' consistent IDs.
- * @param comp Optional comparator for merge exchange.
- */
- public void init(
- ExecutionContext<RowT> ctx, Collection<String> srcNodeNames,
@Nullable Comparator<RowT> comp) {
- assert srcNodeNames != null : "Collection srcNodeNames not found for
exchangeId: " + exchangeId;
- assert context().fragmentId() == ctx.fragmentId() : "different
fragments unsupported: previous=" + context().fragmentId()
- + " current=" + ctx.fragmentId();
-
- // It's important to set proper context here because
- // the one, that is created on a first message
- // received doesn't have all context variables in place.
- context(ctx);
-
- this.comp = comp;
-
- // memory barier
- this.srcNodeNames = new HashSet<>(srcNodeNames);
- }
-
/** {@inheritDoc} */
@Override
public void request(int rowsCnt) throws Exception {
- assert srcNodeNames != null;
assert rowsCnt > 0 && requested == 0;
checkState();
@@ -168,13 +145,13 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
* @param rows Rows.
*/
public void onBatchReceived(String srcNodeName, int batchId, boolean last,
List<RowT> rows) throws Exception {
- Buffer buf = getOrCreateBuffer(srcNodeName);
+ RemoteSource<RowT> source = getOrCreateBuffer(srcNodeName);
- boolean waitingBefore = buf.check() == State.WAITING;
+ boolean waitingBefore = source.check() == State.WAITING;
- buf.offer(batchId, last, rows);
+ source.onBatchReceived(batchId, last, rows);
- if (requested > 0 && waitingBefore && buf.check() != State.WAITING) {
+ if (requested > 0 && waitingBefore && source.check() != State.WAITING)
{
push();
}
}
@@ -186,16 +163,16 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
}
private void push() throws Exception {
- if (buffers == null) {
+ if (remoteSources == null) {
for (String node : srcNodeNames) {
checkNode(node);
}
- buffers = srcNodeNames.stream()
- .map(this::getOrCreateBuffer)
- .collect(Collectors.toList());
+ remoteSources = new ArrayList<>(srcNodeNames.size());
- assert buffers.size() == perNodeBuffers.size();
+ for (String nodeName : srcNodeNames) {
+ remoteSources.add(getOrCreateBuffer(nodeName));
+ }
}
if (comp != null) {
@@ -206,9 +183,9 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
}
/** Checks that all corresponding buffers are in ready state. */
- private boolean checkAllBuffsReady(Iterator<Buffer> it) {
+ private boolean checkAllBuffsReady(Iterator<RemoteSource<RowT>> it) {
while (it.hasNext()) {
- Buffer buf = it.next();
+ RemoteSource<?> buf = it.next();
State state = buf.check();
@@ -227,15 +204,22 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
return true;
}
+ @SuppressWarnings("LabeledStatement")
private void pushOrdered() throws Exception {
- if (!checkAllBuffsReady(buffers.iterator())) {
+ if (!checkAllBuffsReady(remoteSources.iterator())) {
+ for (RemoteSource<RowT> remote : remoteSources) {
+ remote.requestNextBatchIfNeeded();
+ }
+
return;
}
- PriorityQueue<Pair<RowT, Buffer>> heap =
- new PriorityQueue<>(Math.max(buffers.size(), 1),
Map.Entry.comparingByKey(comp));
+ assert comp != null;
- for (Buffer buf : buffers) {
+ PriorityQueue<Pair<RowT, RemoteSource<RowT>>> heap =
+ new PriorityQueue<>(Math.max(remoteSources.size(), 1),
Map.Entry.comparingByKey(comp));
+
+ for (RemoteSource<RowT> buf : remoteSources) {
State state = buf.check();
if (state == State.READY) {
@@ -247,25 +231,28 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
inLoop = true;
try {
+ loop:
while (requested > 0 && !heap.isEmpty()) {
checkState();
- Buffer buf = heap.poll().right;
+ RemoteSource<RowT> source = heap.poll().right;
requested--;
- downstream().push(buf.remove());
+ downstream().push(source.remove());
- State state = buf.check();
+ State state = source.check();
switch (state) {
case END:
- buffers.remove(buf);
+ remoteSources.remove(source);
break;
case READY:
- heap.offer(Pair.of(buf.peek(), buf));
+ heap.offer(Pair.of(source.peek(), source));
break;
case WAITING:
- return;
+ // at this point we've drained all received batches
from particular source,
+ // thus we need to wait next batch in order to be able
to preserve the ordering
+ break loop;
default:
throw unexpected(state);
}
@@ -274,9 +261,11 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
inLoop = false;
}
- if (requested > 0 && heap.isEmpty()) {
- assert buffers.isEmpty();
+ for (RemoteSource<?> remote : remoteSources) {
+ remote.requestNextBatchIfNeeded();
+ }
+ if (requested > 0 && remoteSources.isEmpty() && heap.isEmpty()) {
requested = 0;
downstream().end();
}
@@ -288,33 +277,36 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
inLoop = true;
try {
- while (requested > 0 && !buffers.isEmpty()) {
+ while (requested > 0 && !remoteSources.isEmpty()) {
checkState();
- Buffer buf = buffers.get(idx);
+ RemoteSource<RowT> source = remoteSources.get(idx);
- switch (buf.check()) {
+ switch (source.check()) {
case END:
- buffers.remove(idx--);
+ remoteSources.remove(idx);
break;
case READY:
noProgress = 0;
requested--;
- downstream().push(buf.remove());
+ downstream().push(source.remove());
break;
case WAITING:
- if (++noProgress >= buffers.size()) {
- return;
- }
+ noProgress++;
+ idx++;
break;
default:
break;
}
- if (++idx == buffers.size()) {
+ if (noProgress >= remoteSources.size()) {
+ break;
+ }
+
+ if (idx == remoteSources.size()) {
idx = 0;
}
}
@@ -322,22 +314,22 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
inLoop = false;
}
- if (requested > 0 && buffers.isEmpty()) {
+ for (RemoteSource<?> source : remoteSources) {
+ source.requestNextBatchIfNeeded();
+ }
+
+ if (requested > 0 && remoteSources.isEmpty()) {
requested = 0;
downstream().end();
}
}
- private void acknowledge(String nodeName, int batchId) throws
IgniteInternalCheckedException {
- exchange.acknowledge(nodeName, queryId(), srcFragmentId, exchangeId,
batchId);
- }
-
- private Buffer getOrCreateBuffer(String nodeName) {
- return perNodeBuffers.computeIfAbsent(nodeName, this::createBuffer);
+ private void requestBatches(String nodeName, int cnt) throws
IgniteInternalCheckedException {
+ exchange.request(nodeName, queryId(), srcFragmentId, exchangeId, cnt);
}
- private Buffer createBuffer(String nodeName) {
- return new Buffer(nodeName);
+ private RemoteSource<RowT> getOrCreateBuffer(String nodeName) {
+ return perNodeBuffers.computeIfAbsent(nodeName, name -> new
RemoteSource<>(cnt -> requestBatches(name, cnt)));
}
/**
@@ -404,118 +396,182 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
/** {@inheritDoc} */
@Override
- public int compareTo(@NotNull Inbox.Batch<RowT> o) {
+ public int compareTo(Inbox.Batch<RowT> o) {
return Integer.compare(batchId, o.batchId);
}
}
- private enum State {
- END,
-
- READY,
-
- WAITING
- }
+ /**
+ * An object to keep track of batches and their order from particular
remote source.
+ *
+ * @param <RowT> A type if the rows received in batches.
+ * @see State
+ */
+ static final class RemoteSource<RowT> {
+ @FunctionalInterface
+ private interface BatchRequester {
+ void request(int amountOfBatches) throws
IgniteInternalCheckedException;
+ }
- private static final Batch<?> WAITING = new Batch<>(0, false, null);
+ /**
+ * A enumeration of all possible states of the {@link RemoteSource
remote source}. Below is a state diagram showing possible
+ * transitions from one state to another.
+ *
+ * <p>Node: "out of order", "next received", "and batch drained" are
ephemeral states, thus not presented in enumeration.
+ * <pre>
+ * +---+
+ * | * |
+ * +---+
+ * |
+ * v
+ * +---------+
+ * | WAITING |<-----\
+ * +---------+ \
+ * / ^ \
+ * batch received | \
+ * / | \
+ * v | \
+ * /-------------\ | |
+ * | out of order? | | |
+ * \-------------/ | |
+ * / \ | no
+ * no yes / |
+ * | \__/ |
+ * v |
+ * +-------+ /--------------\
+ * | READY |<-----yes-------| next received? |
+ * +-------+ \--------------/
+ * \___ ^
+ * \ |
+ * batch drained /
+ * \____ /
+ * v no
+ * /-----------\ /
+ * | last batch? |-----/
+ * \-----------/
+ * |
+ * yes
+ * |
+ * v
+ * +-----+
+ * | END |
+ * +-----+
+ * |
+ * v
+ * +---+
+ * | * |
+ * +---+
+ * </pre>
+ */
+ enum State {
+ /** Last batch was received and has already drained. No more data
expected from this source. */
+ END,
+
+ /** Batch with expected id is received and ready to be drained. */
+ READY,
+
+ /** Batch with expected id is not received yet. */
+ WAITING
+ }
- private static final Batch<?> END = new Batch<>(0, false, null);
+ private final PriorityQueue<Batch<RowT>> batches = new
PriorityQueue<>(IO_BATCH_CNT);
- private final class Buffer {
- private final String nodeName;
+ private final BatchRequester batchRequester;
+ private State state = State.WAITING;
private int lastEnqueued = -1;
+ private int lastRequested = -1;
+ private @Nullable Batch<RowT> curr = null;
- private final PriorityQueue<Batch<RowT>> batches = new
PriorityQueue<>(IO_BATCH_CNT);
-
- private Batch<RowT> curr = waitingMark();
-
- private Buffer(String nodeName) {
- this.nodeName = nodeName;
+ private RemoteSource(BatchRequester batchRequester) {
+ this.batchRequester = batchRequester;
}
- private void offer(int id, boolean last, List<RowT> rows) {
+ /** A handler for batches received from remote source. */
+ void onBatchReceived(int id, boolean last, List<RowT> rows) {
batches.offer(new Batch<>(id, last, rows));
- }
- private Batch<RowT> pollBatch() {
- if (batches.isEmpty() || batches.peek().batchId != lastEnqueued +
1) {
- return waitingMark();
+ if (state == State.WAITING && id == lastEnqueued + 1) {
+ advanceBatch();
}
-
- Batch<RowT> batch = batches.poll();
-
- assert batch != null && batch.batchId == lastEnqueued + 1;
-
- lastEnqueued = batch.batchId;
-
- return batch;
}
- private State check() {
- if (finished()) {
- return State.END;
- }
+ /**
+ * Requests another several batches from remote source if a count of
in-flight batches
+ * is less or equal than half of {@link #IO_BATCH_CNT}.
+ */
+ void requestNextBatchIfNeeded() throws IgniteInternalCheckedException {
+ int inFlightCount = lastRequested - lastEnqueued;
- if (waiting()) {
- return State.WAITING;
- }
+ // IO_BATCH_CNT should never be less than 1, but we don't have
validation
+ if (IO_BATCH_CNT <= 1 && inFlightCount == 0) {
+ batchRequester.request(1);
+ } else if (IO_BATCH_CNT / 2 >= inFlightCount) {
+ int countOfBatches = IO_BATCH_CNT - inFlightCount;
- if (isEnd()) {
- curr = finishedMark();
+ lastRequested += countOfBatches;
- return State.END;
+ batchRequester.request(countOfBatches);
}
+ }
- return State.READY;
+ /** Returns the state of the source. */
+ State check() {
+ return state;
}
- private RowT peek() {
+ /**
+ * Returns the first element of a buffer without removing.
+ *
+ * @return The first element of a buffer.
+ */
+ RowT peek() {
+ assert state == State.READY;
assert curr != null;
- assert curr != WAITING;
- assert curr != END;
- assert !isEnd();
return curr.rows.get(curr.idx);
}
- private RowT remove() throws IgniteInternalCheckedException {
+ /**
+ * Removes the first element from a buffer.
+ *
+ * @return The removed element.
+ */
+ RowT remove() {
+ assert state == State.READY;
assert curr != null;
- assert curr != WAITING;
- assert curr != END;
- assert !isEnd();
RowT row = curr.rows.set(curr.idx++, null);
if (curr.idx == curr.rows.size()) {
- acknowledge(nodeName, curr.batchId);
-
- if (!isEnd()) {
- curr = pollBatch();
+ if (curr.last) {
+ state = State.END;
+ } else {
+ advanceBatch();
}
}
return row;
}
- private boolean finished() {
- return curr == END;
+ private boolean hasNextBatch() {
+ return !batches.isEmpty() && batches.peek().batchId ==
lastEnqueued + 1;
}
- private boolean waiting() {
- return curr == WAITING && (curr = pollBatch()) == WAITING;
- }
+ private void advanceBatch() {
+ if (!hasNextBatch()) {
+ state = State.WAITING;
- private boolean isEnd() {
- return curr.last && curr.idx == curr.rows.size();
- }
+ return;
+ }
- private Batch<RowT> finishedMark() {
- return (Batch<RowT>) END;
- }
+ curr = batches.poll();
+
+ assert curr != null;
+
+ state = curr.rows.isEmpty() ? State.END : State.READY;
- private Batch<RowT> waitingMark() {
- return (Batch<RowT>) WAITING;
+ lastEnqueued = curr.batchId;
}
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index d18a2ef5c0..69b608995d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -17,17 +17,14 @@
package org.apache.ignite.internal.sql.engine.exec.rel;
-import static
org.apache.ignite.internal.sql.engine.util.Commons.IN_BUFFER_SIZE;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
@@ -36,39 +33,34 @@ import
org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
import org.apache.ignite.internal.sql.engine.trait.Destination;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
/**
- * A part of exchange.
+ * A part of exchange which sends batches to a remote downstream.
*/
public class Outbox<RowT> extends AbstractNode<RowT> implements Mailbox<RowT>,
SingleNode<RowT>, Downstream<RowT> {
private static final IgniteLogger LOG = Loggers.forClass(Outbox.class);
- private final ExchangeService exchange;
-
- private final MailboxRegistry registry;
-
private final long exchangeId;
-
private final long targetFragmentId;
-
+ private final ExchangeService exchange;
+ private final MailboxRegistry registry;
private final Destination<RowT> dest;
private final Deque<RowT> inBuf = new ArrayDeque<>(inBufSize);
-
- private final Map<String, Buffer> nodeBuffers = new HashMap<>();
+ private final Map<String, RemoteDownstream<RowT>> nodeBuffers = new
HashMap<>();
private int waiting;
/**
* Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*
- * @param ctx Execution context.
- * @param exchange Exchange service.
- * @param registry Mailbox registry.
- * @param exchangeId Exchange ID.
- * @param targetFragmentId Target fragment ID.
- * @param dest Destination.
+ * @param ctx An execution context.
+ * @param exchange A service that provide a way for Inbox and Outbox to
communicate with each other.
+ * @param registry A registry of all created inboxes and outboxes.
+ * @param exchangeId An identifier of the exchange this outbox is part of.
+ * @param targetFragmentId An identifier of the fragment to send batches
to.
+ * @param dest A function which determines which row to send on which
remote.
*/
public Outbox(
ExecutionContext<RowT> ctx,
@@ -84,6 +76,14 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
this.targetFragmentId = targetFragmentId;
this.exchangeId = exchangeId;
this.dest = dest;
+
+ initBuffers();
+ }
+
+ private void initBuffers() {
+ for (String nodeName : dest.targets()) {
+ nodeBuffers.put(nodeName, new RemoteDownstream<>(nodeName,
this::sendBatch));
+ }
}
/** {@inheritDoc} */
@@ -93,28 +93,35 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
}
/**
- * Callback method.
+ * A handler with saves the demand from remote downstream and starts the
execution.
*
- * @param nodeName Target consistent ID.
- * @param batchId Batch ID.
+ * @param nodeName An identifier of the demander.
+ * @param amountOfBatches A count of demanded batches.
*/
- public void onAcknowledge(String nodeName, int batchId) throws Exception {
- assert nodeBuffers.containsKey(nodeName);
-
+ public void onRequest(String nodeName, int amountOfBatches) throws
Exception {
checkState();
- nodeBuffers.get(nodeName).acknowledge(batchId);
+ RemoteDownstream<?> downstream = getOrCreateBuffer(nodeName);
+
+ downstream.onBatchRequested(amountOfBatches);
+
+ if (waiting != -1 || !inBuf.isEmpty()) {
+ flush();
+ }
}
/**
- * Init.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Starts the execution of the fragment and keeps the result in the
intermediate buffer.
+ *
+ * <p>Note: this method must be called by the same thread that will
execute the whole fragment.
*/
- public void init() {
+ public void prefetch() {
try {
checkState();
- flush();
+ if (waiting == 0) {
+ source().request(waiting = inBufSize);
+ }
} catch (Throwable t) {
onError(t);
}
@@ -186,7 +193,11 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
/** {@inheritDoc} */
@Override
protected void rewindInternal() {
- throw new UnsupportedOperationException();
+ inBuf.clear();
+ nodeBuffers.clear();
+ waiting = 0;
+
+ initBuffers();
}
/** {@inheritDoc} */
@@ -215,31 +226,32 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
}
}
- private Buffer getOrCreateBuffer(String nodeName) {
- return nodeBuffers.computeIfAbsent(nodeName, this::createBuffer);
- }
-
- private Buffer createBuffer(String nodeName) {
- return new Buffer(nodeName);
+ private RemoteDownstream<RowT> getOrCreateBuffer(String nodeName) {
+ return nodeBuffers.computeIfAbsent(nodeName, name -> new
RemoteDownstream<>(name, this::sendBatch));
}
private void flush() throws Exception {
while (!inBuf.isEmpty()) {
checkState();
- Collection<Buffer> buffers = dest.targets(inBuf.peek()).stream()
- .map(this::getOrCreateBuffer)
- .collect(Collectors.toList());
+ List<String> targets = dest.targets(inBuf.peek());
+ List<RemoteDownstream<RowT>> buffers = new
ArrayList<>(targets.size());
- assert !nullOrEmpty(buffers);
+ for (String target : targets) {
+ RemoteDownstream<RowT> buffer = getOrCreateBuffer(target);
+
+ if (!buffer.ready()) {
+ return;
+ }
- if (!buffers.stream().allMatch(Buffer::ready)) {
- return;
+ buffers.add(buffer);
}
+ assert !nullOrEmpty(buffers);
+
RowT row = inBuf.remove();
- for (Buffer dest : buffers) {
+ for (RemoteDownstream<RowT> dest : buffers) {
dest.add(row);
}
}
@@ -247,10 +259,10 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
assert inBuf.isEmpty();
if (waiting == 0) {
- source().request(waiting = IN_BUFFER_SIZE);
+ source().request(waiting = inBufSize);
} else if (waiting == -1) {
- for (String node : dest.targets()) {
- getOrCreateBuffer(node).end();
+ for (RemoteDownstream<RowT> buffer : nodeBuffers.values()) {
+ buffer.end();
}
}
}
@@ -265,101 +277,164 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
}
}
- private final class Buffer {
- private final String nodeName;
+ private static final class RemoteDownstream<RowT> {
+ @FunctionalInterface
+ private interface BatchSender<RowT> {
+ void send(String targetNodeName, int batchId, boolean last,
List<RowT> rows) throws IgniteInternalCheckedException;
+ }
- private int hwm = -1;
+ /**
+ * A enumeration of all possible states of the {@link RemoteDownstream
remote downstream}. Below is a state diagram showing possible
+ * transitions from one state to another.
+ *
+ * <p>Node: "batch is full" is ephemeral state, thus not presented in
enumeration.
+ * <pre>
+ * +---+
+ * | * |
+ * +---+
+ * |
+ * v
+ * +---------+
+ * /--------| FILLING |<---\
+ * / +---------+ \
+ * | / ^ \
+ * | row added \ |
+ * | \ no |
+ * | v / |
+ * | /---------------\ |
+ * | | batch is full? | |
+ * | \---------------/ |
+ * | | |
+ * | yes /
+ * | | batch sent
+ * downstream ended v /
+ * | +-------+ /
+ * | | FULL |---/
+ * | +-------+
+ * | |
+ * \ downstream ended
+ * \ |
+ * \ v
+ * \ /-----------\
+ * \-->| LAST BATCH |
+ * \-----------/
+ * |
+ * batch sent
+ * |
+ * v
+ * +-----+
+ * | END |
+ * +-----+
+ * |
+ * v
+ * +---+
+ * | * |
+ * +---+
+ * </pre>
+ */
+ enum State {
+ /** Batch is ready to accept at leas one row. */
+ FILLING,
+
+ /** Batch is full, thus is ready to be sent. */
+ FULL,
- private int lwm = -1;
+ /** No more rows are expected to be added to downstream. The next
batch will be sent, probably, partially filled. */
+ LAST_BATCH,
- private List<RowT> curr;
+ /** Downstream is closed. All resources were released. */
+ END
+ }
+
+ private final String nodeName;
+ private final BatchSender<RowT> sender;
- private Buffer(String nodeName) {
+ private State state = State.FILLING;
+ private int lastSentBatchId = -1;
+
+ private @Nullable List<RowT> curr;
+ private int pendingCount;
+
+ private RemoteDownstream(String nodeName, BatchSender<RowT> sender) {
this.nodeName = nodeName;
+ this.sender = sender;
curr = new ArrayList<>(IO_BATCH_SIZE);
}
- /**
- * Checks whether there is a place for a new row.
- *
- * @return {@code True} is it possible to add a row to a batch.
- */
- private boolean ready() {
- if (hwm == Integer.MAX_VALUE) {
- return false;
+ /** A handler of a requests from downstream. */
+ void onBatchRequested(int amountOfBatches) throws Exception {
+ assert amountOfBatches > 0 : amountOfBatches;
+
+ this.pendingCount = amountOfBatches;
+
+ // if there is a batch which is ready to be sent, then just sent it
+ if (state == State.FULL || state == State.LAST_BATCH) {
+ sendBatch();
}
+ }
- return curr.size() < IO_BATCH_SIZE || hwm - lwm < IO_BATCH_CNT;
+ /** Returns {@code true} if this downstream is ready to accepts at
least one more row. */
+ boolean ready() {
+ return state == State.FILLING;
}
/**
* Adds a row to current batch.
*
- * @param row Row.
+ * @param row Row to add.
*/
- public void add(RowT row) throws IgniteInternalCheckedException {
- assert ready();
+ void add(RowT row) throws Exception {
+ assert ready() : state;
+ assert curr != null;
+
+ curr.add(row);
if (curr.size() == IO_BATCH_SIZE) {
- sendBatch(nodeName, ++hwm, false, curr);
+ state = State.FULL;
- curr = new ArrayList<>(IO_BATCH_SIZE);
+ if (pendingCount > 0) {
+ sendBatch();
+ }
}
-
- curr.add(row);
}
- /**
- * Signals data is over.
- */
- public void end() throws IgniteInternalCheckedException {
- if (hwm == Integer.MAX_VALUE) {
- return;
- }
+ /** Sends current batch to remote downstream. */
+ void sendBatch() throws Exception {
+ assert pendingCount > 0;
+ assert state == State.FULL || state == State.LAST_BATCH : state;
+ assert curr != null;
- int batchId = hwm + 1;
- hwm = Integer.MAX_VALUE;
+ boolean lastBatch = state == State.LAST_BATCH;
- List<RowT> tmp = curr;
- curr = null;
+ sender.send(nodeName, ++lastSentBatchId, lastBatch, curr);
- sendBatch(nodeName, batchId, true, tmp);
- }
+ pendingCount--;
- /**
- * Callback method.
- *
- * @param id batch ID.
- */
- private void acknowledge(int id) throws Exception {
- if (lwm > id) {
- return;
+ if (lastBatch) {
+ state = State.END;
+ curr = null;
+ } else {
+ state = State.FILLING;
+ curr = new ArrayList<>(IO_BATCH_SIZE);
}
+ }
- boolean readyBefore = ready();
+ /** Completes this downstream by sending all collected so far rows. */
+ void end() throws Exception {
+ assert state == State.FILLING || state == State.FULL : state;
- lwm = id;
+ state = State.LAST_BATCH;
- if (!readyBefore && ready()) {
- flush();
+ if (pendingCount > 0) {
+ sendBatch();
}
}
- public void close() {
- final int currBatchId = hwm;
-
- if (hwm == Integer.MAX_VALUE) {
- return;
- }
-
- hwm = Integer.MAX_VALUE;
-
+ /** Closes this downstream and clears all acquired resources. */
+ void close() {
curr = null;
-
- if (currBatchId >= 0) {
- sendInboxClose(nodeName);
- }
+ state = State.END;
}
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchAcknowledgeMessage.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
similarity index 70%
rename from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchAcknowledgeMessage.java
rename to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
index 6076d51a47..7e1d7460b4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchAcknowledgeMessage.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/QueryBatchRequestMessage.java
@@ -20,18 +20,13 @@ package org.apache.ignite.internal.sql.engine.message;
import org.apache.ignite.network.annotations.Transferable;
/**
- * QueryBatchAcknowledgeMessage interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * A message to notify remote fragment (aka remote source) that more batches
required to fulfil the result.
*/
-@Transferable(value = SqlQueryMessageGroup.QUERY_BATCH_ACK)
-public interface QueryBatchAcknowledgeMessage extends
ExecutionContextAwareMessage {
- /**
- * Get exchange ID.
- */
+@Transferable(SqlQueryMessageGroup.QUERY_BATCH_REQUEST)
+public interface QueryBatchRequestMessage extends ExecutionContextAwareMessage
{
+ /** Returns an identifier of the exchange to request batches from. */
long exchangeId();
- /**
- * Get batch ID.
- */
- int batchId();
+ /** Returns amount of batches to request. */
+ int amountOfBatches();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
index 69bfa8c43f..d4cc366191 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/SqlQueryMessageGroup.java
@@ -36,7 +36,8 @@ public final class SqlQueryMessageGroup {
public static final short QUERY_BATCH_MESSAGE = 3;
- public static final short QUERY_BATCH_ACK = 4;
+ /** See {@link QueryBatchRequestMessage} for details. */
+ public static final short QUERY_BATCH_REQUEST = 4;
public static final short INBOX_CLOSE_MESSAGE = 5;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
index 1d388ebe67..a8bd3f6e22 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/Commons.java
@@ -128,6 +128,9 @@ public final class Commons {
public static final int IN_BUFFER_SIZE = 512;
+ public static final int IO_BATCH_SIZE = 256;
+ public static final int IO_BATCH_COUNT = 4;
+
/**
* The number of elements to be prefetched from each partition when
scanning the sorted index.
* The higher the value, the fewer calls to the upstream will be, but at
the same time, the bigger
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
index 7a5d239c77..ecd0889b04 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/benchmarks/SqlBenchmark.java
@@ -19,9 +19,7 @@ package org.apache.ignite.internal.sql.engine.benchmarks;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
-import java.util.Iterator;
import java.util.List;
-import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.schema.NativeTypes;
@@ -54,11 +52,11 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
@Measurement(iterations = 20, time = 1, timeUnit = TimeUnit.SECONDS)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
-@Fork(1)
+@Fork(3)
@State(Scope.Benchmark)
public class SqlBenchmark {
- private final DataProvider<Object[]> dataProvider = new
SameRowDataProvider(
- new Object[] {42, UUID.randomUUID().toString()}, 333
+ private final DataProvider<Object[]> dataProvider = DataProvider.fromRow(
+ new Object[]{42, UUID.randomUUID().toString()}, 3_333
);
// @formatter:off
@@ -95,7 +93,7 @@ public class SqlBenchmark {
/** Very simple test to measure performance of minimal possible
distributed query. */
@Benchmark
public void selectAllSimple(Blackhole bh) {
- for (var row :
await(gatewayNode.executePlan(plan).requestNextAsync(1_000)).items()) {
+ for (var row :
await(gatewayNode.executePlan(plan).requestNextAsync(10_000)).items()) {
bh.consume(row);
}
}
@@ -114,37 +112,4 @@ public class SqlBenchmark {
new Runner(build).run();
}
-
- private static class SameRowDataProvider implements DataProvider<Object[]>
{
- private final int times;
- private final Object[] row;
-
- SameRowDataProvider(Object[] row, int times) {
- this.times = times;
- this.row = row;
- }
-
- @Override
- public Iterator<Object[]> iterator() {
- return new Iterator<>() {
- private int counter;
-
- @Override
- public boolean hasNext() {
- return counter < times;
- }
-
- @Override
- public Object[] next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- counter++;
-
- return row;
- }
- };
- }
- }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 49771678a9..23b55a970f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -347,14 +347,14 @@ public class ExecutionServiceImplTest {
var messageService = node.messageService();
var mailboxRegistry = new MailboxRegistryImpl();
- var clusterNode = new ClusterNode(UUID.randomUUID().toString(),
nodeName, NetworkAddress.from("127.0.0.1:1111"));
- var exchangeService = new ExchangeServiceImpl(clusterNode,
taskExecutor, mailboxRegistry, messageService);
+ var exchangeService = new ExchangeServiceImpl(mailboxRegistry,
messageService);
var schemaManagerMock = mock(SqlSchemaManager.class);
when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
+ var clusterNode = new ClusterNode(UUID.randomUUID().toString(),
nodeName, NetworkAddress.from("127.0.0.1:1111"));
var executionService = new ExecutionServiceImpl<>(
clusterNode,
messageService,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index ae73d2c3f9..7e81eb4ede 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -64,11 +64,10 @@ public class AbstractExecutionTest extends
IgniteAbstractTest {
public static final Object[][] EMPTY = new Object[0][];
private Throwable lastE;
-
- private QueryTaskExecutorImpl taskExecutor;
-
private List<UUID> nodes;
+ protected QueryTaskExecutorImpl taskExecutor;
+
@BeforeEach
public void beforeTest() {
taskExecutor = new QueryTaskExecutorImpl("no_node");
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
new file mode 100644
index 0000000000..12af90b9a7
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ExchangeExecutionTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeService;
+import org.apache.ignite.internal.sql.engine.exec.ExchangeServiceImpl;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistry;
+import org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
+import org.apache.ignite.internal.sql.engine.framework.ClusterServiceFactory;
+import org.apache.ignite.internal.sql.engine.framework.DataProvider;
+import org.apache.ignite.internal.sql.engine.framework.TestBuilders;
+import org.apache.ignite.internal.sql.engine.message.MessageService;
+import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
+import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
+import org.apache.ignite.internal.sql.engine.trait.AllNodes;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests to verify Outbox to Inbox interoperation.
+ */
+public class ExchangeExecutionTest extends AbstractExecutionTest {
+ private static final String ROOT_NODE_NAME = "N1";
+ private static final String ANOTHER_NODE_NAME = "N2";
+ private static final List<String> NODE_NAMES = List.of(ROOT_NODE_NAME,
ANOTHER_NODE_NAME);
+ private static final ClusterNode ROOT_NODE =
+ new ClusterNode(ROOT_NODE_NAME, ROOT_NODE_NAME,
NetworkAddress.from("127.0.0.1:10001"));
+ private static final ClusterNode ANOTHER_NODE =
+ new ClusterNode(ANOTHER_NODE_NAME, ANOTHER_NODE_NAME,
NetworkAddress.from("127.0.0.1:10002"));
+ private static final int SOURCE_FRAGMENT_ID = 0;
+ private static final int TARGET_FRAGMENT_ID = 1;
+ private static final Comparator<Object[]> COMPARATOR =
Comparator.comparingInt(o -> (Integer) o[0]);
+
+ private final Map<String, MailboxRegistry> mailboxes = new HashMap<>();
+ private final Map<String, ExchangeService> exchangeServices = new
HashMap<>();
+ private final ClusterServiceFactory serviceFactory =
TestBuilders.clusterServiceFactory(List.of(ROOT_NODE_NAME, ANOTHER_NODE_NAME));
+
+ @ParameterizedTest(name = "rowCount={0}, prefetch={1}, ordered={2}")
+ @MethodSource("testArgs")
+ public void test(int rowCount, boolean prefetch, boolean ordered) {
+ UUID queryId = UUID.randomUUID();
+
+ List<Outbox<?>> sourceFragments = new ArrayList<>();
+
+ int idx = 0;
+ for (ClusterNode node : List.of(ROOT_NODE, ANOTHER_NODE)) {
+ Outbox<?> outbox = createSourceFragment(
+ queryId,
+ idx++,
+ node,
+ serviceFactory,
+ rowCount
+ );
+
+ sourceFragments.add(outbox);
+ }
+
+ if (prefetch) {
+ for (Outbox<?> outbox : sourceFragments) {
+ await(outbox.context().submit(outbox::prefetch,
outbox::onError));
+ }
+ }
+
+ AsyncRootNode<Object[], Object[]> root = createRootFragment(
+ queryId,
+ ROOT_NODE,
+ NODE_NAMES,
+ ordered,
+ serviceFactory
+ );
+
+ int expectedRowCount = NODE_NAMES.size() * rowCount;
+
+ BatchedResult<Object[]> res =
await(root.requestNextAsync(expectedRowCount));
+
+ assertEquals(expectedRowCount, res.items().size());
+
+ if (ordered) {
+ List<Object[]> expected = new ArrayList<>(res.items());
+ expected.sort(COMPARATOR);
+
+ assertEquals(expected, res.items());
+ }
+ }
+
+ private static Stream<Arguments> testArgs() {
+ List<Integer> sizes = List.of(
+ // half of the batch size
+ Math.min(Commons.IO_BATCH_SIZE / 2, 1),
+
+ // full batch
+ Commons.IO_BATCH_SIZE,
+
+ // full batch + one extra row
+ Commons.IO_BATCH_SIZE + 1,
+
+ // several batches
+ 2 * Commons.IO_BATCH_SIZE + 1,
+
+ // more than count of so called "in-flight" batches. In flight
batches
+ // are batches that have been sent but not yet acknowledged
+ 2 * Commons.IO_BATCH_SIZE * Commons.IO_BATCH_COUNT
+ );
+
+ List<Boolean> trueFalseList = List.of(true, false);
+
+ List<Arguments> args = new ArrayList<>(2 * sizes.size());
+ for (int size : sizes) {
+ for (boolean prefetch : trueFalseList) {
+ for (boolean ordered : trueFalseList) {
+ args.add(Arguments.of(size, prefetch, ordered));
+ }
+ }
+ }
+
+ return args.stream();
+ }
+
+ private AsyncRootNode<Object[], Object[]> createRootFragment(
+ UUID queryId,
+ ClusterNode localNode,
+ List<String> sourceNodeNames,
+ boolean ordered,
+ ClusterServiceFactory serviceFactory
+ ) {
+ ExecutionContext<Object[]> targetCtx = TestBuilders.executionContext()
+ .queryId(queryId)
+ .executor(taskExecutor)
+ .fragment(new FragmentDescription(TARGET_FRAGMENT_ID, null,
null, Long2ObjectMaps.emptyMap()))
+ .localNode(localNode)
+ .build();
+
+ Comparator<Object[]> comparator = ordered ? COMPARATOR : null;
+
+ MailboxRegistry mailboxRegistry =
mailboxes.computeIfAbsent(localNode.name(), name -> new MailboxRegistryImpl());
+ ExchangeService exchangeService =
exchangeServices.computeIfAbsent(localNode.name(), name ->
+
createExchangeService(serviceFactory.forNode(localNode.name()),
mailboxRegistry));
+
+ Inbox<Object[]> inbox = new Inbox<>(
+ targetCtx, exchangeService, mailboxRegistry, sourceNodeNames,
comparator, SOURCE_FRAGMENT_ID, SOURCE_FRAGMENT_ID
+ );
+
+ mailboxRegistry.register(inbox);
+
+ AsyncRootNode<Object[], Object[]> root = new AsyncRootNode<>(
+ inbox, Function.identity()
+ );
+
+ inbox.onRegister(root);
+
+ return root;
+ }
+
+ private Outbox<?> createSourceFragment(
+ UUID queryId,
+ int rowValue,
+ ClusterNode localNode,
+ ClusterServiceFactory serviceFactory,
+ int rowCount
+ ) {
+ ExecutionContext<Object[]> sourceCtx = TestBuilders.executionContext()
+ .queryId(queryId)
+ .executor(taskExecutor)
+ .fragment(new FragmentDescription(SOURCE_FRAGMENT_ID, null,
null, Long2ObjectMaps.emptyMap()))
+ .localNode(localNode)
+ .build();
+
+ MailboxRegistry mailboxRegistry =
mailboxes.computeIfAbsent(localNode.name(), name -> new MailboxRegistryImpl());
+ ExchangeService exchangeService =
exchangeServices.computeIfAbsent(localNode.name(), name ->
+
createExchangeService(serviceFactory.forNode(localNode.name()),
mailboxRegistry));
+
+ Outbox<Object[]> outbox = new Outbox<>(
+ sourceCtx, exchangeService, mailboxRegistry,
SOURCE_FRAGMENT_ID,
+ TARGET_FRAGMENT_ID, new AllNodes<>(List.of(ROOT_NODE_NAME))
+ );
+ mailboxRegistry.register(outbox);
+
+ ScanNode<Object[]> source = new ScanNode<>(sourceCtx,
DataProvider.fromRow(new Object[]{rowValue, rowValue}, rowCount));
+
+ outbox.register(source);
+
+ return outbox;
+ }
+
+ private ExchangeService createExchangeService(ClusterService
clusterService, MailboxRegistry mailboxRegistry) {
+ MessageService messageService = new MessageServiceImpl(
+ clusterService.topologyService(),
+ clusterService.messagingService(),
+ taskExecutor,
+ new IgniteSpinBusyLock()
+ );
+
+ ExchangeService exchangeService = new ExchangeServiceImpl(
+ mailboxRegistry,
+ messageService
+ );
+
+ messageService.start();
+ exchangeService.start();
+
+ return exchangeService;
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
similarity index 95%
rename from
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
rename to
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
index a3cf08296d..98198e2e65 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestClusterService.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/ClusterServiceFactory.java
@@ -43,7 +43,7 @@ import org.jetbrains.annotations.Nullable;
* Auxiliary object to create the associated {@link MessagingService}
* and {@link TopologyService} for each node in the cluster.
*/
-class TestClusterService {
+public class ClusterServiceFactory {
private final List<String> allNodes;
private final Map<String, LocalMessagingService> messagingServicesByNode =
new ConcurrentHashMap<>();
@@ -54,11 +54,17 @@ class TestClusterService {
*
* @param allNodes A collection of nodes to create cluster service from.
*/
- TestClusterService(List<String> allNodes) {
+ ClusterServiceFactory(List<String> allNodes) {
this.allNodes = allNodes;
}
- ClusterService spawnForNode(String nodeName) {
+ /**
+ * Creates an instance of {@link ClusterService} for a given node.
+ *
+ * @param nodeName A name of the node to create cluster service for.
+ * @return An instance of cluster service.
+ */
+ public ClusterService forNode(String nodeName) {
return new ClusterService() {
/** {@inheritDoc} */
@Override
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
index a143f4c6ae..94d7a61b2a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/DataProvider.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.sql.engine.framework;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
/**
* Producer of the rows to use with {@link TestTable} in execution-related
scenarios.
@@ -40,4 +42,44 @@ public interface DataProvider<T> extends Iterable<T> {
static <T> DataProvider<T> fromCollection(Collection<T> collection) {
return collection::iterator;
}
+
+ /**
+ * Creates data provider from repeating the given row specified amount of
times.
+ *
+ * @param row A row to repeat.
+ * @param repeatTimes An amount of times to repeat the row.
+ * @param <T> A type of the produced elements.
+ * @return A data provider instance.
+ */
+ static <T> DataProvider<T> fromRow(T row, int repeatTimes) {
+ return new DataProvider<>() {
+ private final int times = repeatTimes;
+
+ /** {@inheritDoc} */
+ @Override
+ public Iterator<T> iterator() {
+ return new Iterator<>() {
+ private int counter;
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ return counter < times;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ counter++;
+
+ return row;
+ }
+ };
+ }
+ };
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 6a6ae1bb52..7885aa5b5e 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -18,24 +18,35 @@
package org.apache.ignite.internal.sql.engine.framework;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
+import static org.mockito.Mockito.mock;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.calcite.schema.Table;
import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
+import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptor;
import org.apache.ignite.internal.sql.engine.schema.ColumnDescriptorImpl;
import org.apache.ignite.internal.sql.engine.schema.DefaultValueStrategy;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.schema.TableDescriptorImpl;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
+import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.network.ClusterNode;
/**
* A collection of builders to create test objects.
@@ -51,6 +62,16 @@ public class TestBuilders {
return new TableBuilderImpl();
}
+ /** Returns a builder of the execution context. */
+ public static ExecutionContextBuilder executionContext() {
+ return new ExecutionContextBuilderImpl();
+ }
+
+ /** Factory method to create a cluster service factory for cluster
consisting of provided nodes. */
+ public static ClusterServiceFactory clusterServiceFactory(List<String>
nodes) {
+ return new ClusterServiceFactory(nodes);
+ }
+
/**
* A builder to create a test cluster object.
*
@@ -111,6 +132,88 @@ public class TestBuilders {
ClusterTableBuilder defaultDataProvider(DataProvider<?> dataProvider);
}
+ /**
+ * A builder to create an execution context.
+ *
+ * @see ExecutionContext
+ */
+ public interface ExecutionContextBuilder {
+ /** Sets the identifier of the query. */
+ ExecutionContextBuilder queryId(UUID queryId);
+
+ /** Sets the description of the fragment this context will be created
for. */
+ ExecutionContextBuilder fragment(FragmentDescription
fragmentDescription);
+
+ /** Sets the query task executor. */
+ ExecutionContextBuilder executor(QueryTaskExecutor executor);
+
+ /** Sets the node this fragment will be executed on. */
+ ExecutionContextBuilder localNode(ClusterNode node);
+
+ /**
+ * Builds the context object.
+ *
+ * @return Created context object.
+ */
+ ExecutionContext<Object[]> build();
+ }
+
+ private static class ExecutionContextBuilderImpl implements
ExecutionContextBuilder {
+ private FragmentDescription description = new FragmentDescription(0,
null, null, Long2ObjectMaps.emptyMap());
+
+ private UUID queryId = null;
+ private QueryTaskExecutor executor = null;
+ private ClusterNode node = null;
+
+ /** {@inheritDoc} */
+ @Override
+ public ExecutionContextBuilder queryId(UUID queryId) {
+ this.queryId = Objects.requireNonNull(queryId, "queryId");
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ExecutionContextBuilder fragment(FragmentDescription
fragmentDescription) {
+ this.description = Objects.requireNonNull(fragmentDescription,
"fragmentDescription");
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ExecutionContextBuilder executor(QueryTaskExecutor executor) {
+ this.executor = Objects.requireNonNull(executor, "executor");
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ExecutionContextBuilder localNode(ClusterNode node) {
+ this.node = Objects.requireNonNull(node, "node");
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public ExecutionContext<Object[]> build() {
+ return new ExecutionContext<>(
+ BaseQueryContext.builder().build(),
+ Objects.requireNonNull(executor, "executor"),
+ queryId,
+ Objects.requireNonNull(node, "node"),
+ node.name(),
+ description,
+ ArrayRowHandler.INSTANCE,
+ Map.of(),
+ mock(InternalTransaction.class)
+ );
+ }
+ }
+
private static class ClusterBuilderImpl implements ClusterBuilder {
private final List<ClusterTableBuilderImpl> tableBuilders = new
ArrayList<>();
private List<String> nodeNames;
@@ -135,7 +238,7 @@ public class TestBuilders {
/** {@inheritDoc} */
@Override
public TestCluster build() {
- var clusterService = new TestClusterService(nodeNames);
+ var clusterService = new ClusterServiceFactory(nodeNames);
for (ClusterTableBuilderImpl tableBuilder : tableBuilders) {
validateTableBuilder(tableBuilder);
@@ -149,7 +252,7 @@ public class TestBuilders {
var schemaManager = new PredefinedSchemaManager(new
IgniteSchema("PUBLIC", tableMap, null));
Map<String, TestNode> nodes = nodeNames.stream()
- .map(name -> new TestNode(name,
clusterService.spawnForNode(name), schemaManager))
+ .map(name -> new TestNode(name,
clusterService.forNode(name), schemaManager))
.collect(Collectors.toMap(TestNode::name,
Function.identity()));
return new TestCluster(nodes);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
index cba801716e..bf778cc357 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestNode.java
@@ -104,7 +104,7 @@ public class TestNode implements LifecycleAware {
topologyService, messagingService, taskExecutor, new
IgniteSpinBusyLock()
));
ExchangeService exchangeService = registerService(new
ExchangeServiceImpl(
- topologyService.localMember(), taskExecutor, mailboxRegistry,
messageService
+ mailboxRegistry, messageService
));
executionService = registerService(new ExecutionServiceImpl<>(