This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ae5d75b97 IGNITE-22149 Java thin: Implement partition API (#3764)
1ae5d75b97 is described below

commit 1ae5d75b973ad8069af886e1dcfcfa2f9c801ab4
Author: Mikhail <[email protected]>
AuthorDate: Wed May 22 17:24:01 2024 +0300

    IGNITE-22149 Java thin: Implement partition API (#3764)
---
 .../org/apache/ignite/network/NodeMetadata.java    |   9 ++
 .../ignite/internal/client/proto/ClientOp.java     |   3 +
 .../handler/ClientInboundMessageHandler.java       |   3 +
 .../cluster/ClientClusterGetNodesRequest.java      |  25 ++-
 ...blePartitionPrimaryReplicasNodesGetRequest.java |  65 ++++++++
 .../apache/ignite/client/ClientOperationType.java  |   8 +-
 .../org/apache/ignite/client/RetryReadPolicy.java  |   1 +
 .../apache/ignite/internal/client/ClientUtils.java |   3 +
 .../ignite/internal/client/TcpIgniteClient.java    |  27 +++-
 .../client/table/ClientPartitionManager.java       | 169 +++++++++++++++++++++
 .../ignite/internal/client/table/ClientTable.java  |  13 +-
 .../internal/table/partition/HashPartition.java    |   0
 .../ItAbstractPartitionManagerTest.java}           |  59 +++++--
 .../ItStandalonePartitionManagerTest.java}         |  36 +----
 .../ItThinClientPartitionManagerTest.java}         |  44 ++----
 15 files changed, 378 insertions(+), 87 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java 
