This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a51248c03 IGNITE-16669 Implement executeColocated method of
IgniteCompute interface
a51248c03 is described below
commit a51248c039b2e81aba1343abd8e0bb20110d180d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Mar 25 16:11:21 2022 +0400
IGNITE-16669 Implement executeColocated method of IgniteCompute interface
---
.../org/apache/ignite/compute/IgniteCompute.java | 57 +++++++++++++
.../internal/client/compute/ClientCompute.java | 36 ++++++++
.../ignite/client/fakes/FakeIgniteTables.java | 14 +++-
.../ignite/client/fakes/FakeInternalTable.java | 7 ++
modules/compute/pom.xml | 5 ++
.../ignite/internal/compute/IgniteComputeImpl.java | 95 +++++++++++++++++++++-
.../internal/compute/IgniteComputeImplTest.java | 47 ++++++++++-
.../ignite/internal/compute/ItComputeTest.java | 75 +++++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../distributed/ItInternalTableScanTest.java | 7 ++
.../ignite/distributed/ItTablePersistenceTest.java | 9 ++
.../distributed/ItTxDistributedTestSingleNode.java | 7 ++
.../internal/table/IgniteTablesInternal.java | 31 +++++++
.../ignite/internal/table/InternalTable.java | 10 +++
.../apache/ignite/internal/table/TableImpl.java | 49 +++++++++--
.../internal/table/distributed/TableManager.java | 27 ++++--
.../distributed/storage/InternalTableImpl.java | 19 +++++
.../table/impl/DummyInternalTableImpl.java | 2 +-
18 files changed, 480 insertions(+), 19 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index ca856dd63..8aeec87b3 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -21,6 +21,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
/**
* Provides access to the Compute functionality: the ability to execute
compute jobs.
@@ -51,6 +53,61 @@ public interface IgniteCompute {
*/
<R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String
jobClassName, Object... args);
+ /**
+ * Executes a job represented by the given class on one node where the
given key is located. The node is the leader
+ * of the corresponding Raft group.
+ *
+ * @param table name of the table whose key is used to determine the
node on which the job will be executed
+ * @param key key which will be used to determine the node on which
the job will be executed
+ * @param jobClass class of the job to execute
+ * @param args arguments of the job
+ * @param <R> job result type
+ * @return future job result
+ */
+ <R> CompletableFuture<R> executeColocated(String table, Tuple key, Class<?
extends ComputeJob<R>> jobClass, Object... args);
+
+ /**
+ * Executes a job represented by the given class on one node where the
given key is located. The node is the leader
+ * of the corresponding Raft group.
+ *
+ * @param table name of the table whose key is used to determine the
node on which the job will be executed
+ * @param key key which will be used to determine the node on which
the job will be executed
+ * @param keyMapper mapper that will be used to map the key to a binary
representation
+ * @param jobClass class of the job to execute
+ * @param args arguments of the job
+ * @param <R> job result type
+ * @return future job result
+ */
+ <K, R> CompletableFuture<R> executeColocated(String table, K key,
Mapper<K> keyMapper,
+ Class<? extends
ComputeJob<R>> jobClass, Object... args);
+
+ /**
+ * Executes a job represented by the given class name on one node where the
given key is located. The node is the leader
+ * of the corresponding Raft group.
+ *
+ * @param table name of the table whose key is used to determine the
node on which the job will be executed
+ * @param key key which will be used to determine the node on which
the job will be executed
+ * @param jobClassName name of the job class to execute
+ * @param args arguments of the job
+ * @param <R> job result type
+ * @return future job result
+ */
+ <R> CompletableFuture<R> executeColocated(String table, Tuple key, String
jobClassName, Object... args);
+
+ /**
+ * Executes a job represented by the given class name on one node where the
given key is located. The node is the leader
+ * of the corresponding Raft group.
+ *
+ * @param table name of the table whose key is used to determine the
node on which the job will be executed
+ * @param key key which will be used to determine the node on which
the job will be executed
+ * @param keyMapper mapper that will be used to map the key to a binary
representation
+ * @param jobClassName name of the job class to execute
+ * @param args arguments of the job
+ * @param <R> job result type
+ * @return future job result
+ */
+ <K, R> CompletableFuture<R> executeColocated(String table, K key,
Mapper<K> keyMapper, String jobClassName, Object... args);
+
/**
* Executes a {@link ComputeJob} represented by the given class on all
nodes from the given nodes set.
*
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index aa17dc17f..5aaa95d07 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -29,6 +29,8 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
/**
* Client compute implementation.
@@ -68,6 +70,40 @@ public class ClientCompute implements IgniteCompute {
return executeOnOneNode(node, jobClassName, args);
}
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> executeColocated(String table, Tuple key,
Class<? extends ComputeJob<R>> jobClass, Object... args) {
+ // TODO: IGNITE-16786 - implement this
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <K, R> CompletableFuture<R> executeColocated(
+ String table,
+ K key,
+ Mapper<K> keyMapper,
+ Class<? extends ComputeJob<R>> jobClass,
+ Object... args
+ ) {
+ // TODO: IGNITE-16786 - implement this
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> executeColocated(String table, Tuple key,
String jobClassName, Object... args) {
+ // TODO: IGNITE-16786 - implement this
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <K, R> CompletableFuture<R> executeColocated(String table, K key,
Mapper<K> keyMapper, String jobClassName, Object... args) {
+ // TODO: IGNITE-16786 - implement this
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
+
/** {@inheritDoc} */
@Override
public <R> Map<ClusterNode, CompletableFuture<R>>
broadcast(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass,
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index a8c429db4..c2a7be4ce 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -117,7 +117,7 @@ public class FakeIgniteTables implements IgniteTables,
IgniteTablesInternal {
/** {@inheritDoc} */
@Override
public Table table(String name) {
- return tables.get(name);
+ return tableImpl(name);
}
/** {@inheritDoc} */
@@ -138,6 +138,18 @@ public class FakeIgniteTables implements IgniteTables,
IgniteTablesInternal {
return CompletableFuture.completedFuture(tablesById.get(id));
}
+ /** {@inheritDoc} */
+ @Override
+ public TableImpl tableImpl(String name) {
+ return tables.get(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<TableImpl> tableImplAsync(String name) {
+ return CompletableFuture.completedFuture(tableImpl(name));
+ }
+
@NotNull
private TableImpl getNewTable(String name) {
Function<Integer, SchemaDescriptor> history;
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index b41962cf9..64d098234 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -273,6 +274,12 @@ public class FakeInternalTable implements InternalTable {
throw new IgniteInternalException(new
OperationNotSupportedException());
}
+ /** {@inheritDoc} */
+ @Override
+ public ClusterNode leaderAssignment(int partition) {
+ throw new IgniteInternalException(new
OperationNotSupportedException());
+ }
+
/** {@inheritDoc} */
@Override
public int partition(BinaryRow keyRow) {
diff --git a/modules/compute/pom.xml b/modules/compute/pom.xml
index ffe5b6d83..4071c74ca 100644
--- a/modules/compute/pom.xml
+++ b/modules/compute/pom.xml
@@ -43,6 +43,11 @@
<artifactId>ignite-network</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-table</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.hamcrest</groupId>
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index feddc7821..dc9037ba1 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -27,20 +27,30 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
/**
* Implementation of {@link IgniteCompute}.
*/
public class IgniteComputeImpl implements IgniteCompute {
private final TopologyService topologyService;
+ private final IgniteTablesInternal tables;
private final ComputeComponent computeComponent;
private final ThreadLocalRandom random = ThreadLocalRandom.current();
- public IgniteComputeImpl(TopologyService topologyService, ComputeComponent
computeComponent) {
+ /**
+ * Creates a new instance.
+ */
+ public IgniteComputeImpl(TopologyService topologyService,
IgniteTablesInternal tables, ComputeComponent computeComponent) {
this.topologyService = topologyService;
+ this.tables = tables;
this.computeComponent = computeComponent;
}
@@ -101,6 +111,89 @@ public class IgniteComputeImpl implements IgniteCompute {
return targetNode.equals(topologyService.localMember());
}
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> executeColocated(String tableName, Tuple
key, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+ Objects.requireNonNull(tableName);
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(jobClass);
+
+ return requiredTable(tableName)
+ .thenApply(table -> leaderOfTablePartitionByTupleKey(table,
key))
+ .thenCompose(primaryNode -> executeOnOneNode(primaryNode,
jobClass, args));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <K, R> CompletableFuture<R> executeColocated(
+ String tableName,
+ K key,
+ Mapper<K> keyMapper,
+ Class<? extends ComputeJob<R>> jobClass,
+ Object... args
+ ) {
+ Objects.requireNonNull(tableName);
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(keyMapper);
+ Objects.requireNonNull(jobClass);
+
+ return requiredTable(tableName)
+ .thenApply(table -> leaderOfTablePartitionByMappedKey(table,
key, keyMapper))
+ .thenCompose(primaryNode -> executeOnOneNode(primaryNode,
jobClass, args));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <R> CompletableFuture<R> executeColocated(String tableName, Tuple
key, String jobClassName, Object... args) {
+ Objects.requireNonNull(tableName);
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(jobClassName);
+
+ return requiredTable(tableName)
+ .thenApply(table -> leaderOfTablePartitionByTupleKey(table,
key))
+ .thenCompose(primaryNode -> executeOnOneNode(primaryNode,
jobClassName, args));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <K, R> CompletableFuture<R> executeColocated(String tableName, K
key, Mapper<K> keyMapper, String jobClassName, Object... args) {
+ Objects.requireNonNull(tableName);
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(keyMapper);
+ Objects.requireNonNull(jobClassName);
+
+ return requiredTable(tableName)
+ .thenApply(table -> leaderOfTablePartitionByMappedKey(table,
key, keyMapper))
+ .thenCompose(primaryNode -> executeOnOneNode(primaryNode,
jobClassName, args));
+ }
+
+ private CompletableFuture<TableImpl> requiredTable(String tableName) {
+ return tables.tableImplAsync(tableName)
+ .thenApply(table -> {
+ if (table == null) {
+ throw new IgniteInternalException(String.format("Did
not find a table by name '%s'", tableName));
+ }
+ return table;
+ });
+ }
+
+ private ClusterNode leaderOfTablePartitionByTupleKey(TableImpl table,
Tuple key) {
+ return requiredLeaderByPartition(table, table.partition(key));
+ }
+
+ private <K> ClusterNode leaderOfTablePartitionByMappedKey(TableImpl table,
K key, Mapper<K> keyMapper) {
+ return requiredLeaderByPartition(table, table.partition(key,
keyMapper));
+ }
+
+ private ClusterNode requiredLeaderByPartition(TableImpl table, int
partitionIndex) {
+ ClusterNode leaderNode = table.leaderAssignment(partitionIndex);
+ if (leaderNode == null) {
+ throw new IgniteInternalException("Leader not found for partition
" + partitionIndex);
+ }
+
+ return leaderNode;
+ }
+
/** {@inheritDoc} */
@Override
public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index 42ceb2bd6..01cb42493 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -20,16 +20,23 @@ package org.apache.ignite.internal.compute;
import static java.util.Collections.singleton;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -42,12 +49,18 @@ class IgniteComputeImplTest {
@Mock
private TopologyService topologyService;
+ @Mock
+ private IgniteTablesInternal igniteTables;
+
@Mock
private ComputeComponent computeComponent;
@InjectMocks
private IgniteComputeImpl compute;
+ @Mock
+ private TableImpl table;
+
private final ClusterNode localNode = new ClusterNode("local", "local",
new NetworkAddress("local-host", 1, "local"));
private final ClusterNode remoteNode = new ClusterNode("remote", "remote",
new NetworkAddress("remote-host", 1, "remote"));
@@ -70,8 +83,7 @@ class IgniteComputeImplTest {
@Test
void whenNodeIsRemoteThenExecutesRemotely() throws Exception {
- when(computeComponent.executeRemotely(remoteNode, SimpleJob.class,
"a", 42))
-
.thenReturn(CompletableFuture.completedFuture("remoteResponse"));
+ respondWhenExecutingSimpleJobRemotely();
String result = compute.execute(singleton(remoteNode),
SimpleJob.class, "a", 42).get();
@@ -80,6 +92,37 @@ class IgniteComputeImplTest {
verify(computeComponent).executeRemotely(remoteNode, SimpleJob.class,
"a", 42);
}
+ private void respondWhenExecutingSimpleJobRemotely() {
+ when(computeComponent.executeRemotely(remoteNode, SimpleJob.class,
"a", 42))
+
.thenReturn(CompletableFuture.completedFuture("remoteResponse"));
+ }
+
+ @Test
+ void executesColocatedOnLeaderNodeOfPartitionCorrespondingToTupleKey()
throws Exception {
+ respondWhenExecutingSimpleJobRemotely();
+
+
when(igniteTables.tableImplAsync("PUBLIC.test")).thenReturn(CompletableFuture.completedFuture(table));
+ doReturn(42).when(table).partition(any());
+ doReturn(remoteNode).when(table).leaderAssignment(42);
+
+ String result = compute.executeColocated("PUBLIC.test",
Tuple.create(Map.of("k", 1)), SimpleJob.class, "a", 42).get();
+
+ assertThat(result, is("remoteResponse"));
+ }
+
+ @Test
+ void executesColocatedOnLeaderNodeOfPartitionCorrespondingToMappedKey()
throws Exception {
+ respondWhenExecutingSimpleJobRemotely();
+
+
when(igniteTables.tableImplAsync("PUBLIC.test")).thenReturn(CompletableFuture.completedFuture(table));
+ doReturn(42).when(table).partition(any(), any());
+ doReturn(remoteNode).when(table).leaderAssignment(42);
+
+ String result = compute.executeColocated("PUBLIC.test", 1,
Mapper.of(Integer.class), SimpleJob.class, "a", 42).get();
+
+ assertThat(result, is("remoteResponse"));
+ }
+
private static class SimpleJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index 03b54f353..eddecb5de 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -18,16 +18,20 @@
package org.apache.ignite.internal.compute;
import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -40,6 +44,8 @@ import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.AbstractClusterIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
import org.junit.jupiter.api.Test;
/**
@@ -207,6 +213,75 @@ class ItComputeTest extends AbstractClusterIntegrationTest
{
}
}
+ @Test
+ void executesColocatedWithTupleKey() throws Exception {
+ createTestTableWithOneRow();
+
+ IgniteImpl entryNode = node(0);
+
+ String actualNodeName = entryNode.compute()
+ .executeColocated("PUBLIC.test", Tuple.create(Map.of("k", 1)),
GetNodeNameJob.class)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(actualNodeName, in(allNodeNames()));
+ }
+
+ private void createTestTableWithOneRow() {
+ executeSql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY
(k))");
+ executeSql("INSERT INTO test(k, v) VALUES (1, 101)");
+ }
+
+ private List<String> allNodeNames() {
+ return clusterNodes.stream()
+ .map(Ignite::name)
+ .collect(toList());
+ }
+
+ @Test
+ void executesColocatedByClassNameWithTupleKey() throws Exception {
+ createTestTableWithOneRow();
+
+ IgniteImpl entryNode = node(0);
+
+ String actualNodeName = entryNode.compute()
+ .<String>executeColocated("PUBLIC.test",
Tuple.create(Map.of("k", 1)), GetNodeNameJob.class.getName())
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(actualNodeName, in(allNodeNames()));
+ }
+
+ @Test
+ void executesColocatedWithMappedKey() throws Exception {
+ createTestTableWithOneRow();
+
+ IgniteImpl entryNode = node(0);
+
+ String actualNodeName = entryNode.compute()
+ .executeColocated("PUBLIC.test", 1, Mapper.of(Integer.class),
GetNodeNameJob.class)
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(actualNodeName, in(allNodeNames()));
+ }
+
+ @Test
+ void executesColocatedByClassNameWithMappedKey() throws Exception {
+ createTestTableWithOneRow();
+
+ IgniteImpl entryNode = node(0);
+
+ String actualNodeName = entryNode.compute()
+ .<Integer, String>executeColocated("PUBLIC.test", 1,
Mapper.of(Integer.class), GetNodeNameJob.class.getName())
+ .get(1, TimeUnit.SECONDS);
+
+ assertThat(actualNodeName, in(allNodeNames()));
+ }
+
+ private List<List<?>> executeSql(String sql, Object... args) {
+ return getAllFromCursor(
+ node(0).queryEngine().query("PUBLIC", sql, args).get(0)
+ );
+ }
+
private static class ConcatJob implements ComputeJob<String> {
/** {@inheritDoc} */
@Override
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index efe723cbb..ef396dd76 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -272,7 +272,7 @@ public class IgniteImpl implements Ignite {
distributedTblMgr
);
- compute = new IgniteComputeImpl(clusterSvc.topologyService(),
computeComponent);
+ compute = new IgniteComputeImpl(clusterSvc.topologyService(),
distributedTblMgr, computeComponent);
clientHandlerModule = new ClientHandlerModule(
qryEngine,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index c20c5ca9a..5c4978aaf 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.server.RaftServer;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.StaticNodeFinder;
@@ -117,6 +119,10 @@ public class ItInternalTableScanTest {
/** Executor for raft group services. */
ScheduledExecutorService executor;
+ private final Function<NetworkAddress, ClusterNode> addressToNode = addr
-> {
+ throw new UnsupportedOperationException();
+ };
+
/**
* Prepare test environment.
* <ol>
@@ -188,6 +194,7 @@ public class ItInternalTableScanTest {
Int2ObjectMaps.singleton(0, raftGrpSvc),
1,
NetworkAddress::toString,
+ addressToNode,
txManager,
mock(TableStorage.class)
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 50b41c31a..3a124b1ce 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
+import java.util.function.Function;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -46,6 +47,7 @@ import
org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.service.ItAbstractListenerSnapshotTest;
@@ -78,6 +80,10 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
private final List<TxManager> managers = new ArrayList<>();
+ private final Function<NetworkAddress, ClusterNode> addressToNode = addr
-> {
+ throw new UnsupportedOperationException();
+ };
+
@AfterEach
@Override
public void afterTest() throws Exception {
@@ -103,6 +109,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
Int2ObjectMaps.singleton(0, service),
1,
NetworkAddress::toString,
+ addressToNode,
txManager,
mock(TableStorage.class)
);
@@ -125,6 +132,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
Int2ObjectMaps.singleton(0, service),
1,
NetworkAddress::toString,
+ addressToNode,
txManager,
mock(TableStorage.class)
);
@@ -153,6 +161,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
Int2ObjectMaps.singleton(0, service),
1,
NetworkAddress::toString,
+ addressToNode,
txManager,
mock(TableStorage.class)
);
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 5da992cbc..5d5c27094 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -32,6 +32,7 @@ import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
import org.apache.ignite.internal.raft.Loza;
@@ -92,6 +93,10 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
private ScheduledThreadPoolExecutor executor;
+ private final Function<NetworkAddress, ClusterNode> addressToNode = addr
-> {
+ throw new UnsupportedOperationException();
+ };
+
/**
* Returns a count of nodes.
*
@@ -220,6 +225,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
accRaftClients,
1,
NetworkAddress::toString,
+ addressToNode,
txMgr,
Mockito.mock(TableStorage.class)
), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA));
@@ -230,6 +236,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
custRaftClients,
1,
NetworkAddress::toString,
+ addressToNode,
txMgr,
Mockito.mock(TableStorage.class)
), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA));
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
index 4369c37c3..9abd78d0f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
/**
@@ -42,4 +43,34 @@ public interface IgniteTablesInternal {
* @throws NodeStoppingException If an implementation stopped before the
method was invoked.
*/
CompletableFuture<TableImpl> tableAsync(UUID id) throws
NodeStoppingException;
+
+ // TODO: IGNITE-16750 - the following two methods look a bit ugly,
separation of public/internal Table aspects should help
+
+ /**
+ * Gets a table by name, if it was created before.
+ *
+ * @param name Canonical name of the table ([schemaName].[tableName]) with
SQL-parser style quotation, e.g.
+ * "public.tbl0" - the table "PUBLIC.TBL0" will be looked up,
+ * "PUBLIC.\"Tbl0\"" - "PUBLIC.Tbl0", "\"MySchema\".\"Tbl0\""
- "MySchema.Tbl0", etc.
+ * @return Table with the corresponding name or {@code null} if the table isn't
created.
+ * @throws IgniteException If an unspecified platform exception has
happened internally. Is thrown when:
+ * <ul>
+ * <li>the node is stopping.</li>
+ * </ul>
+ */
+ TableImpl tableImpl(String name);
+
+ /**
+ * Gets a table by name, if it was created before.
+ *
+ * @param name Canonical name of the table ([schemaName].[tableName]) with
SQL-parser style quotation, e.g.
+ * "public.tbl0" - the table "PUBLIC.TBL0" will be looked up,
+ * "PUBLIC.\"Tbl0\"" - "PUBLIC.Tbl0", "\"MySchema\".\"Tbl0\""
- "MySchema.Tbl0", etc.
+ * @return Future representing pending completion of the operation.
+ * @throws IgniteException If an unspecified platform exception has
happened internally. Is thrown when:
+ * <ul>
+ * <li>the node is stopping.</li>
+ * </ul>
+ */
+ CompletableFuture<TableImpl> tableImplAsync(String name);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 1205b33d3..7ecc1bc24 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
+import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -227,5 +228,14 @@ public interface InternalTable extends AutoCloseable {
*/
@NotNull List<String> assignments();
+ /**
+ * Returns cluster node that is the leader of the corresponding partition
group or throws an exception if
+ * it cannot be found.
+ *
+ * @param partition partition number
+ * @return leader node of the partition group corresponding to the
partition
+ */
+ ClusterNode leaderAssignment(int partition);
+
//TODO: IGNITE-14488. Add invoke() methods.
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 3687f7815..3cbdcaed3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -19,18 +19,21 @@ package org.apache.ignite.internal.table;
import java.util.Objects;
import java.util.UUID;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.TestOnly;
/**
* Table view implementation for binary objects.
@@ -106,21 +109,53 @@ public class TableImpl implements Table {
}
/**
- * Returns a partition for a tuple.
+ * Returns a partition for a key tuple.
*
- * @param t The tuple.
+ * @param key The key tuple.
* @return The partition.
*/
- @TestOnly
- public int partition(Tuple t) {
- Objects.requireNonNull(t);
+ public int partition(Tuple key) {
+ Objects.requireNonNull(key);
try {
- final Row keyRow = new
TupleMarshallerImpl(schemaReg).marshalKey(t);
+ final Row keyRow = new
TupleMarshallerImpl(schemaReg).marshalKey(key);
return tbl.partition(keyRow);
} catch (TupleMarshallerException e) {
throw new IgniteInternalException(e);
}
}
+
+ /**
+ * Returns a partition for a key.
+ *
+ * @param key The key.
+ * @param keyMapper Key mapper used to marshal the key.
+ * @return The partition.
+ */
+ public <K> int partition(K key, Mapper<K> keyMapper) {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(keyMapper);
+
+ BinaryRow keyRow;
+ var marshaller = new KvMarshallerImpl<>(schemaReg.schema(), keyMapper,
keyMapper);
+ try {
+ keyRow = marshaller.marshal(key);
+ } catch (MarshallerException e) {
+ throw new IgniteInternalException("Cannot marshal key", e);
+ }
+
+ return tbl.partition(keyRow);
+ }
+
+ /**
+ * Returns cluster node that is the leader of the corresponding partition
group or throws an exception if
+ * it cannot be found.
+ *
+ * @param partition partition number
+ * @return leader node of the partition group corresponding to the
partition
+ */
+ public ClusterNode leaderAssignment(int partition) {
+ return tbl.leaderAssignment(partition);
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1a0c31958..741e0848d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -157,6 +157,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
/** Resolver that resolves a network address to node id. */
private final Function<NetworkAddress, String> netAddrResolver;
+ /** Resolver that resolves a network address to cluster node. */
+ private final Function<NetworkAddress, ClusterNode> clusterNodeResolver;
+
/** Data region instances. */
private final Map<String, DataRegion> dataRegions = new
ConcurrentHashMap<>();
@@ -203,6 +206,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return node.id();
};
+ clusterNodeResolver = topologyService::getByAddress;
engine = new RocksDbStorageEngine();
@@ -562,7 +566,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
tableStorage.start();
InternalTableImpl internalTable = new InternalTableImpl(name, tblId,
new Int2ObjectOpenHashMap<>(partitions),
- partitions, netAddrResolver, txManager, tableStorage);
+ partitions, netAddrResolver, clusterNodeResolver, txManager,
tableStorage);
var schemaRegistry = new SchemaRegistryImpl(v -> {
if (!busyLock.enterBusy()) {
@@ -1225,7 +1229,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
/** {@inheritDoc} */
@Override
public CompletableFuture<Table> tableAsync(String name) {
- return tableAsyncInternal(IgniteObjectName.parseCanonicalName(name));
+ return tableAsyncInternal(IgniteObjectName.parseCanonicalName(name))
+ .thenApply(Function.identity());
}
/** {@inheritDoc} */
@@ -1241,14 +1246,25 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
}
+ /** {@inheritDoc} */
+ @Override
+ public TableImpl tableImpl(String name) {
+ return join(tableImplAsync(name));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<TableImpl> tableImplAsync(String name) {
+ return tableAsyncInternal(IgniteObjectName.parseCanonicalName(name));
+ }
+
/**
* Gets a table by name, if it was created before. Doesn't parse canonical
name.
*
* @param name Table name.
* @return Future representing pending completion of the {@code
TableManager#tableAsyncInternal} operation.
* */
- @NotNull
- private CompletableFuture<Table> tableAsyncInternal(String name) {
+ private CompletableFuture<TableImpl> tableAsyncInternal(String name) {
if (!busyLock.enterBusy()) {
throw new IgniteException(new NodeStoppingException());
}
@@ -1259,7 +1275,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return CompletableFuture.completedFuture(null);
}
- return (CompletableFuture) tableAsyncInternal(tableId, false);
+ return tableAsyncInternal(tableId, false);
} finally {
busyLock.leaveBusy();
}
@@ -1272,7 +1288,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* @param checkConfiguration {@code True} when the method checks a
configuration before trying to get a table, {@code false} otherwise.
* @return Future representing pending completion of the operation.
*/
- @NotNull
private CompletableFuture<TableImpl> tableAsyncInternal(UUID id, boolean
checkConfiguration) {
if (checkConfiguration && !isTableConfigured(id)) {
return CompletableFuture.completedFuture(null);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 55ed72f2a..5e4bde4e9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -62,10 +62,12 @@ import
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand
import
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
@@ -99,6 +101,9 @@ public class InternalTableImpl implements InternalTable {
/** Resolver that resolves a network address to node id. */
private final Function<NetworkAddress, String> netAddrResolver;
+ /** Resolver that resolves a network address to cluster node. */
+ private final Function<NetworkAddress, ClusterNode> clusterNodeResolver;
+
/** Transactional manager. */
private final TxManager txManager;
@@ -124,6 +129,7 @@ public class InternalTableImpl implements InternalTable {
Int2ObjectMap<RaftGroupService> partMap,
int partitions,
Function<NetworkAddress, String> netAddrResolver,
+ Function<NetworkAddress, ClusterNode> clusterNodeResolver,
TxManager txManager,
TableStorage tableStorage
) {
@@ -132,6 +138,7 @@ public class InternalTableImpl implements InternalTable {
this.partitionMap = partMap;
this.partitions = partitions;
this.netAddrResolver = netAddrResolver;
+ this.clusterNodeResolver = clusterNodeResolver;
this.txManager = txManager;
this.tableStorage = tableStorage;
}
@@ -394,6 +401,18 @@ public class InternalTableImpl implements InternalTable {
.collect(Collectors.toList());
}
+ @Override
+ public ClusterNode leaderAssignment(int partition) {
+ awaitLeaderInitialization();
+
+ RaftGroupService raftGroupService = partitionMap.get(partition);
+ if (raftGroupService == null) {
+ throw new IgniteInternalException("No such partition " + partition
+ " in table " + tableName);
+ }
+
+ return clusterNodeResolver.apply(raftGroupService.leader().address());
+ }
+
private void awaitLeaderInitialization() {
List<CompletableFuture<Void>> futs = new ArrayList<>();
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index d168179ee..3d6a0f14b 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -68,7 +68,7 @@ public class DummyInternalTableImpl extends InternalTableImpl
{
public DummyInternalTableImpl(VersionedRowStore store, TxManager
txManager) {
super("test", UUID.randomUUID(),
Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
- 1, null, txManager, mock(TableStorage.class));
+ 1, null, null, txManager, mock(TableStorage.class));
RaftGroupService svc = partitionMap.get(0);