This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch IGNITE-22149
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 72b01b0e87c985a561f0f48374e63e7e98f599c2
Author: Mikhail Pochatkin <[email protected]>
AuthorDate: Wed May 15 13:29:09 2024 +0300

    IGNITE-22149 Java Thin client table partition API
---
 .../org/apache/ignite/network/NodeMetadata.java    |   9 +
 .../ignite/internal/client/proto/ClientOp.java     |   6 +
 .../handler/ClientInboundMessageHandler.java       |  11 ++
 .../partition/ClientKeyPartitionGetRequest.java    |  62 +++++++
 .../ClientPartitionPrimaryGetRequest.java          |  76 ++++++++
 .../ClientPartitionsPrimaryGetRequest.java         |  70 ++++++++
 .../ignite/internal/client/TcpIgniteClient.java    |   7 +-
 .../ignite/internal/client/TopologyCache.java      |  73 ++++++++
 .../client/table/ClientPartitionManager.java       | 198 +++++++++++++++++++++
 .../ignite/internal/client/table/ClientTable.java  |  15 +-
 .../ignite/internal/client/table/ClientTables.java |  11 +-
 .../ItAbstractPartitionManagerTest.java}           |  67 +++++--
 .../ItStandalonePartitionManagerTest.java          |  16 ++
 .../ItThinClientPartitionManagerTest.java          |  29 +++
 14 files changed, 632 insertions(+), 18 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java 
