This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a51248c03 IGNITE-16669 Implement executeColocated method of IgniteCompute interface
a51248c03 is described below

commit a51248c039b2e81aba1343abd8e0bb20110d180d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Mar 25 16:11:21 2022 +0400

    IGNITE-16669 Implement executeColocated method of IgniteCompute interface
---
 .../org/apache/ignite/compute/IgniteCompute.java   | 57 +++++++++++++
 .../internal/client/compute/ClientCompute.java     | 36 ++++++++
 .../ignite/client/fakes/FakeIgniteTables.java      | 14 +++-
 .../ignite/client/fakes/FakeInternalTable.java     |  7 ++
 modules/compute/pom.xml                            |  5 ++
 .../ignite/internal/compute/IgniteComputeImpl.java | 95 +++++++++++++++++++++-
 .../internal/compute/IgniteComputeImplTest.java    | 47 ++++++++++-
 .../ignite/internal/compute/ItComputeTest.java     | 75 +++++++++++++++++
 .../org/apache/ignite/internal/app/IgniteImpl.java |  2 +-
 .../distributed/ItInternalTableScanTest.java       |  7 ++
 .../ignite/distributed/ItTablePersistenceTest.java |  9 ++
 .../distributed/ItTxDistributedTestSingleNode.java |  7 ++
 .../internal/table/IgniteTablesInternal.java       | 31 +++++++
 .../ignite/internal/table/InternalTable.java       | 10 +++
 .../apache/ignite/internal/table/TableImpl.java    | 49 +++++++++--
 .../internal/table/distributed/TableManager.java   | 27 ++++--
 .../distributed/storage/InternalTableImpl.java     | 19 +++++
 .../table/impl/DummyInternalTableImpl.java         |  2 +-
 18 files changed, 480 insertions(+), 19 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index ca856dd63..8aeec87b3 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -21,6 +21,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 
 /**
  * Provides access to the Compute functionality: the ability to execute 
compute jobs.
@@ -51,6 +53,61 @@ public interface IgniteCompute {
      */
     <R> CompletableFuture<R> execute(Set<ClusterNode> nodes, String 
jobClassName, Object... args);
 