b/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
index 154c602d2c..5291973b75 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
@@ -75,4 +75,13 @@ public class NodeMetadata implements Serializable {
     public int hashCode() {
         return Objects.hash(restHost, httpPort, httpsPort);
     }
+
+    @Override
+    public String toString() {
+        return "NodeMetadata{"
+                + "restHost='" + restHost + '\''
+                + ", httpPort=" + httpPort
+                + ", httpsPort=" + httpsPort
+                + '}';
+    }
 }
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index bb371faaed..dd670e89ae 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -175,4 +175,7 @@ public class ClientOp {
      * Execute MapReduce task.
      */
     public static final int COMPUTE_EXECUTE_MAPREDUCE = 64;
+
+    /** Get all primary replicas mapping to cluster nodes. */
+    public static final int PRIMARY_REPLICAS_GET = 65;
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index a9a75b8405..360e80c6cb 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -89,6 +89,7 @@ import 
org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactRe
 import 
org.apache.ignite.client.handler.requests.table.ClientTupleReplaceRequest;
 import 
org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllRequest;
 import 
org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
+import 
org.apache.ignite.client.handler.requests.table.partition.ClientTablePartitionPrimaryReplicasNodesGetRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
@@ -778,6 +779,8 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             case ClientOp.STREAMER_BATCH_SEND:
                 return ClientStreamerBatchSendRequest.process(in, out, 
igniteTables);
 
+            case ClientOp.PRIMARY_REPLICAS_GET:
+                return 
ClientTablePartitionPrimaryReplicasNodesGetRequest.process(in, out, 
igniteTables);
             default:
                 throw new IgniteException(PROTOCOL_ERR, "Unexpected operation 
code: " + opCode);
         }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
index f0c8fa7b8b..dfe562444d 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
 
 /**
  * Cluster nodes request.
@@ -42,15 +43,27 @@ public class ClientClusterGetNodesRequest {
         out.packInt(nodes.size());
 
         for (ClusterNode node : nodes) {
-            out.packInt(4);
-
-            out.packString(node.id());
-            out.packString(node.name());
-            out.packString(node.address().host());
-            out.packInt(node.address().port());
+            packClusterNode(node, out);
         }
 
         // Null future indicates synchronous completion.
         return null;
     }
+
+    /**
+     * Pack {@link ClusterNode} instance to client message.
+     *
+     * @param clusterNode Cluster node.
+     * @param out Client message packer.
+     */
+    public static void packClusterNode(ClusterNode clusterNode, 
ClientMessagePacker out) {
+        out.packInt(4);
+
+        out.packString(clusterNode.id());
+        out.packString(clusterNode.name());
+
+        NetworkAddress address = clusterNode.address();
+        out.packString(address.host());
+        out.packInt(address.port());
+    }
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientTablePartitionPrimaryReplicasNodesGetRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientTablePartitionPrimaryReplicasNodesGetRequest.java
new file mode 100644
index 0000000000..dc01d07815
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientTablePartitionPrimaryReplicasNodesGetRequest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table.partition;
+
+import static 
org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest.packClusterNode;
+import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
+
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.table.partition.Partition;
+
+/**
+ * Client primary replicas with node info retrieval request.
+ * See also {@link ClientTablePartitionPrimaryReplicasGetRequest}.
+ */
+public class ClientTablePartitionPrimaryReplicasNodesGetRequest {
+
+    /**
+     * Process the request.
+     *
+     * @param in Unpacker.
+     * @param out Packer.
+     * @param tables Ignite tables.
+     * @return Future which will complete after request process finished.
+     */
+    public static CompletableFuture<Void> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteTables tables
+    ) {
+        return readTableAsync(in, tables).thenCompose(table -> 
table.partitionManager()
+                .primaryReplicasAsync()
+                .thenAccept(partitions -> {
+                    out.packInt(partitions.size());
+                    for (Entry<Partition, ClusterNode> e : 
partitions.entrySet()) {
+                        HashPartition partition = (HashPartition) e.getKey();
+
+                        out.packInt(partition.partitionId());
+
+                        packClusterNode(e.getValue(), out);
+                    }
+                }));
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
 
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index 820e111de4..916621f3d5 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -26,6 +26,7 @@ import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.table.DataStreamerTarget;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.table.partition.PartitionManager;
 import org.apache.ignite.tx.Transaction;
 
 /**
@@ -175,5 +176,10 @@ public enum ClientOperationType {
     /**
      * SQL Execute batch ({@link IgniteSql#executeBatchAsync(Transaction, 
String, BatchedArguments)}).
      */
-    SQL_EXECUTE_BATCH
+    SQL_EXECUTE_BATCH,
+
+    /**
+     * Get all primary replicas mapping to cluster nodes ({@link 
PartitionManager#primaryReplicasAsync()}).
+     */
+    PRIMARY_REPLICAS_GET
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java 
b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
index 4912f2a2db..eec5bbb004 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
@@ -60,6 +60,7 @@ public class RetryReadPolicy extends RetryLimitPolicy {
             case SQL_CURSOR_NEXT_PAGE:
             case SQL_EXECUTE_SCRIPT:
             case STREAMER_BATCH_SEND:
+            case PRIMARY_REPLICAS_GET:
                 return false;
 
             default:
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index bc76849b24..1d1d70709a 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -257,6 +257,9 @@ public class ClientUtils {
             case ClientOp.SQL_EXEC_BATCH:
                 return ClientOperationType.SQL_EXECUTE_BATCH;
 
+            case ClientOp.PRIMARY_REPLICAS_GET:
+                return ClientOperationType.PRIMARY_REPLICAS_GET;
+
             // Do not return null from default arm intentionally, so we don't 
forget to update this when new ClientOp values are added.
             default:
                 throw new UnsupportedOperationException("Invalid op code: " + 
opCode);
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 591c83fcbc..4a6adf9208 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -30,6 +30,7 @@ import org.apache.ignite.client.IgniteClientConfiguration;
 import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
 import org.apache.ignite.internal.client.compute.ClientCompute;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.sql.ClientSql;
 import org.apache.ignite.internal.client.table.ClientTables;
@@ -195,13 +196,9 @@ public class TcpIgniteClient implements IgniteClient {
             List<ClusterNode> res = new ArrayList<>(cnt);
 
             for (int i = 0; i < cnt; i++) {
-                int fieldCnt = r.in().unpackInt();
-                assert fieldCnt == 4;
+                ClusterNode clusterNode = unpackClusterNode(r);
 
-                res.add(new ClientClusterNode(
-                        r.in().unpackString(),
-                        r.in().unpackString(),
-                        new NetworkAddress(r.in().unpackString(), 
r.in().unpackInt())));
+                res.add(clusterNode);
             }
 
             return res;
@@ -266,4 +263,22 @@ public class TcpIgniteClient implements IgniteClient {
     public <T extends ClientMessage> CompletableFuture<T> sendRequestAsync(int 
opCode, PayloadWriter writer, PayloadReader<T> reader) {
         return ch.serviceAsync(opCode, writer, reader);
     }
+
+    /**
+     * Tries to unpack {@link ClusterNode} instance from input channel.
+     *
+     * @param r Payload input channel.
+     * @return Cluster node or {@code null} if message doesn't contain cluster 
node.
+     */
+    public static ClusterNode unpackClusterNode(PayloadInputChannel r) {
+        ClientMessageUnpacker in = r.in();
+
+        int fieldCnt = r.in().unpackInt();
+        assert fieldCnt == 4;
+
+        return new ClientClusterNode(
+                in.unpackString(),
+                in.unpackString(),
+                new NetworkAddress(in.unpackString(), in.unpackInt()));
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
new file mode 100644
index 0000000000..ad0494985d
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.client.TcpIgniteClient.unpackClusterNode;
+import static 
org.apache.ignite.internal.client.table.ClientTupleSerializer.getPartitionAwarenessProvider;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client partition manager implementation.
+ */
+public class ClientPartitionManager implements PartitionManager {
+    private final ClientTable tbl;
+
+    private final Lock lock = new ReentrantLock();
+
+    private final Map<Partition, ClusterNode> cache = new HashMap<>();
+
+    private @Nullable Instant aliveUntil;
+
+    ClientPartitionManager(ClientTable clientTable) {
+        this.tbl = clientTable;
+    }
+
+    @Override
+    public CompletableFuture<ClusterNode> primaryReplicaAsync(Partition 
partition) {
+        if (!(partition instanceof HashPartition)) {
+            throw new IllegalArgumentException("Unsupported partition type: " 
+ partition);
+        }
+
+        ClusterNode clusterNode = getClusterNode(partition);
+
+        if (clusterNode != null) {
+            return completedFuture(clusterNode);
+        }
+
+        return primaryReplicasAsync()
+                .thenApply(map -> map.get(partition));
+    }
+
+    @Override
+    @SuppressWarnings("resource")
+    public CompletableFuture<Map<Partition, ClusterNode>> 
primaryReplicasAsync() {
+        Map<Partition, ClusterNode> cache = lookupCache();
+
+        if (cache != null) {
+            return completedFuture(cache);
+        }
+
+        return tbl.channel().serviceAsync(ClientOp.PRIMARY_REPLICAS_GET,
+                w -> w.out().packInt(tbl.tableId()),
+                r -> {
+                    ClientMessageUnpacker in = r.in();
+
+                    if (in.tryUnpackNil()) {
+                        return Collections.<Partition, ClusterNode>emptyMap();
+                    }
+
+                    int size = in.unpackInt();
+
+                    Map<Partition, ClusterNode> res = new HashMap<>(size);
+
+                    for (int i = 0; i < size; i++) {
+                        int partition = in.unpackInt();
+                        res.put(new HashPartition(partition), 
unpackClusterNode(r));
+                    }
+
+                    return res;
+                })
+                .thenApply(this::updateCache);
+    }
+
+    @Override
+    public <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K> 
mapper) {
+        Objects.requireNonNull(key, "Key is null.");
+        Objects.requireNonNull(mapper, "Mapper is null.");
+
+        return getPartition(getPartitionAwarenessProvider(null, mapper, key));
+    }
+
+    @Override
+    public CompletableFuture<Partition> partitionAsync(Tuple key) {
+        Objects.requireNonNull(key, "Key is null.");
+
+        return getPartition(getPartitionAwarenessProvider(null, key));
+    }
+
+    private @Nullable ClusterNode getClusterNode(Partition partition) {
+        lock.lock();
+        try {
+            if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) {
+                cache.clear();
+                aliveUntil = null;
+                return null;
+            }
+            return cache.get(partition);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private @Nullable Map<Partition, ClusterNode> lookupCache() {
+        lock.lock();
+        try {
+            if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) {
+                cache.clear();
+                aliveUntil = null;
+                return null;
+            }
+            return Map.copyOf(cache);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private Map<Partition, ClusterNode> updateCache(Map<Partition, 
ClusterNode> map) {
+        lock.lock();
+        try {
+            cache.putAll(map);
+            aliveUntil = Instant.now().plus(1, ChronoUnit.MINUTES);
+            return map;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    private CompletableFuture<Partition> 
getPartition(PartitionAwarenessProvider partitionAwarenessProvider) {
+        return tbl.getPartitionAssignment()
+                .thenCompose(partitions -> 
tbl.getLatestSchema().thenApply(schema -> {
+                    Integer hash = 
partitionAwarenessProvider.getObjectHashCode(schema);
+
+                    return new HashPartition(Math.abs(hash % 
partitions.size()));
+                }));
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 70956196a6..9818661563 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -87,6 +87,8 @@ public class ClientTable implements Table {
 
     private volatile PartitionAssignment partitionAssignment = null;
 
+    private final ClientPartitionManager clientPartitionManager;
+
     /**
      * Constructor.
      *
@@ -95,7 +97,12 @@ public class ClientTable implements Table {
      * @param id Table id.
      * @param name Table name.
      */
-    public ClientTable(ReliableChannel ch, MarshallersProvider marshallers, 
int id, String name) {
+    public ClientTable(
+            ReliableChannel ch,
+            MarshallersProvider marshallers,
+            int id,
+            String name
+    ) {
         assert ch != null;
         assert marshallers != null;
         assert name != null && !name.isEmpty();
@@ -106,6 +113,7 @@ public class ClientTable implements Table {
         this.name = name;
         this.log = ClientUtils.logger(ch.configuration(), ClientTable.class);
         this.sql = new ClientSql(ch, marshallers);
+        clientPartitionManager = new ClientPartitionManager(this);
     }
 
     /**
@@ -133,9 +141,8 @@ public class ClientTable implements Table {
     }
 
     @Override
-    // TODO: IGNITE-22149
     public PartitionManager partitionManager() {
-        throw new UnsupportedOperationException("This operation doesn't 
implemented yet.");
+        return clientPartitionManager;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
similarity index 100%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
copy to 
modules/core/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
similarity index 64%
rename from 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
rename to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
index 5bade5447d..8ca8096487 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.table.partition;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
@@ -25,16 +25,21 @@ import static 
org.apache.ignite.internal.table.TableRow.tuple;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
 import org.apache.ignite.table.partition.PartitionManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -42,19 +47,23 @@ import org.junit.jupiter.api.Test;
 /**
  * Test suite for {@link PartitionManager}.
  */
-public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest {
-    private static final String TABLE_NAME = "tableName";
+public abstract class ItAbstractPartitionManagerTest extends 
ClusterPerTestIntegrationTest {
+    protected static final String TABLE_NAME = "tableName";
+
+    protected static final String ZONE_NAME = "TEST_ZONE";
 
     private static final int PARTITIONS = 3;
 
+    protected abstract PartitionManager partitionManager();
+
     @BeforeEach
     public void setup() {
-        String zoneSql = "create zone test_zone with"
+        String zoneSql = "create zone " + ZONE_NAME + " with"
                 + " partitions=" + PARTITIONS + ","
                 + " replicas=3,"
                 + " storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
 
-        String sql = "create table " + TABLE_NAME + " (key int primary key, 
val varchar(20)) with primary_zone='TEST_ZONE'";
+        String sql = "create table " + TABLE_NAME + " (key int primary key, 
val varchar(20)) with primary_zone='" + ZONE_NAME + "'";
 
         cluster.doInSession(0, session -> {
             executeUpdate(zoneSql, session);
@@ -67,21 +76,49 @@ public class ItPartitionManagerTest extends 
ClusterPerTestIntegrationTest {
         }
     }
 
+    @Test
+    public void primaryPartitions() {
+        TableViewInternal tableViewInternal = 
unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+
+        verifyPrimaryPartition(tableViewInternal, PARTITIONS);
+
+        executeSql("ALTER TABLE " + TABLE_NAME + " ADD COLUMN val1 VARCHAR 
DEFAULT 'newDefault'");
+        verifyPrimaryPartition(tableViewInternal, PARTITIONS);
+        verifyAllKeys(tableViewInternal, PARTITIONS);
+    }
+
     @Test
     public void partitionsForAllKeys() {
-        PartitionManager partitionManager = 
cluster.aliveNode().tables().table(TABLE_NAME).partitionManager();
         TableViewInternal tableViewInternal = 
unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+
+        verifyAllKeys(tableViewInternal, PARTITIONS);
+    }
+
+    private void verifyPrimaryPartition(TableViewInternal tableViewInternal, 
int partitions) {
+        InternalTable internalTable = tableViewInternal.internalTable();
+
+        for (int i = 0; i < partitions; i++) {
+            CompletableFuture<ClusterNode> clusterNodeCompletableFuture = 
internalTable.partitionLocation(
+                    new TablePartitionId(internalTable.tableId(), i));
+
+            CompletableFuture<ClusterNode> clusterNodeCompletableFuture1 = 
partitionManager().primaryReplicaAsync(new HashPartition(i));
+
+            assertThat(clusterNodeCompletableFuture.join().id(), 
equalTo(clusterNodeCompletableFuture1.join().id()));
+        }
+    }
+
+    private void verifyAllKeys(TableViewInternal tableViewInternal, int 
partitions) {
         InternalTable internalTable = tableViewInternal.internalTable();
 
-        CompletableFuture<?>[] futures = new CompletableFuture<?>[PARTITIONS];
-        for (int i = 0; i < PARTITIONS; i++) {
+        CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+        for (int i = 0; i < partitions; i++) {
             CompletableFuture<Object> future = new CompletableFuture<>();
 
             futures[i] = future;
 
             Publisher<BinaryRow> scan = internalTable.scan(i, null);
 
-            HashPartition value = new HashPartition(i);
+            Partition value = new HashPartition(i);
 
             scan.subscribe(new Subscriber<>() {
                 @Override
@@ -95,7 +132,7 @@ public class ItPartitionManagerTest extends 
ClusterPerTestIntegrationTest {
                     Tuple tuple = tuple(registry.resolve(item, 
registry.lastKnownSchemaVersion()));
 
                     Tuple key = Tuple.create().set("key", 
tuple.intValue("key"));
-                    assertThat(partitionManager.partitionAsync(key), 
willBe(value));
+                    assertThat(partitionManager().partitionAsync(key), 
willBe(value));
                 }
 
                 @Override
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java
similarity index 53%
copy from 
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
copy to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java
index da622a7d08..b3fee3fee3 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java
@@ -17,40 +17,14 @@
 
 package org.apache.ignite.internal.table.partition;
 
-import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
 
 /**
- * Hash partition representation.
+ * Embedded mode implementation of partition management test suite.
  */
-public class HashPartition implements Partition {
-    private static final long serialVersionUID = 1717320056615864614L;
-
-    private final int partitionId;
-
-    public HashPartition(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    public int partitionId() {
-        return partitionId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        HashPartition that = (HashPartition) o;
-
-        return partitionId == that.partitionId;
-    }
-
+public class ItStandalonePartitionManagerTest extends 
ItAbstractPartitionManagerTest {
     @Override
-    public int hashCode() {
-        return partitionId;
+    protected PartitionManager partitionManager() {
+        return 
cluster.aliveNode().tables().table(TABLE_NAME).partitionManager();
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java
similarity index 54%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
rename to 
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java
index da622a7d08..6a08ffdb7b 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java
@@ -17,40 +17,26 @@
 
 package org.apache.ignite.internal.table.partition;
 
-import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.BeforeEach;
 
 /**
- * Hash partition representation.
+ * Thin client implementation of partition management test suite.
  */
-public class HashPartition implements Partition {
-    private static final long serialVersionUID = 1717320056615864614L;
-
-    private final int partitionId;
-
-    public HashPartition(int partitionId) {
-        this.partitionId = partitionId;
-    }
-
-    public int partitionId() {
-        return partitionId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-
-        HashPartition that = (HashPartition) o;
-
-        return partitionId == that.partitionId;
+public class ItThinClientPartitionManagerTest extends 
ItAbstractPartitionManagerTest {
+    private IgniteClient client;
+
+    @BeforeEach
+    public void startClient() {
+        client = IgniteClient.builder()
+                .addresses("localhost")
+                .reconnectThrottlingPeriod(0)
+                .build();
     }
 
     @Override
-    public int hashCode() {
-        return partitionId;
+    protected PartitionManager partitionManager() {
+        return client.tables().table(TABLE_NAME).partitionManager();
     }
 }

Reply via email to