This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3062df341f IGNITE-18063 Use consistent IDs in assignments (#1354)
3062df341f is described below
commit 3062df341f2810d6bce80a618ac2e485b4e4337c
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Fri Nov 18 11:55:10 2022 +0300
IGNITE-18063 Use consistent IDs in assignments (#1354)
---
.../internal/sql/engine/exec/ExchangeService.java | 44 +++++-----
.../sql/engine/exec/ExchangeServiceImpl.java | 52 ++++++------
.../internal/sql/engine/exec/ExecutionContext.java | 30 +++----
.../sql/engine/exec/ExecutionServiceImpl.java | 61 +++++++-------
.../sql/engine/exec/LogicalRelImplementor.java | 12 +--
.../sql/engine/exec/MailboxRegistryImpl.java | 4 +-
.../sql/engine/exec/RemoteFragmentKey.java | 18 ++--
.../ignite/internal/sql/engine/exec/rel/Inbox.java | 80 +++++++++---------
.../internal/sql/engine/exec/rel/Outbox.java | 55 +++++++------
.../sql/engine/message/MessageListener.java | 6 +-
.../sql/engine/message/MessageService.java | 13 ++-
.../sql/engine/message/MessageServiceImpl.java | 40 ++++-----
.../sql/engine/metadata/ColocationGroup.java | 56 ++++++-------
.../sql/engine/metadata/FragmentDescription.java | 4 +-
.../sql/engine/metadata/FragmentMapping.java | 10 +--
.../engine/metadata/IgniteMdFragmentMapping.java | 2 +-
.../sql/engine/metadata/MappingServiceImpl.java | 2 +-
.../sql/engine/metadata/RemoteException.java | 18 ++--
.../sql/engine/prepare/AbstractMultiStepPlan.java | 2 +-
.../internal/sql/engine/prepare/Fragment.java | 8 +-
.../sql/engine/prepare/MappingQueryContext.java | 12 +--
.../sql/engine/trait/DistributionFunction.java | 12 +--
.../internal/sql/engine/StopCalciteModuleTest.java | 33 +++-----
.../sql/engine/exec/ExecutionServiceImplTest.java | 95 +++++++++++-----------
.../exec/rel/TableScanNodeExecutionTest.java | 7 +-
.../ignite/distributed/ItTablePersistenceTest.java | 3 -
.../distributed/ItTxDistributedTestSingleNode.java | 2 -
.../ignite/internal/table/ItColocationTest.java | 3 +-
.../internal/table/distributed/TableManager.java | 25 ++----
.../distributed/storage/InternalTableImpl.java | 9 +-
.../table/impl/DummyInternalTableImpl.java | 3 +-
31 files changed, 342 insertions(+), 379 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
index f0d9eaffee..3724784dae 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeService.java
@@ -29,64 +29,64 @@ public interface ExchangeService extends LifecycleAware {
/**
* Sends a batch of data to remote node.
*
- * @param nodeId Target node ID.
- * @param qryId Query ID.
+ * @param nodeName Target node consistent ID.
+ * @param qryId Query ID.
* @param fragmentId Target fragment ID.
* @param exchangeId Exchange ID.
- * @param batchId Batch ID.
- * @param last Last batch flag.
- * @param rows Data rows.
+ * @param batchId Batch ID.
+ * @param last Last batch flag.
+ * @param rows Data rows.
*/
- <RowT> void sendBatch(String nodeId, UUID qryId, long fragmentId, long
exchangeId, int batchId, boolean last,
+ <RowT> void sendBatch(String nodeName, UUID qryId, long fragmentId, long
exchangeId, int batchId, boolean last,
List<RowT> rows) throws IgniteInternalCheckedException;
/**
* Acknowledges a batch with given ID is processed.
*
- * @param nodeId Node ID to notify.
- * @param qryId Query ID.
+ * @param nodeName Node consistent ID to notify.
+ * @param qryId Query ID.
* @param fragmentId Target fragment ID.
* @param exchangeId Exchange ID.
- * @param batchId Batch ID.
+ * @param batchId Batch ID.
*/
- void acknowledge(String nodeId, UUID qryId, long fragmentId, long
exchangeId, int batchId) throws IgniteInternalCheckedException;
+ void acknowledge(String nodeName, UUID qryId, long fragmentId, long
exchangeId, int batchId) throws IgniteInternalCheckedException;
/**
* Sends cancel request.
*
- * @param nodeId Target node ID.
- * @param qryId Query ID.
+ * @param nodeName Target node consistent ID.
+ * @param qryId Query ID.
* @param fragmentId Target fragment ID.
* @param exchangeId Exchange ID.
*/
- void closeInbox(String nodeId, UUID qryId, long fragmentId, long
exchangeId) throws IgniteInternalCheckedException;
+ void closeInbox(String nodeName, UUID qryId, long fragmentId, long
exchangeId) throws IgniteInternalCheckedException;
/**
* Sends cancel request.
*
- * @param nodeId Target node ID.
- * @param qryId Query ID.
+ * @param nodeName Target node consistent ID.
+ * @param qryId Query ID.
*/
- void closeQuery(String nodeId, UUID qryId) throws
IgniteInternalCheckedException;
+ void closeQuery(String nodeName, UUID qryId) throws
IgniteInternalCheckedException;
/**
* Send error.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*
- * @param nodeId Target node ID.
- * @param qryId Query ID.
+ * @param nodeName Target node consistent ID.
+ * @param qryId Query ID.
* @param fragmentId Source fragment ID.
- * @param err Exception to send.
+ * @param err Exception to send.
* @throws IgniteInternalCheckedException On error marshaling or send
ErrorMessage.
*/
- void sendError(String nodeId, UUID qryId, long fragmentId, Throwable err)
throws IgniteInternalCheckedException;
+ void sendError(String nodeName, UUID qryId, long fragmentId, Throwable
err) throws IgniteInternalCheckedException;
/**
* Alive.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*
- * @param nodeId Node ID.
+ * @param nodeName Target node consistent ID.
* @return {@code true} if node is alive, {@code false} otherwise.
*/
- boolean alive(String nodeId);
+ boolean alive(String nodeName);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
index 73d94a4ef0..5f26aa2585 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.java
@@ -83,10 +83,10 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
- public <RowT> void sendBatch(String nodeId, UUID qryId, long fragmentId,
long exchangeId, int batchId,
+ public <RowT> void sendBatch(String nodeName, UUID qryId, long fragmentId,
long exchangeId, int batchId,
boolean last, List<RowT> rows) throws
IgniteInternalCheckedException {
msgSrvc.send(
- nodeId,
+ nodeName,
FACTORY.queryBatchMessage()
.queryId(qryId)
.fragmentId(fragmentId)
@@ -100,10 +100,10 @@ public class ExchangeServiceImpl implements
ExchangeService {
/** {@inheritDoc} */
@Override
- public void acknowledge(String nodeId, UUID qryId, long fragmentId, long
exchangeId, int batchId)
+ public void acknowledge(String nodeName, UUID qryId, long fragmentId, long
exchangeId, int batchId)
throws IgniteInternalCheckedException {
msgSrvc.send(
- nodeId,
+ nodeName,
FACTORY.queryBatchAcknowledgeMessage()
.queryId(qryId)
.fragmentId(fragmentId)
@@ -115,9 +115,9 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
- public void closeQuery(String nodeId, UUID qryId) throws
IgniteInternalCheckedException {
+ public void closeQuery(String nodeName, UUID qryId) throws
IgniteInternalCheckedException {
msgSrvc.send(
- nodeId,
+ nodeName,
FACTORY.queryCloseMessage()
.queryId(qryId)
.build()
@@ -126,9 +126,9 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
- public void closeInbox(String nodeId, UUID qryId, long fragmentId, long
exchangeId) throws IgniteInternalCheckedException {
+ public void closeInbox(String nodeName, UUID qryId, long fragmentId, long
exchangeId) throws IgniteInternalCheckedException {
msgSrvc.send(
- nodeId,
+ nodeName,
FACTORY.inboxCloseMessage()
.queryId(qryId)
.fragmentId(fragmentId)
@@ -139,9 +139,9 @@ public class ExchangeServiceImpl implements ExchangeService
{
/** {@inheritDoc} */
@Override
- public void sendError(String nodeId, UUID qryId, long fragmentId,
Throwable err) throws IgniteInternalCheckedException {
+ public void sendError(String nodeName, UUID qryId, long fragmentId,
Throwable err) throws IgniteInternalCheckedException {
msgSrvc.send(
- nodeId,
+ nodeName,
FACTORY.errorMessage()
.queryId(qryId)
.fragmentId(fragmentId)
@@ -152,11 +152,11 @@ public class ExchangeServiceImpl implements
ExchangeService {
/** {@inheritDoc} */
@Override
- public boolean alive(String nodeId) {
- return msgSrvc.alive(nodeId);
+ public boolean alive(String nodeName) {
+ return msgSrvc.alive(nodeName);
}
- protected void onMessage(String nodeId, InboxCloseMessage msg) {
+ private void onMessage(String nodeName, InboxCloseMessage msg) {
Collection<Inbox<?>> inboxes = mailboxRegistry.inboxes(msg.queryId(),
msg.fragmentId(), msg.exchangeId());
if (!nullOrEmpty(inboxes)) {
@@ -164,35 +164,35 @@ public class ExchangeServiceImpl implements
ExchangeService {
inbox.context().execute(inbox::close, inbox::onError);
}
} else if (LOG.isDebugEnabled()) {
- LOG.debug("Stale inbox cancel message received [nodeId={},
queryId={}, fragmentId={}, exchangeId={}]",
- nodeId, msg.queryId(), msg.fragmentId(), msg.exchangeId());
+ LOG.debug("Stale inbox cancel message received [nodeName={},
queryId={}, fragmentId={}, exchangeId={}]",
+ nodeName, msg.queryId(), msg.fragmentId(),
msg.exchangeId());
}
}
- protected void onMessage(String nodeId, QueryBatchAcknowledgeMessage msg) {
+ private void onMessage(String nodeName, QueryBatchAcknowledgeMessage msg) {
Outbox<?> outbox = mailboxRegistry.outbox(msg.queryId(),
msg.exchangeId());
if (outbox != null) {
try {
- outbox.onAcknowledge(nodeId, msg.batchId());
+ outbox.onAcknowledge(nodeName, msg.batchId());
} catch (Throwable e) {
outbox.onError(e);
throw new IgniteInternalException(UNEXPECTED_ERR, "Unexpected
exception", e);
}
} else if (LOG.isDebugEnabled()) {
- LOG.debug("Stale acknowledge message received: [nodeId={},
queryId={}, fragmentId={}, exchangeId={}, batchId={}]",
- nodeId, msg.queryId(), msg.fragmentId(), msg.exchangeId(),
msg.batchId());
+ LOG.debug("Stale acknowledge message received: [nodeName={},
queryId={}, fragmentId={}, exchangeId={}, batchId={}]",
+ nodeName, msg.queryId(), msg.fragmentId(),
msg.exchangeId(), msg.batchId());
}
}
- protected void onMessage(String nodeId, QueryBatchMessage msg) {
+ private void onMessage(String nodeName, QueryBatchMessage msg) {
Inbox<?> inbox = mailboxRegistry.inbox(msg.queryId(),
msg.exchangeId());
if (inbox == null && msg.batchId() == 0) {
// first message sent before a fragment is built
// note that an inbox source fragment id is also used as an
exchange id
- Inbox<?> newInbox = new Inbox<>(baseInboxContext(nodeId,
msg.queryId(), msg.fragmentId()),
+ Inbox<?> newInbox = new Inbox<>(baseInboxContext(nodeName,
msg.queryId(), msg.fragmentId()),
this, mailboxRegistry, msg.exchangeId(), msg.exchangeId());
inbox = mailboxRegistry.register(newInbox);
@@ -200,22 +200,22 @@ public class ExchangeServiceImpl implements
ExchangeService {
if (inbox != null) {
try {
- inbox.onBatchReceived(nodeId, msg.batchId(), msg.last(),
Commons.cast(msg.rows()));
+ inbox.onBatchReceived(nodeName, msg.batchId(), msg.last(),
Commons.cast(msg.rows()));
} catch (Throwable e) {
inbox.onError(e);
throw new IgniteInternalException(UNEXPECTED_ERR, "Unexpected
exception", e);
}
} else if (LOG.isDebugEnabled()) {
- LOG.debug("Stale batch message received: [nodeId={}, queryId={},
fragmentId={}, exchangeId={}, batchId={}]",
- nodeId, msg.queryId(), msg.fragmentId(), msg.exchangeId(),
msg.batchId());
+ LOG.debug("Stale batch message received: [nodeName={}, queryId={},
fragmentId={}, exchangeId={}, batchId={}]",
+ nodeName, msg.queryId(), msg.fragmentId(),
msg.exchangeId(), msg.batchId());
}
}
/**
* Get minimal execution context to meet Inbox needs.
*/
- private ExecutionContext<?> baseInboxContext(String nodeId, UUID qryId,
long fragmentId) {
+ private ExecutionContext<?> baseInboxContext(String nodeName, UUID qryId,
long fragmentId) {
return new ExecutionContext<>(
BaseQueryContext.builder()
.logger(LOG)
@@ -223,7 +223,7 @@ public class ExchangeServiceImpl implements ExchangeService
{
taskExecutor,
qryId,
localNode,
- nodeId,
+ nodeName,
new FragmentDescription(
fragmentId,
null,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 42bd2aa5f1..5d79cf740a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -84,7 +84,7 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
private final ClusterNode localNode;
- private final String originatingNodeId;
+ private final String originatingNodeName;
private final RowHandler<RowT> handler;
@@ -106,13 +106,13 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
/**
* Constructor.
*
- * @param executor Task executor.
- * @param qctx Base query context.
- * @param qryId Query ID.
+ * @param executor Task executor.
+ * @param qctx Base query context.
+ * @param qryId Query ID.
* @param fragmentDesc Partitions information.
- * @param handler Row handler.
- * @param params Parameters.
- * @param tx Transaction.
+ * @param handler Row handler.
+ * @param params Parameters.
+ * @param tx Transaction.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public ExecutionContext(
@@ -120,7 +120,7 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
QueryTaskExecutor executor,
UUID qryId,
ClusterNode localNode,
- String originatingNodeId,
+ String originatingNodeName,
FragmentDescription fragmentDesc,
RowHandler<RowT> handler,
Map<String, Object> params,
@@ -135,7 +135,7 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
this.handler = handler;
this.params = params;
this.localNode = localNode;
- this.originatingNodeId = originatingNodeId;
+ this.originatingNodeName = originatingNodeName;
this.tx = tx;
expressionFactory = new ExpressionFactoryImpl<>(
@@ -220,10 +220,10 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
}
/**
- * Get originating node ID.
+ * Get originating node consistent ID.
*/
- public String originatingNodeId() {
- return originatingNodeId;
+ public String originatingNodeName() {
+ return originatingNodeName;
}
/**
@@ -290,7 +290,7 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
/**
* Sets correlated value.
*
- * @param id Correlation ID.
+ * @param id Correlation ID.
* @param value Correlated value.
*/
public void setCorrelated(@NotNull Object value, int id) {
@@ -323,8 +323,8 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
}
/**
- * Submits a Runnable task for execution and returns a Future representing
that task. The Future's {@code get} method will return {@code
- * null} upon <em>successful</em> completion.
+ * Submits a Runnable task for execution and returns a Future representing
that task. The Future's {@code get} method will return
+ * {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit.
* @return a {@link CompletableFuture} representing pending task
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 81c70e51f4..540c9ec6bf 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -289,8 +289,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return new AsyncWrapper<>(res.iterator());
}
- private void onMessage(String nodeId, QueryStartRequest msg) {
- assert nodeId != null && msg != null;
+ private void onMessage(String nodeName, QueryStartRequest msg) {
+ assert nodeName != null && msg != null;
DistributedQueryManager queryManager =
queryManagerMap.computeIfAbsent(msg.queryId(), key -> {
BaseQueryContext ctx = createQueryContext(key, msg.schema(),
msg.parameters(), msg.txTime());
@@ -298,33 +298,33 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return new DistributedQueryManager(ctx);
});
- queryManager.submitFragment(nodeId, msg.root(),
msg.fragmentDescription());
+ queryManager.submitFragment(nodeName, msg.root(),
msg.fragmentDescription());
}
- private void onMessage(String nodeId, QueryStartResponse msg) {
- assert nodeId != null && msg != null;
+ private void onMessage(String nodeName, QueryStartResponse msg) {
+ assert nodeName != null && msg != null;
DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
if (dqm != null) {
- dqm.acknowledgeFragment(nodeId, msg.fragmentId(), msg.error());
+ dqm.acknowledgeFragment(nodeName, msg.fragmentId(), msg.error());
}
}
- private void onMessage(String nodeId, ErrorMessage msg) {
- assert nodeId != null && msg != null;
+ private void onMessage(String nodeName, ErrorMessage msg) {
+ assert nodeName != null && msg != null;
DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
if (dqm != null) {
- RemoteException e = new RemoteException(nodeId, msg.queryId(),
msg.fragmentId(), msg.error());
+ RemoteException e = new RemoteException(nodeName, msg.queryId(),
msg.fragmentId(), msg.error());
dqm.onError(e);
}
}
- private void onMessage(String nodeId, QueryCloseMessage msg) {
- assert nodeId != null && msg != null;
+ private void onMessage(String nodeName, QueryCloseMessage msg) {
+ assert nodeName != null && msg != null;
DistributedQueryManager dqm = queryManagerMap.get(msg.queryId());
@@ -352,7 +352,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
/** {@inheritDoc} */
@Override
public void onDisappeared(ClusterNode member) {
- queryManagerMap.values().forEach(qm -> qm.onNodeLeft(member.id()));
+ queryManagerMap.values().forEach(qm -> qm.onNodeLeft(member.name()));
}
/** Returns local fragments for the query with given id. */
@@ -413,7 +413,9 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
return List.copyOf(localFragments);
}
- private void sendFragment(String targetNodeId, Fragment fragment,
FragmentDescription desc) throws IgniteInternalCheckedException {
+ private void sendFragment(
+ String targetNodeName, Fragment fragment, FragmentDescription
desc
+ ) throws IgniteInternalCheckedException {
QueryStartRequest req = FACTORY.queryStartRequest()
.queryId(ctx.queryId())
.fragmentId(fragment.fragmentId())
@@ -425,10 +427,10 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.build();
var fut = new CompletableFuture<Void>();
- remoteFragmentInitCompletion.put(new
RemoteFragmentKey(targetNodeId, fragment.fragmentId()), fut);
+ remoteFragmentInitCompletion.put(new
RemoteFragmentKey(targetNodeName, fragment.fragmentId()), fut);
try {
- msgSrvc.send(targetNodeId, req);
+ msgSrvc.send(targetNodeName, req);
} catch (Exception ex) {
fut.complete(null);
@@ -440,7 +442,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
}
- private void acknowledgeFragment(String nodeId, long fragmentId,
@Nullable Throwable ex) {
+ private void acknowledgeFragment(String nodeName, long fragmentId,
@Nullable Throwable ex) {
if (ex != null) {
Long rootFragmentId0 = rootFragmentId;
@@ -455,7 +457,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
}
- remoteFragmentInitCompletion.get(new RemoteFragmentKey(nodeId,
fragmentId)).complete(null);
+ remoteFragmentInitCompletion.get(new RemoteFragmentKey(nodeName,
fragmentId)).complete(null);
}
private void onError(RemoteException ex) {
@@ -466,15 +468,16 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
});
}
- private void onNodeLeft(String nodeId) {
- remoteFragmentInitCompletion.entrySet().stream().filter(e ->
nodeId.equals(e.getKey().nodeId()))
+ private void onNodeLeft(String nodeName) {
+ remoteFragmentInitCompletion.entrySet().stream()
+ .filter(e -> nodeName.equals(e.getKey().nodeName()))
.forEach(e -> e.getValue()
.completeExceptionally(new IgniteInternalException(
- NODE_LEFT_ERR, "Node left the cluster
[nodeId=" + nodeId + "]")));
+ NODE_LEFT_ERR, "Node left the cluster
[nodeName=" + nodeName + "]")));
}
private void executeFragment(FragmentPlan plan, ExecutionContext<RowT>
ectx) {
- String origNodeId = ectx.originatingNodeId();
+ String origNodeName = ectx.originatingNodeName();
AbstractNode<RowT> node =
implementorFactory.create(ectx).go(plan.root());
@@ -503,14 +506,14 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
try {
msgSrvc.send(
- origNodeId,
+ origNodeName,
FACTORY.queryStartResponse()
.queryId(ectx.queryId())
.fragmentId(ectx.fragmentId())
.build()
);
} catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException(MESSAGE_SEND_ERR, "Failed to
send reply. [nodeId=" + origNodeId + ']', e);
+ throw new IgniteInternalException(MESSAGE_SEND_ERR, "Failed to
send reply. [nodeName=" + origNodeName + ']', e);
}
if (node instanceof Outbox) {
@@ -518,13 +521,13 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
}
}
- private ExecutionContext<RowT> createContext(String initiatorNodeId,
FragmentDescription desc) {
+ private ExecutionContext<RowT> createContext(String initiatorNodeName,
FragmentDescription desc) {
return new ExecutionContext<>(
ctx,
taskExecutor,
ctx.queryId(),
localNode,
- initiatorNodeId,
+ initiatorNodeName,
desc,
handler,
Commons.parametersMap(ctx.parameters()),
@@ -561,7 +564,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
private AsyncCursor<List<Object>> execute(MultiStepPlan plan) {
taskExecutor.execute(() -> {
- plan.init(mappingSrvc, new
MappingQueryContext(localNode.id()));
+ plan.init(mappingSrvc, new
MappingQueryContext(localNode.name()));
List<Fragment> fragments = plan.fragments();
@@ -585,8 +588,8 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
plan.remotes(fragment)
);
- for (String nodeId : fragmentDesc.nodeIds()) {
- sendFragment(nodeId, fragment, fragmentDesc);
+ for (String nodeName : fragmentDesc.nodeNames()) {
+ sendFragment(nodeName, fragment, fragmentDesc);
}
}
} catch (Throwable e) {
@@ -628,7 +631,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
.thenCompose(tmp -> {
Map<String, List<CompletableFuture<?>>>
requestsPerNode = new HashMap<>();
for (Map.Entry<RemoteFragmentKey,
CompletableFuture<Void>> entry : remoteFragmentInitCompletion.entrySet()) {
-
requestsPerNode.computeIfAbsent(entry.getKey().nodeId(), key -> new
ArrayList<>()).add(entry.getValue());
+
requestsPerNode.computeIfAbsent(entry.getKey().nodeName(), key -> new
ArrayList<>()).add(entry.getValue());
}
List<CompletableFuture<?>> cancelFuts = new
ArrayList<>();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index de2e94c06b..0c19d63b03 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -190,9 +190,9 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
IgniteDistribution distr = rel.distribution();
Destination<RowT> dest = distr.destination(ctx, affSrvc,
ctx.group(rel.sourceId()));
- String localNodeId = ctx.localNode().id();
+ String localNodeName = ctx.localNode().name();
- FilterNode<RowT> node = new FilterNode<>(ctx, r ->
Objects.equals(localNodeId, first(dest.targets(r))));
+ FilterNode<RowT> node = new FilterNode<>(ctx, r ->
Objects.equals(localNodeName, first(dest.targets(r))));
Node<RowT> input = visit(rel.getInput());
@@ -320,7 +320,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
ColocationGroup group = ctx.group(rel.sourceId());
Comparator<RowT> comp = idx.type() == Type.SORTED ?
ctx.expressionFactory().comparator(outputCollation) : null;
- if (!group.nodeIds().contains(ctx.localNode().id())) {
+ if (!group.nodeNames().contains(ctx.localNode().name())) {
return new ScanNode<>(ctx, Collections.emptyList());
}
@@ -330,7 +330,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
idx,
tbl,
rel.collation().getKeys(),
- group.partitions(ctx.localNode().id()),
+ group.partitions(ctx.localNode().name()),
comp,
ranges,
filters,
@@ -359,7 +359,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
ColocationGroup group = ctx.group(rel.sourceId());
- if (!group.nodeIds().contains(ctx.localNode().id())) {
+ if (!group.nodeNames().contains(ctx.localNode().name())) {
return new ScanNode<>(ctx, Collections.emptyList());
}
@@ -367,7 +367,7 @@ public class LogicalRelImplementor<RowT> implements
IgniteRelVisitor<Node<RowT>>
ctx,
ctx.rowHandler().factory(ctx.getTypeFactory(), rowType),
tbl,
- group.partitions(ctx.localNode().id()),
+ group.partitions(ctx.localNode().name()),
filters,
prj,
requiredColumns == null ? null : requiredColumns.toBitSet()
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
index c004c45e78..2f00be6d6b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/MailboxRegistryImpl.java
@@ -177,8 +177,8 @@ public class MailboxRegistryImpl implements
MailboxRegistry, TopologyEventHandle
/** {@inheritDoc} */
@Override
public void onDisappeared(ClusterNode member) {
- locals.values().forEach(n -> n.onNodeLeft(member.id()));
- remotes.values().forEach(n -> n.onNodeLeft(member.id()));
+ locals.values().forEach(n -> n.onNodeLeft(member.name()));
+ remotes.values().forEach(n -> n.onNodeLeft(member.name()));
}
private static class MailboxKey {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RemoteFragmentKey.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RemoteFragmentKey.java
index 3ae6d665e0..312f3491c5 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RemoteFragmentKey.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RemoteFragmentKey.java
@@ -24,24 +24,24 @@ package org.apache.ignite.internal.sql.engine.exec;
* fragment id only to distinguish between remote fragments, the node id
should be attached.
*/
public class RemoteFragmentKey {
- private final String nodeId;
+ private final String nodeName;
private final long fragmentId;
/**
* Creates an object.
*
- * @param nodeId Id of the node that own a fragment.
+ * @param nodeName Consistent id of the node that own a fragment.
* @param fragmentId Id of the particular fragment owned by a remote node.
*/
- public RemoteFragmentKey(String nodeId, long fragmentId) {
- this.nodeId = nodeId;
+ public RemoteFragmentKey(String nodeName, long fragmentId) {
+ this.nodeName = nodeName;
this.fragmentId = fragmentId;
}
- /** Returns an id of the remote node. */
- public String nodeId() {
- return nodeId;
+ /** Returns the consistent id of a remote node. */
+ public String nodeName() {
+ return nodeName;
}
/** Returns an id os the fragment. */
@@ -63,12 +63,12 @@ public class RemoteFragmentKey {
if (fragmentId != that.fragmentId) {
return false;
}
- return nodeId.equals(that.nodeId);
+ return nodeName.equals(that.nodeName);
}
/** {@inheritDoc} */
@Override public int hashCode() {
- int res = nodeId.hashCode();
+ int res = nodeName.hashCode();
res = 31 * res + (int) (fragmentId ^ (fragmentId >>> 32));
return res;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
index 0069540164..c519c9a019 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Inbox.java
@@ -51,7 +51,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
private final Map<String, Buffer> perNodeBuffers;
- private volatile Collection<String> srcNodeIds;
+ private volatile Collection<String> srcNodeNames;
private Comparator<RowT> comp;
@@ -64,10 +64,10 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
/**
* Constructor.
*
- * @param ctx Execution context.
- * @param exchange Exchange service.
- * @param registry Mailbox registry.
- * @param exchangeId Exchange ID.
+ * @param ctx Execution context.
+ * @param exchange Exchange service.
+ * @param registry Mailbox registry.
+ * @param exchangeId Exchange ID.
* @param srcFragmentId Source fragment ID.
*/
public Inbox(
@@ -97,13 +97,13 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
* Inits this Inbox.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*
- * @param ctx Execution context.
- * @param srcNodeIds Source node IDs.
- * @param comp Optional comparator for merge exchange.
+ * @param ctx Execution context.
+ * @param srcNodeNames Source nodes' consistent IDs.
+ * @param comp Optional comparator for merge exchange.
*/
public void init(
- ExecutionContext<RowT> ctx, Collection<String> srcNodeIds,
@Nullable Comparator<RowT> comp) {
- assert srcNodeIds != null : "Collection srcNodeIds not found for
exchangeId: " + exchangeId;
+ ExecutionContext<RowT> ctx, Collection<String> srcNodeNames,
@Nullable Comparator<RowT> comp) {
+ assert srcNodeNames != null : "Collection srcNodeNames not found for
exchangeId: " + exchangeId;
assert context().fragmentId() == ctx.fragmentId() : "different
fragments unsupported: previous=" + context().fragmentId()
+ " current=" + ctx.fragmentId();
@@ -115,13 +115,13 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
this.comp = comp;
// memory barier
- this.srcNodeIds = new HashSet<>(srcNodeIds);
+ this.srcNodeNames = new HashSet<>(srcNodeNames);
}
/** {@inheritDoc} */
@Override
public void request(int rowsCnt) throws Exception {
- assert srcNodeIds != null;
+ assert srcNodeNames != null;
assert rowsCnt > 0 && requested == 0;
checkState();
@@ -162,13 +162,13 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
/**
* Pushes a batch into a buffer.
*
- * @param srcNodeId Source node id.
- * @param batchId Batch ID.
- * @param last Last batch flag.
- * @param rows Rows.
+ * @param srcNodeName Source node consistent id.
+ * @param batchId Batch ID.
+ * @param last Last batch flag.
+ * @param rows Rows.
*/
- public void onBatchReceived(String srcNodeId, int batchId, boolean last,
List<RowT> rows) throws Exception {
- Buffer buf = getOrCreateBuffer(srcNodeId);
+ public void onBatchReceived(String srcNodeName, int batchId, boolean last,
List<RowT> rows) throws Exception {
+ Buffer buf = getOrCreateBuffer(srcNodeName);
boolean waitingBefore = buf.check() == State.WAITING;
@@ -187,11 +187,11 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
private void push() throws Exception {
if (buffers == null) {
- for (String node : srcNodeIds) {
+ for (String node : srcNodeNames) {
checkNode(node);
}
- buffers = srcNodeIds.stream()
+ buffers = srcNodeNames.stream()
.map(this::getOrCreateBuffer)
.collect(Collectors.toList());
@@ -328,41 +328,41 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
}
}
- private void acknowledge(String nodeId, int batchId) throws
IgniteInternalCheckedException {
- exchange.acknowledge(nodeId, queryId(), srcFragmentId, exchangeId,
batchId);
+ private void acknowledge(String nodeName, int batchId) throws
IgniteInternalCheckedException {
+ exchange.acknowledge(nodeName, queryId(), srcFragmentId, exchangeId,
batchId);
}
- private Buffer getOrCreateBuffer(String nodeId) {
- return perNodeBuffers.computeIfAbsent(nodeId, this::createBuffer);
+ private Buffer getOrCreateBuffer(String nodeName) {
+ return perNodeBuffers.computeIfAbsent(nodeName, this::createBuffer);
}
- private Buffer createBuffer(String nodeId) {
- return new Buffer(nodeId);
+ private Buffer createBuffer(String nodeName) {
+ return new Buffer(nodeName);
}
/**
* OnNodeLeft.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public void onNodeLeft(String nodeId) {
- if (context().originatingNodeId().equals(nodeId) && srcNodeIds ==
null) {
+ public void onNodeLeft(String nodeName) {
+ if (context().originatingNodeName().equals(nodeName) && srcNodeNames
== null) {
context().execute(this::close, this::onError);
- } else if (srcNodeIds != null && srcNodeIds.contains(nodeId)) {
- context().execute(() -> onNodeLeft0(nodeId), this::onError);
+ } else if (srcNodeNames != null && srcNodeNames.contains(nodeName)) {
+ context().execute(() -> onNodeLeft0(nodeName), this::onError);
}
}
- private void onNodeLeft0(String nodeId) throws Exception {
+ private void onNodeLeft0(String nodeName) throws Exception {
checkState();
- if (getOrCreateBuffer(nodeId).check() != State.END) {
- throw new IgniteInternalCheckedException(NODE_LEFT_ERR, "Failed to
execute query, node left [nodeId=" + nodeId + ']');
+ if (getOrCreateBuffer(nodeName).check() != State.END) {
+ throw new IgniteInternalCheckedException(NODE_LEFT_ERR, "Failed to
execute query, node left [nodeName=" + nodeName + ']');
}
}
- private void checkNode(String nodeId) throws
IgniteInternalCheckedException {
- if (!exchange.alive(nodeId)) {
- throw new IgniteInternalCheckedException(NODE_LEFT_ERR, "Failed to
execute query, node left [nodeId=" + nodeId + ']');
+ private void checkNode(String nodeName) throws
IgniteInternalCheckedException {
+ if (!exchange.alive(nodeName)) {
+ throw new IgniteInternalCheckedException(NODE_LEFT_ERR, "Failed to
execute query, node left [nodeName=" + nodeName + ']');
}
}
@@ -422,7 +422,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
private static final Batch<?> END = new Batch<>(0, false, null);
private final class Buffer {
- private final String nodeId;
+ private final String nodeName;
private int lastEnqueued = -1;
@@ -430,8 +430,8 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
private Batch<RowT> curr = waitingMark();
- private Buffer(String nodeId) {
- this.nodeId = nodeId;
+ private Buffer(String nodeName) {
+ this.nodeName = nodeName;
}
private void offer(int id, boolean last, List<RowT> rows) {
@@ -488,7 +488,7 @@ public class Inbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, Si
RowT row = curr.rows.set(curr.idx++, null);
if (curr.idx == curr.rows.size()) {
- acknowledge(nodeId, curr.batchId);
+ acknowledge(nodeName, curr.batchId);
if (!isEnd()) {
curr = pollBatch();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
index 007463ac97..d18a2ef5c0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/Outbox.java
@@ -63,12 +63,12 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*
- * @param ctx Execution context.
- * @param exchange Exchange service.
- * @param registry Mailbox registry.
- * @param exchangeId Exchange ID.
+ * @param ctx Execution context.
+ * @param exchange Exchange service.
+ * @param registry Mailbox registry.
+ * @param exchangeId Exchange ID.
* @param targetFragmentId Target fragment ID.
- * @param dest Destination.
+ * @param dest Destination.
*/
public Outbox(
ExecutionContext<RowT> ctx,
@@ -95,15 +95,15 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
/**
* Callback method.
*
- * @param nodeId Target ID.
+ * @param nodeName Target consistent ID.
* @param batchId Batch ID.
*/
- public void onAcknowledge(String nodeId, int batchId) throws Exception {
- assert nodeBuffers.containsKey(nodeId);
+ public void onAcknowledge(String nodeName, int batchId) throws Exception {
+ assert nodeBuffers.containsKey(nodeName);
checkState();
- nodeBuffers.get(nodeId).acknowledge(batchId);
+ nodeBuffers.get(nodeName).acknowledge(batchId);
}
/**
@@ -153,7 +153,8 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
}
/** {@inheritDoc} */
- @Override public void onError(Throwable e) {
+ @Override
+ public void onError(Throwable e) {
try {
sendError(e);
} catch (IgniteInternalCheckedException ex) {
@@ -198,28 +199,28 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
return this;
}
- private void sendBatch(String nodeId, int batchId, boolean last,
List<RowT> rows) throws IgniteInternalCheckedException {
- exchange.sendBatch(nodeId, queryId(), targetFragmentId, exchangeId,
batchId, last, rows);
+ private void sendBatch(String nodeName, int batchId, boolean last,
List<RowT> rows) throws IgniteInternalCheckedException {
+ exchange.sendBatch(nodeName, queryId(), targetFragmentId, exchangeId,
batchId, last, rows);
}
private void sendError(Throwable err) throws
IgniteInternalCheckedException {
- exchange.sendError(context().originatingNodeId(), queryId(),
fragmentId(), err);
+ exchange.sendError(context().originatingNodeName(), queryId(),
fragmentId(), err);
}
- private void sendInboxClose(String nodeId) {
+ private void sendInboxClose(String nodeName) {
try {
- exchange.closeInbox(nodeId, queryId(), targetFragmentId,
exchangeId);
+ exchange.closeInbox(nodeName, queryId(), targetFragmentId,
exchangeId);
} catch (IgniteInternalCheckedException e) {
LOG.info("Unable to send cancel message", e);
}
}
- private Buffer getOrCreateBuffer(String nodeId) {
- return nodeBuffers.computeIfAbsent(nodeId, this::createBuffer);
+ private Buffer getOrCreateBuffer(String nodeName) {
+ return nodeBuffers.computeIfAbsent(nodeName, this::createBuffer);
}
- private Buffer createBuffer(String nodeId) {
- return new Buffer(nodeId);
+ private Buffer createBuffer(String nodeName) {
+ return new Buffer(nodeName);
}
private void flush() throws Exception {
@@ -258,14 +259,14 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
* OnNodeLeft.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public void onNodeLeft(String nodeId) {
- if (nodeId.equals(context().originatingNodeId())) {
+ public void onNodeLeft(String nodeName) {
+ if (nodeName.equals(context().originatingNodeName())) {
context().execute(this::close, this::onError);
}
}
private final class Buffer {
- private final String nodeId;
+ private final String nodeName;
private int hwm = -1;
@@ -273,8 +274,8 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
private List<RowT> curr;
- private Buffer(String nodeId) {
- this.nodeId = nodeId;
+ private Buffer(String nodeName) {
+ this.nodeName = nodeName;
curr = new ArrayList<>(IO_BATCH_SIZE);
}
@@ -301,7 +302,7 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
assert ready();
if (curr.size() == IO_BATCH_SIZE) {
- sendBatch(nodeId, ++hwm, false, curr);
+ sendBatch(nodeName, ++hwm, false, curr);
curr = new ArrayList<>(IO_BATCH_SIZE);
}
@@ -323,7 +324,7 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
List<RowT> tmp = curr;
curr = null;
- sendBatch(nodeId, batchId, true, tmp);
+ sendBatch(nodeName, batchId, true, tmp);
}
/**
@@ -357,7 +358,7 @@ public class Outbox<RowT> extends AbstractNode<RowT>
implements Mailbox<RowT>, S
curr = null;
if (currBatchId >= 0) {
- sendInboxClose(nodeId);
+ sendInboxClose(nodeName);
}
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageListener.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageListener.java
index f45dce72e0..555d2687e2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageListener.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageListener.java
@@ -27,8 +27,8 @@ public interface MessageListener {
/**
* OnMessage.
*
- * @param nodeId Sender node ID.
- * @param msg Message.
+ * @param nodeName Sender node consistent ID.
+ * @param msg Message.
*/
- void onMessage(String nodeId, NetworkMessage msg);
+ void onMessage(String nodeName, NetworkMessage msg);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
index c4ba4fe88c..633ef7f86d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageService.java
@@ -25,28 +25,27 @@ import org.apache.ignite.network.NetworkMessage;
* MessageService interface.
*/
// TODO: Documentation https://issues.apache.org/jira/browse/IGNITE-15859
-// TODO: Service should operate with consistent IDs, see
https://issues.apache.org/jira/browse/IGNITE-18063
public interface MessageService extends LifecycleAware {
/**
* Sends a message to given node.
*
- * @param nodeId Node ID.
- * @param msg Message.
+ * @param nodeName Node consistent ID.
+ * @param msg Message.
*/
- void send(String nodeId, NetworkMessage msg) throws
IgniteInternalCheckedException;
+ void send(String nodeName, NetworkMessage msg) throws
IgniteInternalCheckedException;
/**
* Checks whether a node with given ID is alive.
*
- * @param nodeId Node ID.
+ * @param nodeName Node consistent ID.
* @return {@code True} if node is alive.
*/
- boolean alive(String nodeId);
+ boolean alive(String nodeName);
/**
* Registers a listener for messages of a given type.
*
- * @param lsnr Listener.
+ * @param lsnr Listener.
* @param msgId Message id.
*/
void register(MessageListener lsnr, short msgId);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
index d74516862f..425a5c5c68 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/message/MessageServiceImpl.java
@@ -43,7 +43,7 @@ public class MessageServiceImpl implements MessageService {
private final MessagingService messagingSrvc;
- private final String locNodeId;
+ private final String locNodeName;
private final QueryTaskExecutor taskExecutor;
@@ -66,7 +66,7 @@ public class MessageServiceImpl implements MessageService {
this.taskExecutor = taskExecutor;
this.busyLock = busyLock;
- locNodeId = topSrvc.localMember().id();
+ locNodeName = topSrvc.localMember().name();
}
/** {@inheritDoc} */
@@ -77,20 +77,22 @@ public class MessageServiceImpl implements MessageService {
/** {@inheritDoc} */
@Override
- public void send(String nodeId, NetworkMessage msg) throws
IgniteInternalCheckedException {
+ public void send(String nodeName, NetworkMessage msg) throws
IgniteInternalCheckedException {
if (!busyLock.enterBusy()) {
return;
}
try {
- if (locNodeId.equals(nodeId)) {
- onMessage(nodeId, msg);
+ if (locNodeName.equals(nodeName)) {
+ onMessage(nodeName, msg);
} else {
- ClusterNode node = topSrvc.allMembers().stream()
- .filter(cn -> nodeId.equals(cn.id()))
- .findFirst()
- .orElseThrow(() -> new IgniteInternalException(
- NODE_LEFT_ERR, "Failed to send message to node
(has node left grid?): " + nodeId));
+ ClusterNode node = topSrvc.getByConsistentId(nodeName);
+
+ if (node == null) {
+ throw new IgniteInternalException(
+ NODE_LEFT_ERR, "Failed to send message to node
(has node left grid?): " + nodeName
+ );
+ }
try {
messagingSrvc.send(node, msg).join();
@@ -121,18 +123,16 @@ public class MessageServiceImpl implements MessageService
{
/** {@inheritDoc} */
@Override
- public boolean alive(String nodeId) {
- return topSrvc.allMembers().stream()
- .map(ClusterNode::id)
- .anyMatch(id -> id.equals(nodeId));
+ public boolean alive(String nodeName) {
+ return topSrvc.getByConsistentId(nodeName) != null;
}
- private void onMessage(String nodeId, NetworkMessage msg) {
+ private void onMessage(String consistentId, NetworkMessage msg) {
if (msg instanceof ExecutionContextAwareMessage) {
ExecutionContextAwareMessage msg0 = (ExecutionContextAwareMessage)
msg;
- taskExecutor.execute(msg0.queryId(), msg0.fragmentId(), () ->
onMessageInternal(nodeId, msg));
+ taskExecutor.execute(msg0.queryId(), msg0.fragmentId(), () ->
onMessageInternal(consistentId, msg));
} else {
- taskExecutor.execute(() -> onMessageInternal(nodeId, msg));
+ taskExecutor.execute(() -> onMessageInternal(consistentId, msg));
}
}
@@ -144,13 +144,13 @@ public class MessageServiceImpl implements MessageService
{
try {
assert msg.groupType() == GROUP_TYPE : "unexpected message group
grpType=" + msg.groupType();
- onMessage(sender.id(), msg);
+ onMessage(sender.name(), msg);
} finally {
busyLock.leaveBusy();
}
}
- private void onMessageInternal(String nodeId, NetworkMessage msg) {
+ private void onMessageInternal(String consistentId, NetworkMessage msg) {
if (!busyLock.enterBusy()) {
return;
}
@@ -161,7 +161,7 @@ public class MessageServiceImpl implements MessageService {
"there is no listener for msgType=" + msg.messageType()
);
- lsnr.onMessage(nodeId, msg);
+ lsnr.onMessage(consistentId, msg);
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
index 8a08a4bf2a..21f8f08bfb 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/ColocationGroup.java
@@ -42,18 +42,18 @@ public class ColocationGroup implements Serializable {
private static final int SYNTHETIC_PARTITIONS_COUNT = 512;
// TODO:
IgniteSystemProperties.getInteger("IGNITE_CALCITE_SYNTHETIC_PARTITIONS_COUNT",
512);
- private List<Long> sourceIds;
+ private final List<Long> sourceIds;
- private List<String> nodeIds;
+ private final List<String> nodeNames;
- private List<List<String>> assignments;
+ private final List<List<String>> assignments;
/**
* ForNodes.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public static ColocationGroup forNodes(List<String> nodeIds) {
- return new ColocationGroup(null, nodeIds, null);
+ public static ColocationGroup forNodes(List<String> nodeNames) {
+ return new ColocationGroup(null, nodeNames, null);
}
/**
@@ -76,9 +76,9 @@ public class ColocationGroup implements Serializable {
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- private ColocationGroup(List<Long> sourceIds, List<String> nodeIds,
List<List<String>> assignments) {
+ private ColocationGroup(List<Long> sourceIds, List<String> nodeNames,
List<List<String>> assignments) {
this.sourceIds = sourceIds;
- this.nodeIds = nodeIds;
+ this.nodeNames = nodeNames;
this.assignments = assignments;
}
@@ -92,8 +92,8 @@ public class ColocationGroup implements Serializable {
/**
* Get the list of nodes capable of executing a query fragment for which the
mapping is calculated.
*/
- public List<String> nodeIds() {
- return nodeIds == null ? Collections.emptyList() : nodeIds;
+ public List<String> nodeNames() {
+ return nodeNames == null ? Collections.emptyList() : nodeNames;
}
/**
@@ -139,14 +139,14 @@ public class ColocationGroup implements Serializable {
sourceIds = Commons.combine(this.sourceIds, other.sourceIds);
}
- List<String> nodeIds;
- if (this.nodeIds == null || other.nodeIds == null) {
- nodeIds = firstNotNull(this.nodeIds, other.nodeIds);
+ List<String> nodeNames;
+ if (this.nodeNames == null || other.nodeNames == null) {
+ nodeNames = firstNotNull(this.nodeNames, other.nodeNames);
} else {
- nodeIds = Commons.intersect(other.nodeIds, this.nodeIds);
+ nodeNames = Commons.intersect(other.nodeNames, this.nodeNames);
}
- if (nodeIds != null && nodeIds.isEmpty()) {
+ if (nodeNames != null && nodeNames.isEmpty()) {
throw new ColocationMappingException("Failed to map fragment to
location. "
+ "Replicated query parts are not co-located on all
nodes");
}
@@ -155,8 +155,8 @@ public class ColocationGroup implements Serializable {
if (this.assignments == null || other.assignments == null) {
assignments = firstNotNull(this.assignments, other.assignments);
- if (assignments != null && nodeIds != null) {
- Set<String> filter = new HashSet<>(nodeIds);
+ if (assignments != null && nodeNames != null) {
+ Set<String> filter = new HashSet<>(nodeNames);
List<List<String>> assignments0 = new
ArrayList<>(assignments.size());
for (int i = 0; i < assignments.size(); i++) {
@@ -175,7 +175,7 @@ public class ColocationGroup implements Serializable {
} else {
assert this.assignments.size() == other.assignments.size();
assignments = new ArrayList<>(this.assignments.size());
- Set<String> filter = nodeIds == null ? null : new
HashSet<>(nodeIds);
+ Set<String> filter = nodeNames == null ? null : new
HashSet<>(nodeNames);
for (int i = 0; i < this.assignments.size(); i++) {
List<String> assignment =
Commons.intersect(this.assignments.get(i), other.assignments.get(i));
@@ -191,7 +191,7 @@ public class ColocationGroup implements Serializable {
}
}
- return new ColocationGroup(sourceIds, nodeIds, assignments);
+ return new ColocationGroup(sourceIds, nodeNames, assignments);
}
/**
@@ -199,7 +199,7 @@ public class ColocationGroup implements Serializable {
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public ColocationGroup finalaze() {
- if (assignments == null && nodeIds == null) {
+ if (assignments == null && nodeNames == null) {
return this;
}
@@ -217,38 +217,38 @@ public class ColocationGroup implements Serializable {
return new ColocationGroup(sourceIds, new ArrayList<>(nodes),
assignments);
}
- return forNodes0(nodeIds);
+ return forNodes0(nodeNames);
}
/**
* MapToNodes.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public ColocationGroup mapToNodes(List<String> nodeIds) {
- return !nullOrEmpty(this.nodeIds) ? this : forNodes0(nodeIds);
+ public ColocationGroup mapToNodes(List<String> nodeNames) {
+ return !nullOrEmpty(this.nodeNames) ? this : forNodes0(nodeNames);
}
@NotNull
- private ColocationGroup forNodes0(List<String> nodeIds) {
+ private ColocationGroup forNodes0(List<String> nodeNames) {
List<List<String>> assignments = new
ArrayList<>(SYNTHETIC_PARTITIONS_COUNT);
for (int i = 0; i < SYNTHETIC_PARTITIONS_COUNT; i++) {
- assignments.add(asList(nodeIds.get(i % nodeIds.size())));
+ assignments.add(asList(nodeNames.get(i % nodeNames.size())));
}
- return new ColocationGroup(sourceIds, nodeIds, assignments);
+ return new ColocationGroup(sourceIds, nodeNames, assignments);
}
/**
* Returns List of partitions to scan on the given node.
*
- * @param nodeId Cluster node ID.
+ * @param nodeNames Cluster node consistent ID.
* @return List of partitions to scan on the given node.
*/
- public int[] partitions(String nodeId) {
+ public int[] partitions(String nodeNames) {
IgniteIntList parts = new IgniteIntList(assignments.size());
for (int i = 0; i < assignments.size(); i++) {
List<String> assignment = assignments.get(i);
- if (Objects.equals(nodeId, first(assignment))) {
+ if (Objects.equals(nodeNames, first(assignment))) {
parts.add(i);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
index abd28b6965..a17c0c64f9 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentDescription.java
@@ -63,8 +63,8 @@ public class FragmentDescription implements Serializable {
/**
* Get node ids.
*/
- public List<String> nodeIds() {
- return mapping.nodeIds();
+ public List<String> nodeNames() {
+ return mapping.nodeNames();
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentMapping.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentMapping.java
index e1164da0b4..6951bb0fe7 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentMapping.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/FragmentMapping.java
@@ -64,8 +64,8 @@ public class FragmentMapping implements Serializable {
* Create.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public static FragmentMapping create(String nodeId) {
- return new
FragmentMapping(ColocationGroup.forNodes(Collections.singletonList(nodeId)));
+ public static FragmentMapping create(String nodeName) {
+ return new
FragmentMapping(ColocationGroup.forNodes(Collections.singletonList(nodeName)));
}
/**
@@ -139,9 +139,9 @@ public class FragmentMapping implements Serializable {
* NodeIds.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
- public List<String> nodeIds() {
+ public List<String> nodeNames() {
return colocationGroups.stream()
- .flatMap(g -> g.nodeIds().stream())
+ .flatMap(g -> g.nodeNames().stream())
.distinct().collect(Collectors.toList());
}
@@ -158,7 +158,7 @@ public class FragmentMapping implements Serializable {
colocationGroups = Commons.transform(colocationGroups,
ColocationGroup::finalaze);
- List<String> nodes = nodeIds();
+ List<String> nodes = nodeNames();
List<String> nodes0 = nodes.isEmpty() ? nodesSource.get() : nodes;
colocationGroups = Commons.transform(colocationGroups, g ->
g.mapToNodes(nodes0));
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
index b68eeccf04..33d9bf189d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/IgniteMdFragmentMapping.java
@@ -237,7 +237,7 @@ public class IgniteMdFragmentMapping implements
MetadataHandler<FragmentMappingM
List<List<String>> fakeAssignments = new
ArrayList<>(group.assignments().size());
for (int i = 0; i < group.assignments().size(); i++) {
- fakeAssignments.add(List.of(ctx.localNodeId()));
+ fakeAssignments.add(List.of(ctx.locNodeName()));
}
group = ColocationGroup.forAssignments(fakeAssignments);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/MappingServiceImpl.java
index 11d600ca12..c0e81d2803 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/MappingServiceImpl.java
@@ -58,6 +58,6 @@ public class MappingServiceImpl implements MappingService {
throw new IllegalStateException("failed to map query to execution
nodes. Nodes list is empty.");
}
- return Commons.transform(nodes, ClusterNode::id);
+ return Commons.transform(nodes, ClusterNode::name);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/RemoteException.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/RemoteException.java
index 4139b8edcc..95822b032c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/RemoteException.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/metadata/RemoteException.java
@@ -24,7 +24,7 @@ import java.util.UUID;
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public class RemoteException extends RuntimeException {
- private final String nodeId;
+ private final String nodeName;
private final UUID queryId;
@@ -33,23 +33,23 @@ public class RemoteException extends RuntimeException {
/**
* Constructor.
*
- * @param cause Cause.
- * @param nodeId Node ID.
- * @param queryId Query ID.
+ * @param cause Cause.
+ * @param nodeName Node consistent ID.
+ * @param queryId Query ID.
* @param fragmentId Fragment ID.
*/
- public RemoteException(String nodeId, UUID queryId, long fragmentId,
Throwable cause) {
+ public RemoteException(String nodeName, UUID queryId, long fragmentId,
Throwable cause) {
super("Remote query execution", cause);
- this.nodeId = nodeId;
+ this.nodeName = nodeName;
this.queryId = queryId;
this.fragmentId = fragmentId;
}
/**
- * Get node ID.
+ * Get node consistent ID.
*/
- public String nodeId() {
- return nodeId;
+ public String nodeName() {
+ return nodeName;
}
/**
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/AbstractMultiStepPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/AbstractMultiStepPlan.java
index a787aadbc7..d8268617c0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/AbstractMultiStepPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/AbstractMultiStepPlan.java
@@ -96,7 +96,7 @@ public abstract class AbstractMultiStepPlan implements
MultiStepPlan {
Long2ObjectOpenHashMap<List<String>> res = new
Long2ObjectOpenHashMap<>(capacity(remotes.size()));
for (IgniteReceiver remote : remotes) {
- res.put(remote.exchangeId(),
mapping(remote.sourceFragmentId()).nodeIds());
+ res.put(remote.exchangeId(),
mapping(remote.sourceFragmentId()).nodeNames());
}
return res;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
index bcce4f89ac..81daa573c4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/Fragment.java
@@ -111,15 +111,15 @@ public class Fragment {
FragmentMapping mapping =
IgniteMdFragmentMapping.fragmentMappingForMetadataQuery(root, mq, ctx);
if (rootFragment()) {
- mapping =
FragmentMapping.create(ctx.localNodeId()).colocate(mapping);
+ mapping =
FragmentMapping.create(ctx.locNodeName()).colocate(mapping);
}
- if (single() && mapping.nodeIds().size() > 1) {
+ if (single() && mapping.nodeNames().size() > 1) {
// this is possible when the fragment contains scan of a
replicated cache, which brings
// several nodes (actually all containing nodes) to the
colocation group, but this fragment
// supposed to be executed on a single node, so let's choose
one wisely
- mapping = FragmentMapping.create(mapping.nodeIds()
-
.get(ThreadLocalRandom.current().nextInt(mapping.nodeIds().size()))).colocate(mapping);
+ mapping = FragmentMapping.create(mapping.nodeNames()
+
.get(ThreadLocalRandom.current().nextInt(mapping.nodeNames().size()))).colocate(mapping);
}
return mapping.finalize(nodesSource);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
index d4cd8a8e44..74204d236f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MappingQueryContext.java
@@ -27,17 +27,17 @@ import org.apache.ignite.internal.sql.engine.util.Commons;
* Query mapping context.
*/
public class MappingQueryContext {
- private final String locNodeId;
+ private final String locNodeName;
private RelOptCluster cluster;
/**
* Constructor.
*
- * @param locNodeId Local node identifier.
+ * @param locNodeName Local node consistent ID.
*/
- public MappingQueryContext(String locNodeId) {
- this.locNodeId = locNodeId;
+ public MappingQueryContext(String locNodeName) {
+ this.locNodeName = locNodeName;
}
/** Creates a cluster. */
@@ -52,7 +52,7 @@ public class MappingQueryContext {
return cluster;
}
- public String localNodeId() {
- return locNodeId;
+ public String locNodeName() {
+ return locNodeName;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
index 3df332c31f..697a54502e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/DistributionFunction.java
@@ -174,9 +174,9 @@ public abstract class DistributionFunction {
@Override
public <RowT> Destination<RowT> destination(ExecutionContext<RowT>
ctx, AffinityService affinityService,
ColocationGroup m, ImmutableIntList k) {
- assert m != null && !nullOrEmpty(m.nodeIds());
+ assert m != null && !nullOrEmpty(m.nodeNames());
- return new AllNodes<>(m.nodeIds());
+ return new AllNodes<>(m.nodeNames());
}
}
@@ -193,9 +193,9 @@ public abstract class DistributionFunction {
@Override
public <RowT> Destination<RowT> destination(ExecutionContext<RowT>
ctx, AffinityService affinityService,
ColocationGroup m, ImmutableIntList k) {
- assert m != null && !nullOrEmpty(m.nodeIds());
+ assert m != null && !nullOrEmpty(m.nodeNames());
- return new RandomNode<>(m.nodeIds());
+ return new RandomNode<>(m.nodeNames());
}
}
@@ -212,11 +212,11 @@ public abstract class DistributionFunction {
@Override
public <RowT> Destination<RowT> destination(ExecutionContext<RowT>
ctx, AffinityService affinityService,
ColocationGroup m, ImmutableIntList k) {
- if (m == null || m.nodeIds() == null || m.nodeIds().size() != 1) {
+ if (m == null || m.nodeNames() == null || m.nodeNames().size() !=
1) {
throw new IllegalStateException();
}
- return new
AllNodes<>(Collections.singletonList(Objects.requireNonNull(first(m.nodeIds()))));
+ return new
AllNodes<>(Collections.singletonList(Objects.requireNonNull(first(m.nodeNames()))));
}
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 7c95dad225..3cec811cb1 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -36,7 +36,6 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -45,8 +44,6 @@ import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.configuration.ConfigurationValue;
-import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
-import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -59,7 +56,6 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
@@ -91,7 +87,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
/**
* Stop Calcite module test.
*/
-@ExtendWith(ConfigurationExtension.class)
@ExtendWith(MockitoExtension.class)
public class StopCalciteModuleTest {
/** The logger. */
@@ -102,41 +97,38 @@ public class StopCalciteModuleTest {
private static final String NODE_NAME = "mock-node-name";
@Mock
- ClusterService clusterSrvc;
+ private ClusterService clusterSrvc;
@Mock
- TableManager tableManager;
+ private TableManager tableManager;
@Mock
- IndexManager indexManager;
+ private IndexManager indexManager;
@Mock
- SchemaManager schemaManager;
+ private SchemaManager schemaManager;
@Mock
- DataStorageManager dataStorageManager;
+ private DataStorageManager dataStorageManager;
@Mock
- MessagingService msgSrvc;
+ private MessagingService msgSrvc;
@Mock
- TxManager txManager;
+ private TxManager txManager;
@Mock
- TopologyService topologySrvc;
+ private TopologyService topologySrvc;
@Mock
- InternalTable tbl;
+ private InternalTable tbl;
@Mock
- HybridClock clock;
+ private HybridClock clock;
- SchemaRegistry schemaReg;
+ private SchemaRegistry schemaReg;
- TestRevisionRegister testRevisionRegister = new TestRevisionRegister();
-
- @InjectConfiguration
- TablesConfiguration tablesConfig;
+ private final TestRevisionRegister testRevisionRegister = new
TestRevisionRegister();
/**
* Before.
@@ -149,7 +141,6 @@ public class StopCalciteModuleTest {
ClusterNode node = new ClusterNode("mock-node-id", NODE_NAME, null);
when(topologySrvc.localMember()).thenReturn(node);
-
when(topologySrvc.allMembers()).thenReturn(Collections.singleton(node));
SchemaDescriptor schemaDesc = new SchemaDescriptor(
1,
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index e7a90739a4..4931e073de 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
+import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
@@ -97,12 +98,12 @@ public class ExecutionServiceImplTest {
/** Timeout in ms for async operations. */
private static final long TIMEOUT_IN_MS = 2_000;
- private final List<String> nodeIds = List.of("node_1", "node_2", "node_3");
+ private final List<String> nodeNames = List.of("node_1", "node_2",
"node_3");
private final Map<String, List<Object[]>> dataPerNode = Map.of(
- nodeIds.get(0), List.of(new Object[]{0, 0}, new Object[]{3, 3},
new Object[]{6, 6}),
- nodeIds.get(1), List.of(new Object[]{1, 1}, new Object[]{4, 4},
new Object[]{7, 7}),
- nodeIds.get(2), List.of(new Object[]{2, 2}, new Object[]{5, 5},
new Object[]{8, 8})
+ nodeNames.get(0), List.of(new Object[]{0, 0}, new Object[]{3, 3},
new Object[]{6, 6}),
+ nodeNames.get(1), List.of(new Object[]{1, 1}, new Object[]{4, 4},
new Object[]{7, 7}),
+ nodeNames.get(2), List.of(new Object[]{2, 2}, new Object[]{5, 5},
new Object[]{8, 8})
);
private final TestTable table = createTable("TEST_TBL", 1_000_000,
IgniteDistributions.random(),
@@ -117,7 +118,7 @@ public class ExecutionServiceImplTest {
@BeforeEach
public void init() {
testCluster = new TestCluster();
- executionServices =
nodeIds.stream().map(this::create).collect(Collectors.toList());
+ executionServices =
nodeNames.stream().map(this::create).collect(Collectors.toList());
prepareService = new PrepareServiceImpl("test", 0, null);
prepareService.start();
@@ -137,7 +138,7 @@ public class ExecutionServiceImplTest {
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
- nodeIds.stream().map(testCluster::node).forEach(TestNode::pauseScan);
+ nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
var cursor = execService.executePlan(plan, ctx);
@@ -171,7 +172,7 @@ public class ExecutionServiceImplTest {
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
- nodeIds.stream().map(testCluster::node).forEach(TestNode::pauseScan);
+ nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
var cursor = execService.executePlan(plan, ctx);
@@ -205,14 +206,14 @@ public class ExecutionServiceImplTest {
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
- nodeIds.stream().map(testCluster::node).forEach(TestNode::pauseScan);
+ nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
var expectedEx = new RuntimeException("Test error");
- testCluster.node(nodeIds.get(2)).interceptor((nodeId, msg, original)
-> {
+ testCluster.node(nodeNames.get(2)).interceptor((nodeName, msg,
original) -> {
if (msg instanceof QueryStartRequest) {
try {
-
testCluster.node(nodeIds.get(2)).messageService().send(nodeId, new
SqlQueryMessagesFactory().queryStartResponse()
+
testCluster.node(nodeNames.get(2)).messageService().send(nodeName, new
SqlQueryMessagesFactory().queryStartResponse()
.queryId(((QueryStartRequest) msg).queryId())
.fragmentId(((QueryStartRequest) msg).fragmentId())
.error(expectedEx)
@@ -222,7 +223,7 @@ public class ExecutionServiceImplTest {
throw new
IgniteInternalException(OPERATION_INTERRUPTED_ERR, e);
}
} else {
- original.onMessage(nodeId, msg);
+ original.onMessage(nodeName, msg);
}
});
@@ -254,7 +255,7 @@ public class ExecutionServiceImplTest {
var ctx = createContext();
var plan = prepare("SELECT * FROM test_tbl", ctx);
- nodeIds.stream().map(testCluster::node).forEach(TestNode::pauseScan);
+ nodeNames.stream().map(testCluster::node).forEach(TestNode::pauseScan);
var cursor = execService.executePlan(plan, ctx);
@@ -327,36 +328,32 @@ public class ExecutionServiceImplTest {
.mapToInt(i -> i).sum() == 0, TIMEOUT_IN_MS));
}
- /** Creates an execution service instance for the node with given id. */
- public ExecutionServiceImpl<Object[]> create(String nodeId) {
- if (!nodeIds.contains(nodeId)) {
- throw new IllegalArgumentException(format("Node id should be one
of {}, but was '{}'", nodeIds, nodeId));
+ /** Creates an execution service instance for the node with given
consistent id. */
+ public ExecutionServiceImpl<Object[]> create(String nodeName) {
+ if (!nodeNames.contains(nodeName)) {
+ throw new IllegalArgumentException(format("Node id should be one
of {}, but was '{}'", nodeNames, nodeName));
}
- var taskExecutor = new QueryTaskExecutorImpl(nodeId);
+ var taskExecutor = new QueryTaskExecutorImpl(nodeName);
- var node = testCluster.addNode(nodeId, taskExecutor);
+ var node = testCluster.addNode(nodeName, taskExecutor);
- node.dataset(dataPerNode.get(nodeId));
+ node.dataset(dataPerNode.get(nodeName));
var messageService = node.messageService();
var mailboxRegistry = new MailboxRegistryImpl();
+ var clusterNode = new ClusterNode(UUID.randomUUID().toString(),
nodeName, NetworkAddress.from("127.0.0.1:1111"));
- var exchangeService = new ExchangeServiceImpl(
- new ClusterNode(nodeId, "fake-test-node",
NetworkAddress.from("127.0.0.1:1111")),
- taskExecutor,
- mailboxRegistry,
- messageService
- );
+ var exchangeService = new ExchangeServiceImpl(clusterNode,
taskExecutor, mailboxRegistry, messageService);
var schemaManagerMock = mock(SqlSchemaManager.class);
when(schemaManagerMock.tableById(any(), anyInt())).thenReturn(table);
var executionService = new ExecutionServiceImpl<>(
- new ClusterNode(nodeId, "fake-test-node",
NetworkAddress.from("127.0.0.1:1111")),
+ clusterNode,
messageService,
- (single, filter) -> single ?
List.of(nodeIds.get(ThreadLocalRandom.current().nextInt(nodeIds.size()))) :
nodeIds,
+ (single, filter) -> single ?
List.of(nodeNames.get(ThreadLocalRandom.current().nextInt(nodeNames.size()))) :
nodeNames,
schemaManagerMock,
mock(DdlCommandHandler.class),
taskExecutor,
@@ -403,12 +400,12 @@ public class ExecutionServiceImplTest {
static class TestCluster {
private final Map<String, TestNode> nodes = new ConcurrentHashMap<>();
- public TestNode addNode(String nodeId, QueryTaskExecutor taskExecutor)
{
- return nodes.computeIfAbsent(nodeId, key -> new TestNode(nodeId,
taskExecutor));
+ public TestNode addNode(String nodeName, QueryTaskExecutor
taskExecutor) {
+ return nodes.computeIfAbsent(nodeName, key -> new
TestNode(nodeName, taskExecutor));
}
- public TestNode node(String nodeId) {
- return nodes.get(nodeId);
+ public TestNode node(String nodeName) {
+ return nodes.get(nodeName);
}
class TestNode {
@@ -419,12 +416,12 @@ public class ExecutionServiceImplTest {
private volatile MessageInterceptor interceptor = null;
private final QueryTaskExecutor taskExecutor;
- private final String nodeId;
+ private final String nodeName;
private boolean scanPaused = false;
- public TestNode(String nodeId, QueryTaskExecutor taskExecutor) {
- this.nodeId = nodeId;
+ public TestNode(String nodeName, QueryTaskExecutor taskExecutor) {
+ this.nodeName = nodeName;
this.taskExecutor = taskExecutor;
}
@@ -473,16 +470,16 @@ public class ExecutionServiceImplTest {
return new MessageService() {
/** {@inheritDoc} */
@Override
- public void send(String targetNodeId, NetworkMessage msg) {
- TestNode node = nodes.get(targetNodeId);
+ public void send(String nodeName, NetworkMessage msg) {
+ TestNode node = nodes.get(nodeName);
- node.onReceive(nodeId, msg);
+ node.onReceive(TestNode.this.nodeName, msg);
}
/** {@inheritDoc} */
@Override
- public boolean alive(String nodeId) {
- return !nodes.get(nodeId).dead();
+ public boolean alive(String nodeName) {
+ return !nodes.get(nodeName).dead();
}
/** {@inheritDoc} */
@@ -491,7 +488,7 @@ public class ExecutionServiceImplTest {
var old = msgListeners.put(msgId, lsnr);
if (old != null) {
- throw new RuntimeException(format("Listener was
replaced [nodeId={}, msgId={}]", nodeId, msgId));
+ throw new RuntimeException(format("Listener was
replaced [nodeName={}, msgId={}]", nodeName, msgId));
}
}
@@ -539,35 +536,35 @@ public class ExecutionServiceImplTest {
};
}
- private void onReceive(String senderNodeId, NetworkMessage
message) {
- MessageListener original = (nodeId, msg) -> {
+ private void onReceive(String senderNodeName, NetworkMessage
message) {
+ MessageListener original = (nodeName, msg) -> {
MessageListener listener =
msgListeners.get(msg.messageType());
if (listener == null) {
throw new IllegalStateException(
- format("Listener not found [senderNodeId={},
msgId={}]", nodeId, msg.messageType()));
+ format("Listener not found [senderNodeName={},
msgId={}]", nodeName, msg.messageType()));
}
if (msg instanceof ExecutionContextAwareMessage) {
ExecutionContextAwareMessage msg0 =
(ExecutionContextAwareMessage) msg;
- taskExecutor.execute(msg0.queryId(),
msg0.fragmentId(), () -> listener.onMessage(nodeId, msg));
+ taskExecutor.execute(msg0.queryId(),
msg0.fragmentId(), () -> listener.onMessage(nodeName, msg));
} else {
- taskExecutor.execute(() -> listener.onMessage(nodeId,
msg));
+ taskExecutor.execute(() ->
listener.onMessage(nodeName, msg));
}
};
MessageInterceptor interceptor = this.interceptor;
if (interceptor != null) {
- interceptor.intercept(senderNodeId, message, original);
+ interceptor.intercept(senderNodeName, message, original);
} else {
- original.onMessage(senderNodeId, message);
+ original.onMessage(senderNodeName, message);
}
}
}
interface MessageInterceptor {
- void intercept(String senderNodeId, NetworkMessage msg,
MessageListener original);
+ void intercept(String senderNodeName, NetworkMessage msg,
MessageListener original);
}
}
@@ -602,7 +599,7 @@ public class ExecutionServiceImplTest {
@Override
public ColocationGroup colocationGroup(MappingQueryContext ctx) {
- return ColocationGroup.forNodes(nodeIds);
+ return ColocationGroup.forNodes(nodeNames);
}
};
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index e16becdf44..ac18d3c684 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -27,7 +27,6 @@ import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.Function;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -57,7 +56,6 @@ import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
/**
* Tests execution flow of TableScanNode.
@@ -126,7 +124,7 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest {
@Override
public InternalTable table() {
- return new
TestInternalTableImpl(Mockito.mock(ReplicaService.class));
+ return new TestInternalTableImpl(mock(ReplicaService.class));
}
@Override
@@ -151,8 +149,7 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest {
UUID.randomUUID(),
Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
PART_CNT,
- Function.identity(),
- addr -> Mockito.mock(ClusterNode.class),
+ addr -> mock(ClusterNode.class),
new TxManagerImpl(replicaSvc, new HeapLockManager(), new
HybridClockImpl()),
mock(MvTableStorage.class),
mock(TxStateTableStorage.class),
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index b4d4255cd8..d74a09714d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -130,7 +130,6 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
UUID.randomUUID(),
Int2ObjectMaps.singleton(0, service),
1,
- Function.identity(),
consistentIdToNode,
txManager,
mock(MvTableStorage.class),
@@ -157,7 +156,6 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
UUID.randomUUID(),
Int2ObjectMaps.singleton(0, service),
1,
- Function.identity(),
consistentIdToNode,
txManager,
mock(MvTableStorage.class),
@@ -190,7 +188,6 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
UUID.randomUUID(),
Int2ObjectMaps.singleton(0, service),
1,
- Function.identity(),
consistentIdToNode,
txManager,
mock(MvTableStorage.class),
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index f6b094a324..50c6109df6 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -333,7 +333,6 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
accTblId,
accRaftClients,
1,
- Function.identity(),
consistentIdToNode,
txMgr,
Mockito.mock(MvTableStorage.class),
@@ -347,7 +346,6 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
custTblId,
custRaftClients,
1,
- Function.identity(),
consistentIdToNode,
txMgr,
Mockito.mock(MvTableStorage.class),
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index b910c8fb6a..23312776c4 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -206,8 +206,7 @@ public class ItColocationTest {
tblId,
partRafts,
PARTS,
- null,
- address -> clusterNode,
+ name -> clusterNode,
txManager,
Mockito.mock(MvTableStorage.class),
new TestTxStateTableStorage(),
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 8b62691b85..ff0fd9a51d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -249,9 +249,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
*/
private final Map<UUID, TableImpl> tablesToStopInCaseOfError = new
ConcurrentHashMap<>();
- /** Resolver that resolves a node consistent ID to node id. */
- private final Function<String, String> nodeIdResolver;
-
/** Resolver that resolves a node consistent ID to cluster node. */
private final Function<String, ClusterNode> clusterNodeResolver;
@@ -354,16 +351,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
placementDriver = new PlacementDriver(replicaSvc);
- nodeIdResolver = consistentId -> {
- ClusterNode node = topologyService.getByConsistentId(consistentId);
-
- if (node == null) {
- throw new IllegalStateException("Can't resolve ClusterNode by
its consistent ID =" + consistentId);
- }
-
- return node.id();
- };
-
clusterNodeResolver = topologyService::getByConsistentId;
tablesByIdVv = new VersionedValue<>(null, HashMap::new);
@@ -855,8 +842,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
txStatePartitionStorage,
topologyService,
placementDriver,
- peer
-> clusterNodeResolver.apply(peer.consistentId())
-
.equals(topologyService.localMember())
+
this::isLocalPeer
)
);
} catch
(NodeStoppingException ex) {
@@ -889,6 +875,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
allOf(futures).join();
}
+ private boolean isLocalPeer(Peer peer) {
+ return
peer.consistentId().equals(raftMgr.topologyService().localMember().name());
+ }
+
private PartitionDataStorage partitionDataStorage(MvPartitionStorage
partitionStorage, InternalTable internalTbl, int partId) {
return new SnapshotAwarePartitionDataStorage(
partitionStorage,
@@ -1053,7 +1043,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TxStateTableStorage txStateStorage =
createTxStateTableStorage(tableCfg);
InternalTableImpl internalTable = new InternalTableImpl(name, tblId,
new Int2ObjectOpenHashMap<>(partitions),
- partitions, nodeIdResolver, clusterNodeResolver, txManager,
tableStorage, txStateStorage, replicaSvc, clock);
+ partitions, clusterNodeResolver, txManager, tableStorage,
txStateStorage, replicaSvc, clock);
// TODO: IGNITE-16288 directIndexIds should use async configuration API
var table = new TableImpl(internalTable, lockMgr, () ->
CompletableFuture.supplyAsync(() -> directIndexIds()));
@@ -1882,8 +1872,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
txStatePartitionStorage,
raftMgr.topologyService(),
placementDriver,
- peer ->
clusterNodeResolver.apply(peer.consistentId())
-
.equals(raftMgr.topologyService().localMember())
+ peer -> isLocalPeer(peer)
)
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 60e5e8e332..674107ff1f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -98,9 +98,6 @@ public class InternalTableImpl implements InternalTable {
/** Table identifier. */
private final UUID tableId;
- /** Resolver that resolves a node consistent ID to node ID. */
- private final Function<String, String> nodeIdResolver;
-
/** Resolver that resolves a node consistent ID to cluster node. */
private final Function<String, ClusterNode> clusterNodeResolver;
@@ -143,7 +140,6 @@ public class InternalTableImpl implements InternalTable {
UUID tableId,
Int2ObjectMap<RaftGroupService> partMap,
int partitions,
- Function<String, String> nodeIdResolver,
Function<String, ClusterNode> clusterNodeResolver,
TxManager txManager,
MvTableStorage tableStorage,
@@ -155,7 +151,6 @@ public class InternalTableImpl implements InternalTable {
this.tableId = tableId;
this.partitionMap = partMap;
this.partitions = partitions;
- this.nodeIdResolver = nodeIdResolver;
this.clusterNodeResolver = clusterNodeResolver;
this.txManager = txManager;
this.tableStorage = tableStorage;
@@ -996,9 +991,7 @@ public class InternalTableImpl implements InternalTable {
return partitionMap.int2ObjectEntrySet().stream()
.sorted(Comparator.comparingInt(Int2ObjectOpenHashMap.Entry::getIntKey))
.map(Map.Entry::getValue)
- .map(RaftGroupService::leader)
- .map(Peer::consistentId)
- .map(nodeIdResolver)
+ .map(service -> service.leader().consistentId())
.collect(Collectors.toList());
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 1e857f0dff..f25de52421 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -147,8 +147,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
UUID.randomUUID(),
Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
1,
- Function.identity(),
- addr -> mock(ClusterNode.class),
+ name -> mock(ClusterNode.class),
txManager == null ? new TxManagerImpl(replicaSvc, new
HeapLockManager(), new HybridClockImpl()) : txManager,
mock(MvTableStorage.class),
new TestTxStateTableStorage(),