+    /**
+     * Executes a job represented by the given class on one node where the 
given key is located. The node is the leader
+     * of the corresponding Raft group.
+     *
+     * @param table    name of the table which key is used to determine the 
node on which the job will be executed
+     * @param key      key which will be used to determine the node on which 
the job will be executed
+     * @param jobClass class of the job to execute
+     * @param args     arguments of the job
+     * @param <R>      job result type
+     * @return future job result
+     */
+    <R> CompletableFuture<R> executeColocated(String table, Tuple key, Class<? 
extends ComputeJob<R>> jobClass, Object... args);
+
+    /**
+     * Executes a job represented by the given class on one node where the 
given key is located. The node is the leader
+     * of the corresponding Raft group.
+     *
+     * @param table    name of the table which key is used to determine the 
node on which the job will be executed
+     * @param key      key which will be used to determine the node on which 
the job will be executed
+     * @param keyMapper mapper that will be used to map the key to a binary 
representation
+     * @param jobClass class of the job to execute
+     * @param args     arguments of the job
+     * @param <R>      job result type
+     * @return future job result
+     */
+    <K, R> CompletableFuture<R> executeColocated(String table, K key, 
Mapper<K> keyMapper,
+                                                 Class<? extends 
ComputeJob<R>> jobClass, Object... args);
+
+    /**
+     * Executes a job represented by the given class on one node where the 
given key is located. The node is the leader
+     * of the corresponding Raft group.
+     *
+     * @param table    name of the table which key is used to determine the 
node on which the job will be executed
+     * @param key      key which will be used to determine the node on which 
the job will be executed
+     * @param jobClassName name of the job class to execute
+     * @param args     arguments of the job
+     * @param <R>      job result type
+     * @return future job result
+     */
+    <R> CompletableFuture<R> executeColocated(String table, Tuple key, String 
jobClassName, Object... args);
+
+    /**
+     * Executes a job represented by the given class on one node where the 
given key is located. The node is the leader
+     * of the corresponding Raft group.
+     *
+     * @param table    name of the table which key is used to determine the 
node on which the job will be executed
+     * @param key      key which will be used to determine the node on which 
the job will be executed
+     * @param keyMapper mapper that will be used to map the key to a binary 
representation
+     * @param jobClassName name of the job class to execute
+     * @param args     arguments of the job
+     * @param <R>      job result type
+     * @return future job result
+     */
+    <K, R> CompletableFuture<R> executeColocated(String table, K key, 
Mapper<K> keyMapper, String jobClassName, Object... args);
+
     /**
      * Executes a {@link ComputeJob} represented by the given class on all 
nodes from the given nodes set.
      *
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index aa17dc17f..5aaa95d07 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -29,6 +29,8 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.client.ReliableChannel;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 
 /**
  * Client compute implementation.
@@ -68,6 +70,40 @@ public class ClientCompute implements IgniteCompute {
         return executeOnOneNode(node, jobClassName, args);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeColocated(String table, Tuple key, 
Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        // TODO: IGNITE-16786 - implement this
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <K, R> CompletableFuture<R> executeColocated(
+            String table,
+            K key,
+            Mapper<K> keyMapper,
+            Class<? extends ComputeJob<R>> jobClass,
+            Object... args
+    ) {
+        // TODO: IGNITE-16786 - implement this
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeColocated(String table, Tuple key, 
String jobClassName, Object... args) {
+        // TODO: IGNITE-16786 - implement this
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <K, R> CompletableFuture<R> executeColocated(String table, K key, 
Mapper<K> keyMapper, String jobClassName, Object... args) {
+        // TODO: IGNITE-16786 - implement this
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
     /** {@inheritDoc} */
     @Override
     public <R> Map<ClusterNode, CompletableFuture<R>> 