b/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
index 154c602d2c..5291973b75 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
@@ -75,4 +75,13 @@ public class NodeMetadata implements Serializable {
     public int hashCode() {
         return Objects.hash(restHost, httpPort, httpsPort);
     }
+
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("NodeMetadata{");
+        text.append("restHost='").append(restHost).append('\'');
+        text.append(", httpPort=").append(httpPort);
+        text.append(", httpsPort=").append(httpsPort);
+        return text.append('}').toString();
+    }
 }
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 0b83a4869f..11b06c49ea 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -170,4 +170,10 @@ public class ClientOp {
 
     /** Execute SQL query with the parameters batch. */
     public static final int SQL_EXEC_BATCH = 63;
+
+    /** Get the primary replica node of a single table partition. */
+    public static final int PARTITION_PRIMARY_GET = 64;
+
+    /** Get the primary replica nodes of all table partitions. */
+    public static final int PARTITIONS_PRIMARY_GET = 65;
+
+    /** Get the partition that a given key belongs to. */
+    public static final int KEY_PARTITION_PRIMARY_GET = 66;
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 5669f4d285..754bf8730b 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -88,6 +88,9 @@ import 
org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactRe
 import 
org.apache.ignite.client.handler.requests.table.ClientTupleReplaceRequest;
 import 
org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllRequest;
 import 
org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
+import 
org.apache.ignite.client.handler.requests.table.partition.ClientKeyPartitionGetRequest;
+import 
org.apache.ignite.client.handler.requests.table.partition.ClientPartitionPrimaryGetRequest;
+import 
org.apache.ignite.client.handler.requests.table.partition.ClientPartitionsPrimaryGetRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
 import 
org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
@@ -774,6 +777,14 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             case ClientOp.STREAMER_BATCH_SEND:
                 return ClientStreamerBatchSendRequest.process(in, out, 
igniteTables);
 
+            case ClientOp.PARTITION_PRIMARY_GET:
+                return ClientPartitionPrimaryGetRequest.process(in, out, 
igniteTables);
+
+            case ClientOp.PARTITIONS_PRIMARY_GET:
+                return ClientPartitionsPrimaryGetRequest.process(in, out, 
igniteTables);
+
+            case ClientOp.KEY_PARTITION_PRIMARY_GET:
+                return ClientKeyPartitionGetRequest.process(in, out, 
igniteTables);
             default:
                 throw new IgniteException(PROTOCOL_ERR, "Unexpected operation 
code: " + opCode);
         }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientKeyPartitionGetRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientKeyPartitionGetRequest.java
new file mode 100644
index 0000000000..c9f6757e9e
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientKeyPartitionGetRequest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table.partition;
+
+import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
+import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.table.manager.IgniteTables;
+
+/**
+ * Client key partition get request.
+ */
+public class ClientKeyPartitionGetRequest {
+    /**
+     * Handles the request: reads the table and the key tuple from the message and writes the id of the partition
+     * that the key maps to. Writes nil when the table is {@code null}.
+     *
+     * @param in Unpacker.
+     * @param out Packer.
+     * @param tables Ignite tables.
+     * @return Future that completes once the response has been written.
+     */
+    public static CompletableFuture<Void> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteTables tables
+    ) {
+        return readTableAsync(in, tables).thenCompose(table -> {
+            if (table == null) {
+                out.packNil();
+
+                return nullCompletedFuture();
+            }
+
+            return readTuple(in, table, true)
+                    .thenCompose(key -> table.partitionManager().partitionAsync(key))
+                    .thenAccept(partition -> out.packInt(((HashPartition) partition).partitionId()));
+        });
+    }
+}
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionPrimaryGetRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionPrimaryGetRequest.java
new file mode 100644
index 0000000000..3c4d2264c2
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionPrimaryGetRequest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table.partition;
+
+import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client table primary partition get request.
+ */
+public class ClientPartitionPrimaryGetRequest {
+    /**
+     * Handles the request: reads the table and a partition id from the message and writes the primary replica node
+     * of that partition. Writes nil when the table is {@code null}.
+     *
+     * @param in Unpacker.
+     * @param out Packer.
+     * @param tables Ignite tables.
+     * @return Future that completes once the response has been written.
+     */
+    public static CompletableFuture<Void> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteTables tables
+    ) {
+        return readTableAsync(in, tables).thenCompose(table -> {
+            if (table == null) {
+                out.packNil();
+
+                return nullCompletedFuture();
+            }
+
+            int partitionId = in.unpackInt();
+
+            return table.partitionManager()
+                    .primaryReplicaAsync(new HashPartition(partitionId))
+                    .thenAccept(primary -> packClusterNode(primary, out));
+        });
+    }
+
+    /**
+     * Writes a cluster node as id, name, host and port, in that order, or nil when the node is {@code null}.
+     *
+     * @param clusterNode Node to write, may be {@code null}.
+     * @param out Packer.
+     */
+    static void packClusterNode(@Nullable ClusterNode clusterNode, ClientMessagePacker out) {
+        if (clusterNode == null) {
+            out.packNil();
+
+            return;
+        }
+
+        out.packString(clusterNode.id());
+        out.packString(clusterNode.name());
+
+        NetworkAddress nodeAddress = clusterNode.address();
+
+        out.packString(nodeAddress.host());
+        out.packInt(nodeAddress.port());
+    }
+}
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionsPrimaryGetRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionsPrimaryGetRequest.java
new file mode 100644
index 0000000000..ffa23f8c86
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionsPrimaryGetRequest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table.partition;
+
+import static 
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
+import static 
org.apache.ignite.client.handler.requests.table.partition.ClientPartitionPrimaryGetRequest.packClusterNode;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.table.partition.Partition;
+
+/**
+ * Client all primary partitions get request.
+ */
+public class ClientPartitionsPrimaryGetRequest {
+    /**
+     * Handles the request: reads the table from the message and writes the number of partitions followed by
+     * a (partition id, primary replica node) pair for every partition. Writes nil when the table is {@code null}.
+     *
+     * @param in Unpacker.
+     * @param out Packer.
+     * @param tables Ignite tables.
+     * @return Future that completes once the response has been written.
+     */
+    public static CompletableFuture<Void> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteTables tables
+    ) {
+        return readTableAsync(in, tables).thenCompose(table -> {
+            if (table == null) {
+                out.packNil();
+
+                return nullCompletedFuture();
+            }
+
+            return table.partitionManager()
+                    .primaryReplicasAsync()
+                    .thenAccept(primaries -> {
+                        out.packInt(primaries.size());
+
+                        for (Entry<Partition, ClusterNode> primary : primaries.entrySet()) {
+                            HashPartition hashPartition = (HashPartition) primary.getKey();
+
+                            out.packInt(hashPartition.partitionId());
+                            packClusterNode(primary.getValue(), out);
+                        }
+                    });
+        });
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 591c83fcbc..bc9fe7cf95 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -79,6 +79,8 @@ public class TcpIgniteClient implements IgniteClient {
     /** Marshallers provider. */
     private final ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
 
+    private final TopologyCache topologyCache;
+
     /**
      * Cluster name.
      */
@@ -105,9 +107,11 @@ public class TcpIgniteClient implements IgniteClient {
 
         this.cfg = cfg;
 
+        topologyCache = new TopologyCache(this::clusterNodes);
+
         metrics = new ClientMetricSource();
         ch = new ReliableChannel(chFactory, cfg, metrics);
-        tables = new ClientTables(ch, marshallers);
+        tables = new ClientTables(ch, marshallers, topologyCache);
         transactions = new ClientTransactions(ch);
         compute = new ClientCompute(ch, tables);
         sql = new ClientSql(ch, marshallers);
@@ -204,6 +208,7 @@ public class TcpIgniteClient implements IgniteClient {
                         new NetworkAddress(r.in().unpackString(), 
r.in().unpackInt())));
             }
 
+            topologyCache.refresh(res);
             return res;
         });
     }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TopologyCache.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TopologyCache.java