broadcast(Set<ClusterNode> nodes, Class<? extends ComputeJob<R>> jobClass,
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index a8c429db4..c2a7be4ce 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -117,7 +117,7 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
     /** {@inheritDoc} */
     @Override
     public Table table(String name) {
-        return tables.get(name);
+        return tableImpl(name);
     }
 
     /** {@inheritDoc} */
@@ -138,6 +138,18 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
         return CompletableFuture.completedFuture(tablesById.get(id));
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public TableImpl tableImpl(String name) {
+        return tables.get(name);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<TableImpl> tableImplAsync(String name) {
+        return CompletableFuture.completedFuture(tableImpl(name));
+    }
+
     @NotNull
     private TableImpl getNewTable(String name) {
         Function<Integer, SchemaDescriptor> history;
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index b41962cf9..64d098234 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -273,6 +274,12 @@ public class FakeInternalTable implements InternalTable {
         throw new IgniteInternalException(new 
OperationNotSupportedException());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public ClusterNode leaderAssignment(int partition) {
+        throw new IgniteInternalException(new 
OperationNotSupportedException());
+    }
+
     /** {@inheritDoc} */
     @Override
     public int partition(BinaryRow keyRow) {
diff --git a/modules/compute/pom.xml b/modules/compute/pom.xml
index ffe5b6d83..4071c74ca 100644
--- a/modules/compute/pom.xml
+++ b/modules/compute/pom.xml
@@ -43,6 +43,11 @@
             <artifactId>ignite-network</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-table</artifactId>
+        </dependency>
+
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.hamcrest</groupId>
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index feddc7821..dc9037ba1 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -27,20 +27,30 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 
 /**
  * Implementation of {@link IgniteCompute}.
  */
 public class IgniteComputeImpl implements IgniteCompute {
     private final TopologyService topologyService;
+    private final IgniteTablesInternal tables;
     private final ComputeComponent computeComponent;
 
     private final ThreadLocalRandom random = ThreadLocalRandom.current();
 
-    public IgniteComputeImpl(TopologyService topologyService, ComputeComponent 
computeComponent) {
+    /**
+     * Create new instance.
+     */
+    public IgniteComputeImpl(TopologyService topologyService, 
IgniteTablesInternal tables, ComputeComponent computeComponent) {
         this.topologyService = topologyService;
+        this.tables = tables;
         this.computeComponent = computeComponent;
     }
 
@@ -101,6 +111,89 @@ public class IgniteComputeImpl implements IgniteCompute {
         return targetNode.equals(topologyService.localMember());
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeColocated(String tableName, Tuple 
key, Class<? extends ComputeJob<R>> jobClass, Object... args) {
+        Objects.requireNonNull(tableName);
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(jobClass);
+
+        return requiredTable(tableName)
+                .thenApply(table -> leaderOfTablePartitionByTupleKey(table, 
key))
+                .thenCompose(primaryNode -> executeOnOneNode(primaryNode, 
jobClass, args));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <K, R> CompletableFuture<R> executeColocated(
+            String tableName,
+            K key,
+            Mapper<K> keyMapper,
+            Class<? extends ComputeJob<R>> jobClass,
+            Object... args
+    ) {
+        Objects.requireNonNull(tableName);
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(keyMapper);
+        Objects.requireNonNull(jobClass);
+
+        return requiredTable(tableName)
+                .thenApply(table -> leaderOfTablePartitionByMappedKey(table, 
key, keyMapper))
+                .thenCompose(primaryNode -> executeOnOneNode(primaryNode, 
jobClass, args));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <R> CompletableFuture<R> executeColocated(String tableName, Tuple 
key, String jobClassName, Object... args) {
+        Objects.requireNonNull(tableName);
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(jobClassName);
+
+        return requiredTable(tableName)
+                .thenApply(table -> leaderOfTablePartitionByTupleKey(table, 
key))
+                .thenCompose(primaryNode -> executeOnOneNode(primaryNode, 
jobClassName, args));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <K, R> CompletableFuture<R> executeColocated(String tableName, K 
key, Mapper<K> keyMapper, String jobClassName, Object... args) {
+        Objects.requireNonNull(tableName);
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(keyMapper);
+        Objects.requireNonNull(jobClassName);
+
+        return requiredTable(tableName)
+                .thenApply(table -> leaderOfTablePartitionByMappedKey(table, 
key, keyMapper))
+                .thenCompose(primaryNode -> executeOnOneNode(primaryNode, 
jobClassName, args));
+    }
+
+    private CompletableFuture<TableImpl> requiredTable(String tableName) {
+        return tables.tableImplAsync(tableName)
+                .thenApply(table -> {
+                    if (table == null) {
+                        throw new IgniteInternalException(String.format("Did 
not find a table by name '%s'", tableName));
+                    }
+                    return table;
+                });
+    }
+
+    private ClusterNode leaderOfTablePartitionByTupleKey(TableImpl table, 
Tuple key) {
+        return requiredLeaderByPartition(table, table.partition(key));
+    }
+
+    private <K> ClusterNode leaderOfTablePartitionByMappedKey(TableImpl table, 
K key, Mapper<K> keyMapper) {
+        return requiredLeaderByPartition(table, table.partition(key, 
keyMapper));
+    }
+
+    private ClusterNode requiredLeaderByPartition(TableImpl table, int 
partitionIndex) {
+        ClusterNode leaderNode = table.leaderAssignment(partitionIndex);
+        if (leaderNode == null) {
+            throw new IgniteInternalException("Leader not found for partition 
" + partitionIndex);
+        }
+
+        return leaderNode;
+    }
+
     /** {@inheritDoc} */
     @Override
     public <R> Map<ClusterNode, CompletableFuture<R>> broadcast(
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index 42ceb2bd6..01cb42493 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -20,16 +20,23 @@ package org.apache.ignite.internal.compute;
 import static java.util.Collections.singleton;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -42,12 +49,18 @@ class IgniteComputeImplTest {
     @Mock
     private TopologyService topologyService;
 
+    @Mock
+    private IgniteTablesInternal igniteTables;
+
     @Mock
     private ComputeComponent computeComponent;
 
     @InjectMocks
     private IgniteComputeImpl compute;
 
+    @Mock
+    private TableImpl table;
+
     private final ClusterNode localNode = new ClusterNode("local", "local", 
new NetworkAddress("local-host", 1, "local"));
     private final ClusterNode remoteNode = new ClusterNode("remote", "remote", 
new NetworkAddress("remote-host", 1, "remote"));
 
@@ -70,8 +83,7 @@ class IgniteComputeImplTest {
 
     @Test
     void whenNodeIsRemoteThenExecutesRemotely() throws Exception {
-        when(computeComponent.executeRemotely(remoteNode, SimpleJob.class, 
"a", 42))
-                
.thenReturn(CompletableFuture.completedFuture("remoteResponse"));
+        respondWhenExecutingSimpleJobRemotely();
 
         String result = compute.execute(singleton(remoteNode), 
SimpleJob.class, "a", 42).get();
 
@@ -80,6 +92,37 @@ class IgniteComputeImplTest {
         verify(computeComponent).executeRemotely(remoteNode, SimpleJob.class, 
"a", 42);
     }
 
+    private void respondWhenExecutingSimpleJobRemotely() {
+        when(computeComponent.executeRemotely(remoteNode, SimpleJob.class, 
"a", 42))
+                
.thenReturn(CompletableFuture.completedFuture("remoteResponse"));
+    }
+
+    @Test
+    void executesColocatedOnLeaderNodeOfPartitionCorrespondingToTupleKey() 
throws Exception {
+        respondWhenExecutingSimpleJobRemotely();
+
+        
when(igniteTables.tableImplAsync("PUBLIC.test")).thenReturn(CompletableFuture.completedFuture(table));
+        doReturn(42).when(table).partition(any());
+        doReturn(remoteNode).when(table).leaderAssignment(42);
+
+        String result = compute.executeColocated("PUBLIC.test", 
Tuple.create(Map.of("k", 1)), SimpleJob.class, "a", 42).get();
+
+        assertThat(result, is("remoteResponse"));
+    }
+
+    @Test
+    void executesColocatedOnLeaderNodeOfPartitionCorrespondingToMappedKey() 
throws Exception {
+        respondWhenExecutingSimpleJobRemotely();
+
+        
when(igniteTables.tableImplAsync("PUBLIC.test")).thenReturn(CompletableFuture.completedFuture(table));
+        doReturn(42).when(table).partition(any(), any());
+        doReturn(remoteNode).when(table).leaderAssignment(42);
+
+        String result = compute.executeColocated("PUBLIC.test", 1, 
Mapper.of(Integer.class), SimpleJob.class, "a", 42).get();
+
+        assertThat(result, is("remoteResponse"));
+    }
+
     private static class SimpleJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
index 03b54f353..eddecb5de 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTest.java
@@ -18,16 +18,20 @@
 package org.apache.ignite.internal.compute;
 
 import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.sql.engine.util.CursorUtils.getAllFromCursor;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -40,6 +44,8 @@ import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.internal.AbstractClusterIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -207,6 +213,75 @@ class ItComputeTest extends AbstractClusterIntegrationTest {
         }
     }
 
+    @Test
+    void executesColocatedWithTupleKey() throws Exception {
+        createTestTableWithOneRow();
+
+        IgniteImpl entryNode = node(0);
+
+        String actualNodeName = entryNode.compute()
+                .executeColocated("PUBLIC.test", Tuple.create(Map.of("k", 1)), 
GetNodeNameJob.class)
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(actualNodeName, in(allNodeNames()));
+    }
+
+    private void createTestTableWithOneRow() {
+        executeSql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY 
(k))");
+        executeSql("INSERT INTO test(k, v) VALUES (1, 101)");
+    }
+
+    private List<String> allNodeNames() {
+        return clusterNodes.stream()
+                .map(Ignite::name)
+                .collect(toList());
+    }
+
+    @Test
+    void executesColocatedByClassNameWithTupleKey() throws Exception {
+        createTestTableWithOneRow();
+
+        IgniteImpl entryNode = node(0);
+
+        String actualNodeName = entryNode.compute()
+                .<String>executeColocated("PUBLIC.test", 
Tuple.create(Map.of("k", 1)), GetNodeNameJob.class.getName())
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(actualNodeName, in(allNodeNames()));
+    }
+
+    @Test
+    void executesColocatedWithMappedKey() throws Exception {
+        createTestTableWithOneRow();
+
+        IgniteImpl entryNode = node(0);
+
+        String actualNodeName = entryNode.compute()
+                .executeColocated("PUBLIC.test", 1, Mapper.of(Integer.class), 
GetNodeNameJob.class)
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(actualNodeName, in(allNodeNames()));
+    }
+
+    @Test
+    void executesColocatedByClassNameWithMappedKey() throws Exception {
+        createTestTableWithOneRow();
+
+        IgniteImpl entryNode = node(0);
+
+        String actualNodeName = entryNode.compute()
+                .<Integer, String>executeColocated("PUBLIC.test", 1, 
Mapper.of(Integer.class), GetNodeNameJob.class.getName())
+                .get(1, TimeUnit.SECONDS);
+
+        assertThat(actualNodeName, in(allNodeNames()));
+    }
+
+    private List<List<?>> executeSql(String sql, Object... args) {
+        return getAllFromCursor(
+                node(0).queryEngine().query("PUBLIC", sql, args).get(0)
+        );
+    }
+
     private static class ConcatJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index efe723cbb..ef396dd76 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -272,7 +272,7 @@ public class IgniteImpl implements Ignite {
                 distributedTblMgr
         );
 
-        compute = new IgniteComputeImpl(clusterSvc.topologyService(), 
computeComponent);
+        compute = new IgniteComputeImpl(clusterSvc.topologyService(), 
distributedTblMgr, computeComponent);
 
         clientHandlerModule = new ClientHandlerModule(
                 qryEngine,
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
index c20c5ca9a..5c4978aaf 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableScanTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.server.RaftServer;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.network.StaticNodeFinder;
@@ -117,6 +119,10 @@ public class ItInternalTableScanTest {
     /** Executor for raft group services. */
     ScheduledExecutorService executor;
 
+    private final Function<NetworkAddress, ClusterNode> addressToNode = addr 
-> {
+        throw new UnsupportedOperationException();
+    };
+
     /**
      * Prepare test environment.
      * <ol>
@@ -188,6 +194,7 @@ public class ItInternalTableScanTest {
                 Int2ObjectMaps.singleton(0, raftGrpSvc),
                 1,
                 NetworkAddress::toString,
+                addressToNode,
                 txManager,
                 mock(TableStorage.class)
         );
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 50b41c31a..3a124b1ce 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BooleanSupplier;
+import java.util.function.Function;
 import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -46,6 +47,7 @@ import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.HeapLockManager;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.client.service.ItAbstractListenerSnapshotTest;
@@ -78,6 +80,10 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
 
     private final List<TxManager> managers = new ArrayList<>();
 
+    private final Function<NetworkAddress, ClusterNode> addressToNode = addr 
-> {
+        throw new UnsupportedOperationException();
+    };
+
     @AfterEach
     @Override
     public void afterTest() throws Exception {
@@ -103,6 +109,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 NetworkAddress::toString,
+                addressToNode,
                 txManager,
                 mock(TableStorage.class)
         );
@@ -125,6 +132,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 NetworkAddress::toString,
+                addressToNode,
                 txManager,
                 mock(TableStorage.class)
         );
@@ -153,6 +161,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
                 Int2ObjectMaps.singleton(0, service),
                 1,
                 NetworkAddress::toString,
+                addressToNode,
                 txManager,
                 mock(TableStorage.class)
         );
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 5da992cbc..5d5c27094 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -32,6 +32,7 @@ import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
 import org.apache.ignite.internal.raft.Loza;
@@ -92,6 +93,10 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
 
     private ScheduledThreadPoolExecutor executor;
 
+    private final Function<NetworkAddress, ClusterNode> addressToNode = addr 
-> {
+        throw new UnsupportedOperationException();
+    };
+
     /**
      * Returns a count of nodes.
      *
@@ -220,6 +225,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                 accRaftClients,
                 1,
                 NetworkAddress::toString,
+                addressToNode,
                 txMgr,
                 Mockito.mock(TableStorage.class)
         ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA));
@@ -230,6 +236,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                 custRaftClients,
                 1,
                 NetworkAddress::toString,
+                addressToNode,
                 txMgr,
                 Mockito.mock(TableStorage.class)
         ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA));
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
index 4369c37c3..9abd78d0f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/IgniteTablesInternal.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NodeStoppingException;
 
 /**
@@ -42,4 +43,34 @@ public interface IgniteTablesInternal {
      * @throws NodeStoppingException If an implementation stopped before the 
method was invoked.
      */
     CompletableFuture<TableImpl> tableAsync(UUID id) throws 
NodeStoppingException;
+
+    // TODO: IGNITE-16750 - the following two methods look a bit ugly, 
separation of public/internal Table aspects should help
+
+    /**
+     * Gets a table by name, if it was created before.
+     *
+     * @param name Canonical name of the table ([schemaName].[tableName]) with 
SQL-parser style quotation, e.g.
+     *             "public.tbl0" - the table "PUBLIC.TBL0" will be looked up,
+     *             "PUBLIC.\"Tbl0\"" - "PUBLIC.Tbl0", "\"MySchema\".\"Tbl0\"" 
- "MySchema.Tbl0", etc.
+     * @return Table with the corresponding name or {@code null} if the table isn't 
created.
+     * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
+     *                         <ul>
+     *                             <li>the node is stopping.</li>
+     *                         </ul>
+     */
+    TableImpl tableImpl(String name);
+
+    /**
+     * Gets a table by name, if it was created before.
+     *
+     * @param name Canonical name of the table ([schemaName].[tableName]) with 
SQL-parser style quotation, e.g.
+     *             "public.tbl0" - the table "PUBLIC.TBL0" will be looked up,
+     *             "PUBLIC.\"Tbl0\"" - "PUBLIC.Tbl0", "\"MySchema\".\"Tbl0\"" 
- "MySchema.Tbl0", etc.
+     * @return Future representing pending completion of the operation.
+     * @throws IgniteException If an unspecified platform exception has 
happened internally. Is thrown when:
+     *                         <ul>
+     *                             <li>the node is stopping.</li>
+     *                         </ul>
+     */
+    CompletableFuture<TableImpl> tableImplAsync(String name);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 1205b33d3..7ecc1bc24 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.storage.engine.TableStorage;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.LockException;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -227,5 +228,14 @@ public interface InternalTable extends AutoCloseable {
      */
     @NotNull List<String> assignments();
 
+    /**
+     * Returns cluster node that is the leader of the corresponding partition 
group or throws an exception if
+     * it cannot be found.
+     *
+     * @param partition partition number
+     * @return leader node of the partition group corresponding to the 
partition
+     */
+    ClusterNode leaderAssignment(int partition);
+
     //TODO: IGNITE-14488. Add invoke() methods.
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 3687f7815..3cbdcaed3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -19,18 +19,21 @@ package org.apache.ignite.internal.table;
 
 import java.util.Objects;
 import java.util.UUID;
+import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.TestOnly;
 
 /**
  * Table view implementation for binary objects.
@@ -106,21 +109,53 @@ public class TableImpl implements Table {
     }
 
     /**
-     * Returns a partition for a tuple.
+     * Returns a partition for a key tuple.
      *
-     * @param t The tuple.
+     * @param key The tuple.
      * @return The partition.
      */
-    @TestOnly
-    public int partition(Tuple t) {
-        Objects.requireNonNull(t);
+    public int partition(Tuple key) {
+        Objects.requireNonNull(key);
 
         try {
-            final Row keyRow = new 
TupleMarshallerImpl(schemaReg).marshalKey(t);
+            final Row keyRow = new 
TupleMarshallerImpl(schemaReg).marshalKey(key);
 
             return tbl.partition(keyRow);
         } catch (TupleMarshallerException e) {
             throw new IgniteInternalException(e);
         }
     }
+
+    /**
+     * Returns a partition for a key.
+     *
+     * @param key The key.
+     * @param keyMapper Key mapper.
+     * @return The partition.
+     */
+    public <K> int partition(K key, Mapper<K> keyMapper) {
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(keyMapper);
+
+        BinaryRow keyRow;
+        var marshaller = new KvMarshallerImpl<>(schemaReg.schema(), keyMapper, 
keyMapper);
+        try {
+            keyRow = marshaller.marshal(key);
+        } catch (MarshallerException e) {
+            throw new IgniteInternalException("Cannot marshal key", e);
+        }
+
+        return tbl.partition(keyRow);
+    }
+
+    /**
+     * Returns cluster node that is the leader of the corresponding partition 
group or throws an exception if
+     * it cannot be found.
+     *
+     * @param partition partition number
+     * @return leader node of the partition group corresponding to the 
partition
+     */
+    public ClusterNode leaderAssignment(int partition) {
+        return tbl.leaderAssignment(partition);
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1a0c31958..741e0848d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -157,6 +157,9 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     /** Resolver that resolves a network address to node id. */
     private final Function<NetworkAddress, String> netAddrResolver;
 
+    /** Resolver that resolves a network address to cluster node. */
+    private final Function<NetworkAddress, ClusterNode> clusterNodeResolver;
+
     /** Data region instances. */
     private final Map<String, DataRegion> dataRegions = new 
ConcurrentHashMap<>();
 
@@ -203,6 +206,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
             return node.id();
         };
+        clusterNodeResolver = topologyService::getByAddress;
 
         engine = new RocksDbStorageEngine();
 
@@ -562,7 +566,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         tableStorage.start();
 
         InternalTableImpl internalTable = new InternalTableImpl(name, tblId, 
new Int2ObjectOpenHashMap<>(partitions),
-                partitions, netAddrResolver, txManager, tableStorage);
+                partitions, netAddrResolver, clusterNodeResolver, txManager, 
tableStorage);
 
         var schemaRegistry = new SchemaRegistryImpl(v -> {
             if (!busyLock.enterBusy()) {
@@ -1225,7 +1229,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Table> tableAsync(String name) {
-        return tableAsyncInternal(IgniteObjectName.parseCanonicalName(name));
+        return tableAsyncInternal(IgniteObjectName.parseCanonicalName(name))
+                .thenApply(Function.identity());
     }
 
     /** {@inheritDoc} */
@@ -1241,14 +1246,25 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
         }
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public TableImpl tableImpl(String name) {
+        return join(tableImplAsync(name));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<TableImpl> tableImplAsync(String name) {
+        return tableAsyncInternal(IgniteObjectName.parseCanonicalName(name));
+    }
+
     /**
      * Gets a table by name, if it was created before. Doesn't parse canonical 
name.
      *
      * @param name Table name.
      * @return Future representing pending completion of the {@code 
TableManager#tableAsyncInternal} operation.
      * */
-    @NotNull
-    private CompletableFuture<Table> tableAsyncInternal(String name) {
+    private CompletableFuture<TableImpl> tableAsyncInternal(String name) {
         if (!busyLock.enterBusy()) {
             throw new IgniteException(new NodeStoppingException());
         }
@@ -1259,7 +1275,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 return CompletableFuture.completedFuture(null);
             }
 
-            return (CompletableFuture) tableAsyncInternal(tableId, false);
+            return tableAsyncInternal(tableId, false);
         } finally {
             busyLock.leaveBusy();
         }
@@ -1272,7 +1288,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
      * @param checkConfiguration {@code True} when the method checks a 
configuration before trying to get a table, {@code false} otherwise.
      * @return Future representing pending completion of the operation.
      */
-    @NotNull
     private CompletableFuture<TableImpl> tableAsyncInternal(UUID id, boolean 
checkConfiguration) {
         if (checkConfiguration && !isTableConfigured(id)) {
             return CompletableFuture.completedFuture(null);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 55ed72f2a..5e4bde4e9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -62,10 +62,12 @@ import 
org.apache.ignite.internal.table.distributed.command.scan.ScanInitCommand
 import 
org.apache.ignite.internal.table.distributed.command.scan.ScanRetrieveBatchCommand;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.lang.IgniteUuidGenerator;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.client.Command;
 import org.apache.ignite.raft.client.Peer;
@@ -99,6 +101,9 @@ public class InternalTableImpl implements InternalTable {
     /** Resolver that resolves a network address to node id. */
     private final Function<NetworkAddress, String> netAddrResolver;
 
+    /** Resolver that resolves a network address to cluster node. */
+    private final Function<NetworkAddress, ClusterNode> clusterNodeResolver;
+
     /** Transactional manager. */
     private final TxManager txManager;
 
@@ -124,6 +129,7 @@ public class InternalTableImpl implements InternalTable {
             Int2ObjectMap<RaftGroupService> partMap,
             int partitions,
             Function<NetworkAddress, String> netAddrResolver,
+            Function<NetworkAddress, ClusterNode> clusterNodeResolver,
             TxManager txManager,
             TableStorage tableStorage
     ) {
@@ -132,6 +138,7 @@ public class InternalTableImpl implements InternalTable {
         this.partitionMap = partMap;
         this.partitions = partitions;
         this.netAddrResolver = netAddrResolver;
+        this.clusterNodeResolver = clusterNodeResolver;
         this.txManager = txManager;
         this.tableStorage = tableStorage;
     }
@@ -394,6 +401,18 @@ public class InternalTableImpl implements InternalTable {
                 .collect(Collectors.toList());
     }
 
+    @Override
+    public ClusterNode leaderAssignment(int partition) {
+        awaitLeaderInitialization();
+
+        RaftGroupService raftGroupService = partitionMap.get(partition);
+        if (raftGroupService == null) {
+            throw new IgniteInternalException("No such partition " + partition 
+ " in table " + tableName);
+        }
+
+        return clusterNodeResolver.apply(raftGroupService.leader().address());
+    }
+
     private void awaitLeaderInitialization() {
         List<CompletableFuture<Void>> futs = new ArrayList<>();
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index d168179ee..3d6a0f14b 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -68,7 +68,7 @@ public class DummyInternalTableImpl extends InternalTableImpl 
{
     public DummyInternalTableImpl(VersionedRowStore store, TxManager 
txManager) {
         super("test", UUID.randomUUID(),
                 Int2ObjectMaps.singleton(0, mock(RaftGroupService.class)),
-                1, null, txManager, mock(TableStorage.class));
+                1, null, null, txManager, mock(TableStorage.class));
 
         RaftGroupService svc = partitionMap.get(0);
 

Reply via email to