new file mode 100644
index 0000000000..b90564a2ad
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TopologyCache.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache for server nodes topology.
+ */
+public class TopologyCache {
+    /**
+     * Current topology snapshot, or {@code null} until the first refresh. The snapshot is replaced as a whole so
+     * that concurrent readers never observe a cleared or half-populated topology.
+     */
+    private volatile Snapshot snapshot;
+
+    /** Provider of the up-to-date server nodes topology, called when the cache is empty or expired. */
+    private final Supplier<Collection<ClusterNode>> updateFunction;
+
+    /**
+     * Creates new topology cache.
+     *
+     * @param updateFunction New server nodes topology provider.
+     */
+    public TopologyCache(Supplier<Collection<ClusterNode>> updateFunction) {
+        this.updateFunction = updateFunction;
+    }
+
+    /**
+     * Refreshes cache with new given topology. The cached topology stays valid for one minute after this call.
+     *
+     * @param newTopology New server nodes topology.
+     */
+    public void refresh(Collection<ClusterNode> newTopology) {
+        Map<String, ClusterNode> nodes = new ConcurrentHashMap<>();
+
+        for (ClusterNode clusterNode : newTopology) {
+            nodes.put(clusterNode.name(), clusterNode);
+        }
+
+        // Publish the nodes together with their expiration time through a single volatile write.
+        snapshot = new Snapshot(nodes, Instant.now().plus(1, ChronoUnit.MINUTES));
+    }
+
+    /**
+     * Returns {@link ClusterNode} for given node name. Reloads the topology through the update function first
+     * when the cache has never been filled or has expired.
+     *
+     * @param name Node name.
+     * @return Cluster node for given node name or {@code null} if node name is not presented in topology.
+     */
+    public @Nullable ClusterNode get(String name) {
+        Snapshot current = snapshot;
+
+        if (current == null || Instant.now().isAfter(current.aliveUntil)) {
+            refresh(updateFunction.get());
+
+            current = snapshot;
+        }
+
+        return current.topology.get(name);
+    }
+
+    /** Topology state that is never modified after construction: nodes by name plus the moment they expire. */
+    private static class Snapshot {
+        private final Map<String, ClusterNode> topology;
+
+        private final Instant aliveUntil;
+
+        private Snapshot(Map<String, ClusterNode> topology, Instant aliveUntil) {
+            this.topology = topology;
+            this.aliveUntil = aliveUntil;
+        }
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
new file mode 100644
index 0000000000..300f77b4fa
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static 
org.apache.ignite.internal.client.table.ClientTupleSerializer.getPartitionAwarenessProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.ignite.internal.client.ClientClusterNode;
+import org.apache.ignite.internal.client.PayloadInputChannel;
+import org.apache.ignite.internal.client.PayloadOutputChannel;
+import org.apache.ignite.internal.client.TopologyCache;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.client.proto.TuplePart;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client partition manager implementation.
+ */
+public class ClientPartitionManager implements PartitionManager {
+    /** Table this manager serves. */
+    private final ClientTable tbl;
+
+    /** Cache used to turn node names from the partition assignment into cluster nodes. */
+    private final TopologyCache topologyCache;
+
+    /**
+     * Constructor.
+     *
+     * @param clientTable Client table.
+     * @param topologyCache Server topology cache.
+     */
+    ClientPartitionManager(ClientTable clientTable, TopologyCache topologyCache) {
+        this.tbl = clientTable;
+        this.topologyCache = topologyCache;
+    }
+
+    /**
+     * Returns the primary replica node of the given partition. The node name is taken from the table's partition
+     * assignment and looked up in the topology cache; when the assignment has no name for the partition, the server
+     * is asked instead.
+     *
+     * @param partition Partition, must be a {@link ClientHashPartition}.
+     * @return Future with the primary replica node.
+     * @throws IllegalArgumentException If the partition is not a {@link ClientHashPartition}.
+     */
+    @Override
+    @SuppressWarnings("resource")
+    public CompletableFuture<ClusterNode> primaryReplicaAsync(Partition partition) {
+        if (!(partition instanceof ClientHashPartition)) {
+            throw new IllegalArgumentException("Unsupported partition implementation: " + partition);
+        }
+
+        ClientHashPartition clientPartition = (ClientHashPartition) partition;
+        return tbl.getPartitionAssignment().thenCompose(names -> {
+            String node = names.get(clientPartition.partitionId);
+            if (node == null) {
+                return tbl.channel().serviceAsync(ClientOp.PARTITION_PRIMARY_GET,
+                        w -> {
+                            w.out().packInt(tbl.tableId());
+                            w.out().packInt(clientPartition.partitionId);
+                        },
+                        ClientPartitionManager::unpackClusterNode);
+            } else {
+                return completedFuture(topologyCache.get(node));
+            }
+        });
+    }
+
+    /**
+     * Requests the primary replica nodes of all partitions of the table from the server.
+     *
+     * @return Future with the partition-to-node map; the map is {@code null} when the server answers with nil.
+     */
+    @Override
+    @SuppressWarnings("resource")
+    public CompletableFuture<Map<Partition, ClusterNode>> primaryReplicasAsync() {
+        return tbl.channel().serviceAsync(ClientOp.PARTITIONS_PRIMARY_GET,
+                w -> w.out().packInt(tbl.tableId()),
+                r -> {
+                    ClientMessageUnpacker in = r.in();
+
+                    if (in.tryUnpackNil()) {
+                        return null;
+                    }
+
+                    int size = in.unpackInt();
+
+                    Map<Partition, ClusterNode> res = new HashMap<>(size);
+
+                    for (int i = 0; i < size; i++) {
+                        int partition = in.unpackInt();
+                        res.put(new ClientHashPartition(partition), unpackClusterNode(r));
+                    }
+
+                    return res;
+                });
+    }
+
+    /**
+     * Returns the partition that the given key, converted with the given mapper, belongs to.
+     *
+     * @param key Key object.
+     * @param mapper Mapper for the key.
+     * @return Future with the partition.
+     */
+    @Override
+    public <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K> mapper) {
+        return getPartition(getPartitionAwarenessProvider(null, mapper, key),
+                (schema, w) -> ClientRecordSerializer.writeRecRaw(key, mapper, schema, w.out(), TuplePart.KEY)
+        );
+    }
+
+    /**
+     * Returns the partition that the given key tuple belongs to.
+     *
+     * @param key Key tuple.
+     * @return Future with the partition.
+     */
+    @Override
+    public CompletableFuture<Partition> partitionAsync(Tuple key) {
+        return getPartition(getPartitionAwarenessProvider(null, key),
+                (schema, w) -> ClientTupleSerializer.writeTupleRaw(key, schema, w, true)
+        );
+    }
+
+    /**
+     * Resolves the partition of a key. The partition is first computed locally from the key hash and the size of
+     * the partition assignment; if that computation fails for any reason, the key is sent to the server, which
+     * returns the partition id.
+     *
+     * @param partitionAwarenessProvider Provider of the key hash code for a given schema.
+     * @param keyPack Writer that packs the key into the request for the server-side fallback.
+     * @return Future with the partition.
+     */
+    @SuppressWarnings("resource")
+    private CompletableFuture<Partition> getPartition(
+            PartitionAwarenessProvider partitionAwarenessProvider,
+            BiConsumer<ClientSchema, PayloadOutputChannel> keyPack
+    ) {
+        return tbl.getPartitionAssignment()
+                .thenCompose(partitions -> tbl.getLatestSchema().thenApply(schema -> {
+                    Integer hash = partitionAwarenessProvider.getObjectHashCode(schema);
+
+                    return Math.abs(hash % partitions.size());
+                }))
+                // -1 marks "could not compute locally"; a locally computed partition is never negative.
+                .exceptionally(t -> -1)
+                .thenCompose(partition -> {
+                    if (partition == -1) {
+                        return tbl.getLatestSchema()
+                                .thenCompose(schema ->
+                                        tbl.channel().serviceAsync(ClientOp.KEY_PARTITION_PRIMARY_GET,
+                                                w -> {
+                                                    w.out().packInt(tbl.tableId());
+                                                    keyPack.accept(schema, w);
+                                                },
+                                                r -> r.in().unpackInt()
+                                        ));
+                    }
+
+                    return completedFuture(partition);
+                }).thenApply(ClientHashPartition::new);
+    }
+
+    /**
+     * Reads a cluster node written as id, name, host and port.
+     *
+     * @param r Input channel.
+     * @return Cluster node, or {@code null} when the next value is nil.
+     */
+    private static @Nullable ClusterNode unpackClusterNode(PayloadInputChannel r) {
+        ClientMessageUnpacker in = r.in();
+        if (in.tryUnpackNil()) {
+            return null;
+        }
+
+        return new ClientClusterNode(
+                in.unpackString(),
+                in.unpackString(),
+                new NetworkAddress(in.unpackString(), in.unpackInt()));
+    }
+
+    /**
+     * Client-side partition identified by its integer id. Two instances are equal when their ids are equal.
+     */
+    public static class ClientHashPartition implements Partition {
+        private static final long serialVersionUID = -2089375867774271170L;
+
+        /** Partition id. */
+        private final int partitionId;
+
+        /**
+         * Constructor.
+         *
+         * @param partitionId Partition id.
+         */
+        public ClientHashPartition(int partitionId) {
+            this.partitionId = partitionId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            ClientHashPartition that = (ClientHashPartition) o;
+
+            return partitionId == that.partitionId;
+        }
+
+        @Override
+        public int hashCode() {
+            return partitionId;
+        }
+
+        @Override
+        public String toString() {
+            return "ClientHashPartition{"
+                    + "partitionId=" + partitionId
+                    + '}';
+        }
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 70956196a6..fd0830282e 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.client.ClientUtils;
 import org.apache.ignite.internal.client.PayloadInputChannel;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TopologyCache;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.ColumnTypeConverter;
@@ -77,6 +78,8 @@ public class ClientTable implements Table {
 
     private final IgniteLogger log;
 
+    private final TopologyCache topologyCache;
+
     private static final int UNKNOWN_SCHEMA_VERSION = -1;
 
     private volatile int latestSchemaVer = UNKNOWN_SCHEMA_VERSION;
@@ -95,13 +98,20 @@ public class ClientTable implements Table {
      * @param id Table id.
      * @param name Table name.
      */
-    public ClientTable(ReliableChannel ch, MarshallersProvider marshallers, 
int id, String name) {
+    public ClientTable(
+            ReliableChannel ch,
+            MarshallersProvider marshallers,
+            TopologyCache topologyCache,
+            int id,
+            String name
+    ) {
         assert ch != null;
         assert marshallers != null;
         assert name != null && !name.isEmpty();
 
         this.ch = ch;
         this.marshallers = marshallers;
+        this.topologyCache = topologyCache;
         this.id = id;
         this.name = name;
         this.log = ClientUtils.logger(ch.configuration(), ClientTable.class);
@@ -133,9 +143,8 @@ public class ClientTable implements Table {
     }
 
     @Override
-    // TODO: IGNITE-22149
     public PartitionManager partitionManager() {
-        throw new UnsupportedOperationException("This operation doesn't 
implemented yet.");
+        return new ClientPartitionManager(this, topologyCache);
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
index 80ef7baa78..f898c89a33 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.TopologyCache;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.table.Table;
@@ -37,15 +38,19 @@ public class ClientTables implements IgniteTables {
 
     private final MarshallersProvider marshallers;
 
+    private final TopologyCache topologyCache;
+
     /**
      * Constructor.
      *
      * @param ch Channel.
      * @param marshallers Marshallers provider.
+     * @param topologyCache Server topology cache.
      */
-    public ClientTables(ReliableChannel ch, MarshallersProvider marshallers) {
+    public ClientTables(ReliableChannel ch, MarshallersProvider marshallers, 
TopologyCache topologyCache) {
         this.ch = ch;
         this.marshallers = marshallers;
+        this.topologyCache = topologyCache;
     }
 
     /** {@inheritDoc} */
@@ -63,7 +68,7 @@ public class ClientTables implements IgniteTables {
             var res = new ArrayList<Table>(cnt);
 
             for (int i = 0; i < cnt; i++) {
-                res.add(new ClientTable(ch, marshallers, in.unpackInt(), 
in.unpackString()));
+                res.add(new ClientTable(ch, marshallers, topologyCache, 
in.unpackInt(), in.unpackString()));
             }
 
             return res;
@@ -82,6 +87,6 @@ public class ClientTables implements IgniteTables {
         Objects.requireNonNull(name);
 
         return ch.serviceAsync(ClientOp.TABLE_GET, w -> w.out().packString(name),
-                r -> r.in().tryUnpackNil() ? null : new ClientTable(ch, marshallers, r.in().unpackInt(), name));
+                r -> r.in().tryUnpackNil() ? null : new ClientTable(ch, marshallers, topologyCache, r.in().unpackInt(), name));
     }
 }
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
similarity index 61%
rename from modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
rename to modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
index 5bade5447d..28f7bb9b57 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.table.partition;
 
 import static java.util.concurrent.CompletableFuture.allOf;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
@@ -25,16 +25,21 @@ import static org.apache.ignite.internal.table.TableRow.tuple;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
 import org.apache.ignite.table.partition.PartitionManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -42,19 +47,25 @@ import org.junit.jupiter.api.Test;
 /**
  * Test suite for {@link PartitionManager}.
  */
-public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest {
-    private static final String TABLE_NAME = "tableName";
+public abstract class ItAbstractPartitionManagerTest extends ClusterPerTestIntegrationTest {
+    protected static final String TABLE_NAME = "tableName";
+
+    protected static final String ZONE_NAME = "TEST_ZONE";
 
     private static final int PARTITIONS = 3;
 
+    protected abstract PartitionManager partitionManager();
+
+    protected abstract Partition toPartition(int i);
+
     @BeforeEach
     public void setup() {
-        String zoneSql = "create zone test_zone with"
+        String zoneSql = "create zone " + ZONE_NAME + " with"
                 + " partitions=" + PARTITIONS + ","
                 + " replicas=3,"
                 + " storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
 
-        String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20)) with primary_zone='TEST_ZONE'";
+        String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20)) with primary_zone='" + ZONE_NAME + "'";
 
         cluster.doInSession(0, session -> {
             executeUpdate(zoneSql, session);
@@ -67,21 +78,49 @@ public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest {
         }
     }
 
+    @Test
+    public void primaryPartitions() {
+        TableViewInternal tableViewInternal = unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+
+        verifyPrimaryPartition(tableViewInternal, PARTITIONS);
+
+        executeSql("ALTER TABLE " + TABLE_NAME + " ADD COLUMN val1 VARCHAR DEFAULT 'newDefault'");
+        verifyPrimaryPartition(tableViewInternal, PARTITIONS);
+        verifyAllKeys(tableViewInternal, PARTITIONS);
+    }
+
     @Test
     public void partitionsForAllKeys() {
-        PartitionManager partitionManager = cluster.aliveNode().tables().table(TABLE_NAME).partitionManager();
         TableViewInternal tableViewInternal = unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+
+        verifyAllKeys(tableViewInternal, PARTITIONS);
+    }
+
+    private void verifyPrimaryPartition(TableViewInternal tableViewInternal, int partitions) {
+        InternalTable internalTable = tableViewInternal.internalTable();
+
+        for (int i = 0; i < partitions; i++) {
+            CompletableFuture<ClusterNode> clusterNodeCompletableFuture = internalTable.partitionLocation(
+                    new TablePartitionId(internalTable.tableId(), i));
+
+            CompletableFuture<ClusterNode> clusterNodeCompletableFuture1 = partitionManager().primaryReplicaAsync(toPartition(i));
+
+            assertThat(clusterNodeCompletableFuture.join().id(), equalTo(clusterNodeCompletableFuture1.join().id()));
+        }
+    }
+
+    private void verifyAllKeys(TableViewInternal tableViewInternal, int partitions) {
         InternalTable internalTable = tableViewInternal.internalTable();
 
-        CompletableFuture<?>[] futures = new CompletableFuture<?>[PARTITIONS];
-        for (int i = 0; i < PARTITIONS; i++) {
+        CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+        for (int i = 0; i < partitions; i++) {
             CompletableFuture<Object> future = new CompletableFuture<>();
 
             futures[i] = future;
 
             Publisher<BinaryRow> scan = internalTable.scan(i, null);
 
-            HashPartition value = new HashPartition(i);
+            Partition value = toPartition(i);
 
             scan.subscribe(new Subscriber<>() {
                 @Override
@@ -95,7 +134,13 @@ public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest {
                     Tuple tuple = tuple(registry.resolve(item, registry.lastKnownSchemaVersion()));
 
                     Tuple key = Tuple.create().set("key", tuple.intValue("key"));
-                    assertThat(partitionManager.partitionAsync(key), willBe(value));
+                    try {
+                        assertThat(partitionManager().partitionAsync(key), willBe(value));
+                    } catch (AssertionError e) {
+                        InternalTable internalTable1 = internalTable;
+                        System.out.println(e);
+                        throw e;
+                    }
                 }
 
                 @Override
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java
new file mode 100644
index 0000000000..1e46ab8962
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java
@@ -0,0 +1,16 @@
+package org.apache.ignite.internal.table.partition;
+
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+
+public class ItStandalonePartitionManagerTest extends ItAbstractPartitionManagerTest {
+    @Override
+    protected PartitionManager partitionManager() {
+        return cluster.aliveNode().tables().table(TABLE_NAME).partitionManager();
+    }
+
+    @Override
+    protected Partition toPartition(int i) {
+        return new HashPartition(i);
+    }
+}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java
new file mode 100644
index 0000000000..a9f97a740f
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java
@@ -0,0 +1,29 @@
+package org.apache.ignite.internal.table.partition;
+
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.client.table.ClientPartitionManager.ClientHashPartition;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.BeforeEach;
+
+public class ItThinClientPartitionManagerTest extends ItAbstractPartitionManagerTest {
+    private IgniteClient client;
+
+    @BeforeEach
+    public void startClient() {
+        client = IgniteClient.builder()
+                .addresses("localhost")
+                .reconnectThrottlingPeriod(0)
+                .build();
+    }
+
+    @Override
+    protected PartitionManager partitionManager() {
+        return client.tables().table(TABLE_NAME).partitionManager();
+    }
+
+    @Override
+    protected Partition toPartition(int i) {
+        return new ClientHashPartition(i);
+    }
+}


Reply via email to