This is an automated email from the ASF dual-hosted git repository. mpochatkin pushed a commit to branch IGNITE-22149 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 72b01b0e87c985a561f0f48374e63e7e98f599c2 Author: Mikhail Pochatkin <[email protected]> AuthorDate: Wed May 15 13:29:09 2024 +0300 IGNITE-22149 Java Thin client table partition API --- .../org/apache/ignite/network/NodeMetadata.java | 9 + .../ignite/internal/client/proto/ClientOp.java | 6 + .../handler/ClientInboundMessageHandler.java | 11 ++ .../partition/ClientKeyPartitionGetRequest.java | 62 +++++++ .../ClientPartitionPrimaryGetRequest.java | 76 ++++++++ .../ClientPartitionsPrimaryGetRequest.java | 70 ++++++++ .../ignite/internal/client/TcpIgniteClient.java | 7 +- .../ignite/internal/client/TopologyCache.java | 73 ++++++++ .../client/table/ClientPartitionManager.java | 198 +++++++++++++++++++++ .../ignite/internal/client/table/ClientTable.java | 15 +- .../ignite/internal/client/table/ClientTables.java | 11 +- .../ItAbstractPartitionManagerTest.java} | 67 +++++-- .../ItStandalonePartitionManagerTest.java | 16 ++ .../ItThinClientPartitionManagerTest.java | 29 +++ 14 files changed, 632 insertions(+), 18 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java b/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java index 154c602d2c..5291973b75 100644 --- a/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java +++ b/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java @@ -75,4 +75,13 @@ public class NodeMetadata implements Serializable { public int hashCode() { return Objects.hash(restHost, httpPort, httpsPort); } + + @Override + public String toString() { + return "NodeMetadata{" + + "restHost='" + restHost + '\'' + + ", httpPort=" + httpPort + + ", httpsPort=" + httpsPort + + '}'; + } } diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java index 0b83a4869f..11b06c49ea 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java @@ -170,4 +170,10 @@ public class ClientOp { /** Execute SQL query with the parameters batch. */ public static final int SQL_EXEC_BATCH = 63; + + public static final int PARTITION_PRIMARY_GET = 64; + + public static final int PARTITIONS_PRIMARY_GET = 65; + + public static final int KEY_PARTITION_PRIMARY_GET = 66; } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index 5669f4d285..754bf8730b 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -88,6 +88,9 @@ import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactRe import org.apache.ignite.client.handler.requests.table.ClientTupleReplaceRequest; import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllRequest; import org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest; +import org.apache.ignite.client.handler.requests.table.partition.ClientKeyPartitionGetRequest; +import org.apache.ignite.client.handler.requests.table.partition.ClientPartitionPrimaryGetRequest; +import org.apache.ignite.client.handler.requests.table.partition.ClientPartitionsPrimaryGetRequest; import org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginRequest; import org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest; import org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest; @@ -774,6 +777,14 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter im case ClientOp.STREAMER_BATCH_SEND: return ClientStreamerBatchSendRequest.process(in, out, igniteTables); + case ClientOp.PARTITION_PRIMARY_GET: + return ClientPartitionPrimaryGetRequest.process(in, out, igniteTables); + + case ClientOp.PARTITIONS_PRIMARY_GET: + return ClientPartitionsPrimaryGetRequest.process(in, out, igniteTables); + + case ClientOp.KEY_PARTITION_PRIMARY_GET: + return ClientKeyPartitionGetRequest.process(in, out, igniteTables); default: throw new IgniteException(PROTOCOL_ERR, "Unexpected operation code: " + opCode); } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientKeyPartitionGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientKeyPartitionGetRequest.java new file mode 100644 index 0000000000..c9f6757e9e --- /dev/null +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientKeyPartitionGetRequest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.handler.requests.table.partition; + +import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync; +import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTuple; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.client.proto.ClientMessagePacker; +import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; +import org.apache.ignite.internal.table.partition.HashPartition; +import org.apache.ignite.table.manager.IgniteTables; + +/** + * Client key partition get request. + */ +public class ClientKeyPartitionGetRequest { + + /** + * Process the request. + * + * @param in Unpacker. + * @param out Packer. + * @param tables Ignite tables. + * @return Future which will complete after request process finished. + */ + public static CompletableFuture<Void> process( + ClientMessageUnpacker in, + ClientMessagePacker out, + IgniteTables tables + ) { + return readTableAsync(in, tables).thenCompose(table -> { + if (table == null) { + out.packNil(); + return nullCompletedFuture(); + } else { + return readTuple(in, table, true) + .thenCompose(tuple -> table.partitionManager().partitionAsync(tuple)) + .thenAccept(partition -> { + HashPartition hashPartition = (HashPartition) partition; + out.packInt(hashPartition.partitionId()); + }); + } + }); + } +} diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionPrimaryGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionPrimaryGetRequest.java new file mode 100644 index 0000000000..3c4d2264c2 --- /dev/null +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionPrimaryGetRequest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.handler.requests.table.partition; + +import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.client.proto.ClientMessagePacker; +import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; +import org.apache.ignite.internal.table.partition.HashPartition; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.table.manager.IgniteTables; +import org.jetbrains.annotations.Nullable; + +/** + * Client table primary partition get request. + */ +public class ClientPartitionPrimaryGetRequest { + + /** + * Process the request. + * + * @param in Unpacker. + * @param out Packer. + * @param tables Ignite tables. + * @return Future which will complete after request process finished. + */ + public static CompletableFuture<Void> process( + ClientMessageUnpacker in, + ClientMessagePacker out, + IgniteTables tables + ) { + + return readTableAsync(in, tables).thenCompose(table -> { + if (table == null) { + out.packNil(); + return nullCompletedFuture(); + } else { + int partition = in.unpackInt(); + return table.partitionManager() + .primaryReplicaAsync(new HashPartition(partition)) + .thenAccept(node -> packClusterNode(node, out)); + } + }); + } + + + static void packClusterNode(@Nullable ClusterNode clusterNode, ClientMessagePacker out) { + if (clusterNode == null) { + out.packNil(); + } else { + out.packString(clusterNode.id()); + out.packString(clusterNode.name()); + NetworkAddress address = clusterNode.address(); + out.packString(address.host()); + out.packInt(address.port()); + } + } +} diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionsPrimaryGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionsPrimaryGetRequest.java new file mode 100644 index 0000000000..ffa23f8c86 --- /dev/null +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientPartitionsPrimaryGetRequest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.handler.requests.table.partition; + +import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync; +import static org.apache.ignite.client.handler.requests.table.partition.ClientPartitionPrimaryGetRequest.packClusterNode; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; + +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.client.proto.ClientMessagePacker; +import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; +import org.apache.ignite.internal.table.partition.HashPartition; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.table.manager.IgniteTables; +import org.apache.ignite.table.partition.Partition; + +/** + * Client all primary partitions get request. + */ +public class ClientPartitionsPrimaryGetRequest { + + /** + * Process the request. + * + * @param in Unpacker. + * @param out Packer. + * @param tables Ignite tables. + * @return Future which will complete after request process finished. + */ + public static CompletableFuture<Void> process( + ClientMessageUnpacker in, + ClientMessagePacker out, + IgniteTables tables + ) { + return readTableAsync(in, tables).thenCompose(table -> { + if (table == null) { + out.packNil(); + return nullCompletedFuture(); + } else { + return table.partitionManager() + .primaryReplicasAsync() + .thenAccept(partitions -> { + out.packInt(partitions.size()); + for (Entry<Partition, ClusterNode> e : partitions.entrySet()) { + HashPartition partition = (HashPartition) e.getKey(); + + out.packInt(partition.partitionId()); + packClusterNode(e.getValue(), out); + } + }); + } + }); + } +} diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java index 591c83fcbc..bc9fe7cf95 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java @@ -79,6 +79,8 @@ public class TcpIgniteClient implements IgniteClient { /** Marshallers provider. */ private final ReflectionMarshallersProvider marshallers = new ReflectionMarshallersProvider(); + private final TopologyCache topologyCache; + /** * Cluster name. */ @@ -105,9 +107,11 @@ public class TcpIgniteClient implements IgniteClient { this.cfg = cfg; + topologyCache = new TopologyCache(this::clusterNodes); + metrics = new ClientMetricSource(); ch = new ReliableChannel(chFactory, cfg, metrics); - tables = new ClientTables(ch, marshallers); + tables = new ClientTables(ch, marshallers, topologyCache); transactions = new ClientTransactions(ch); compute = new ClientCompute(ch, tables); sql = new ClientSql(ch, marshallers); @@ -204,6 +208,7 @@ public class TcpIgniteClient implements IgniteClient { new NetworkAddress(r.in().unpackString(), r.in().unpackInt()))); } + topologyCache.refresh(res); return res; }); } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TopologyCache.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TopologyCache.java new file mode 100644 index 0000000000..b90564a2ad --- /dev/null +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TopologyCache.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import org.apache.ignite.network.ClusterNode; +import org.jetbrains.annotations.Nullable; + +/** + * Cache for server nodes topology. + */ +public class TopologyCache { + private final Map<String, ClusterNode> topology = new ConcurrentHashMap<>(); + + private Instant aliveUntil; + + private final Supplier<Collection<ClusterNode>> updateFunction; + + /** + * Creates new topology cache. + * + * @param updateFunction New server nodes topology provider. + */ + public TopologyCache(Supplier<Collection<ClusterNode>> updateFunction) { + this.updateFunction = updateFunction; + } + + /** + * Refreshes cache with new given topology. + * + * @param newTopology New server nodes topology. + */ + public void refresh(Collection<ClusterNode> newTopology) { + topology.clear(); + for (ClusterNode clusterNode : newTopology) { + topology.put(clusterNode.name(), clusterNode); + } + aliveUntil = Instant.now().plus(1, ChronoUnit.MINUTES); + } + + /** + * Returns {@link ClusterNode} for given node name. + * + * @param name Node name. + * @return Cluster node for given node name or {@code null} if node name is not presented in topology. + */ + public @Nullable ClusterNode get(String name) { + if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) { + refresh(updateFunction.get()); + } + return topology.get(name); + } +} diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java new file mode 100644 index 0000000000..300f77b4fa --- /dev/null +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.table; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.client.table.ClientTupleSerializer.getPartitionAwarenessProvider; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.ignite.internal.client.ClientClusterNode; +import org.apache.ignite.internal.client.PayloadInputChannel; +import org.apache.ignite.internal.client.PayloadOutputChannel; +import org.apache.ignite.internal.client.TopologyCache; +import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; +import org.apache.ignite.internal.client.proto.ClientOp; +import org.apache.ignite.internal.client.proto.TuplePart; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.mapper.Mapper; +import org.apache.ignite.table.partition.Partition; +import org.apache.ignite.table.partition.PartitionManager; +import org.jetbrains.annotations.Nullable; + +/** + * Client partition manager implementation. + */ +public class ClientPartitionManager implements PartitionManager { + private final ClientTable tbl; + + private final TopologyCache topologyCache; + + ClientPartitionManager(ClientTable clientTable, TopologyCache topologyCache) { + this.tbl = clientTable; + this.topologyCache = topologyCache; + } + + @Override + @SuppressWarnings("resource") + public CompletableFuture<ClusterNode> primaryReplicaAsync(Partition partition) { + if (!(partition instanceof ClientHashPartition)) { + throw new IllegalArgumentException(); + } + + ClientHashPartition clientPartition = (ClientHashPartition) partition; + return tbl.getPartitionAssignment().thenCompose(names -> { + String node = names.get(clientPartition.partitionId); + if (node == null) { + return tbl.channel().serviceAsync(ClientOp.PARTITION_PRIMARY_GET, + w -> { + w.out().packInt(tbl.tableId()); + w.out().packInt(clientPartition.partitionId); + }, + ClientPartitionManager::unpackClusterNode); + } else { + return completedFuture(topologyCache.get(node)); + } + }); + } + + @Override + @SuppressWarnings("resource") + public CompletableFuture<Map<Partition, ClusterNode>> primaryReplicasAsync() { + return tbl.channel().serviceAsync(ClientOp.PARTITIONS_PRIMARY_GET, + w -> w.out().packInt(tbl.tableId()), + r -> { + ClientMessageUnpacker in = r.in(); + + if (in.tryUnpackNil()) { + return null; + } + + int size = in.unpackInt(); + + Map<Partition, ClusterNode> res = new HashMap<>(size); + + for (int i = 0; i < size; i++) { + int partition = in.unpackInt(); + res.put(new ClientHashPartition(partition), unpackClusterNode(r)); + } + + return res; + }); + } + + @Override + public <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K> mapper) { + return getPartition(getPartitionAwarenessProvider(null, mapper, key), + (schema, w) -> ClientRecordSerializer.writeRecRaw(key, mapper, schema, w.out(), TuplePart.KEY) + ); + } + + @Override + public CompletableFuture<Partition> partitionAsync(Tuple key) { + return getPartition(getPartitionAwarenessProvider(null, key), + (schema, w) -> ClientTupleSerializer.writeTupleRaw(key, schema, w, true) + ); + } + + @SuppressWarnings("resource") + private CompletableFuture<Partition> getPartition( + PartitionAwarenessProvider partitionAwarenessProvider, + BiConsumer<ClientSchema, PayloadOutputChannel> keyPack + ) { + return tbl.getPartitionAssignment() + .thenCompose(partitions -> tbl.getLatestSchema().thenApply(schema -> { + Integer hash = partitionAwarenessProvider.getObjectHashCode(schema); + + return Math.abs(hash % partitions.size()); + })) + .exceptionally(t -> -1) + .thenCompose(partition -> { + if (partition == -1) { + return tbl.getLatestSchema() + .thenCompose(schema -> + tbl.channel().serviceAsync(ClientOp.KEY_PARTITION_PRIMARY_GET, + w -> { + w.out().packInt(tbl.tableId()); + keyPack.accept(schema, w); + }, + r -> r.in().unpackInt() + )); + } + + return completedFuture(partition); + }).thenApply(ClientHashPartition::new); + } + + private static @Nullable ClusterNode unpackClusterNode(PayloadInputChannel r) { + ClientMessageUnpacker in = r.in(); + if (in.tryUnpackNil()) { + return null; + } + + return new ClientClusterNode( + in.unpackString(), + in.unpackString(), + new NetworkAddress(in.unpackString(), in.unpackInt())); + } + + public static class ClientHashPartition implements Partition { + private static final long serialVersionUID = -2089375867774271170L; + + private final int partitionId; + + public ClientHashPartition(int partitionId) { + this.partitionId = partitionId; + if (partitionId > 2 || partitionId < 0) { + System.out.println(); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ClientHashPartition that = (ClientHashPartition) o; + + return partitionId == that.partitionId; + } + + @Override + public int hashCode() { + return partitionId; + } + + @Override + public String toString() { + return "ClientHashPartition{" + + "partitionId=" + partitionId + + '}'; + } + } +} diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java index 70956196a6..fd0830282e 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.client.ClientUtils; import org.apache.ignite.internal.client.PayloadInputChannel; import org.apache.ignite.internal.client.PayloadOutputChannel; import org.apache.ignite.internal.client.ReliableChannel; +import org.apache.ignite.internal.client.TopologyCache; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.client.proto.ClientOp; import org.apache.ignite.internal.client.proto.ColumnTypeConverter; @@ -77,6 +78,8 @@ public class ClientTable implements Table { private final IgniteLogger log; + private final TopologyCache topologyCache; + private static final int UNKNOWN_SCHEMA_VERSION = -1; private volatile int latestSchemaVer = UNKNOWN_SCHEMA_VERSION; @@ -95,13 +98,20 @@ public class ClientTable implements Table { * @param id Table id. * @param name Table name. */ - public ClientTable(ReliableChannel ch, MarshallersProvider marshallers, int id, String name) { + public ClientTable( + ReliableChannel ch, + MarshallersProvider marshallers, + TopologyCache topologyCache, + int id, + String name + ) { assert ch != null; assert marshallers != null; assert name != null && !name.isEmpty(); this.ch = ch; this.marshallers = marshallers; + this.topologyCache = topologyCache; this.id = id; this.name = name; this.log = ClientUtils.logger(ch.configuration(), ClientTable.class); @@ -133,9 +143,8 @@ public class ClientTable implements Table { } @Override - // TODO: IGNITE-22149 public PartitionManager partitionManager() { - throw new UnsupportedOperationException("This operation doesn't implemented yet."); + return new ClientPartitionManager(this, topologyCache); } /** {@inheritDoc} */ diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java index 80ef7baa78..f898c89a33 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTables.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.client.ReliableChannel; +import org.apache.ignite.internal.client.TopologyCache; import org.apache.ignite.internal.client.proto.ClientOp; import org.apache.ignite.internal.marshaller.MarshallersProvider; import org.apache.ignite.table.Table; @@ -37,15 +38,19 @@ public class ClientTables implements IgniteTables { private final MarshallersProvider marshallers; + private final TopologyCache topologyCache; + /** * Constructor. * * @param ch Channel. * @param marshallers Marshallers provider. + * @param topologyCache Server topology cache. */ - public ClientTables(ReliableChannel ch, MarshallersProvider marshallers) { + public ClientTables(ReliableChannel ch, MarshallersProvider marshallers, TopologyCache topologyCache) { this.ch = ch; this.marshallers = marshallers; + this.topologyCache = topologyCache; } /** {@inheritDoc} */ @@ -63,7 +68,7 @@ public class ClientTables implements IgniteTables { var res = new ArrayList<Table>(cnt); for (int i = 0; i < cnt; i++) { - res.add(new ClientTable(ch, marshallers, in.unpackInt(), in.unpackString())); + res.add(new ClientTable(ch, marshallers, topologyCache, in.unpackInt(), in.unpackString())); } return res; @@ -82,6 +87,6 @@ public class ClientTables implements IgniteTables { Objects.requireNonNull(name); return ch.serviceAsync(ClientOp.TABLE_GET, w -> w.out().packString(name), - r -> r.in().tryUnpackNil() ? null : new ClientTable(ch, marshallers, r.in().unpackInt(), name)); + r -> r.in().tryUnpackNil() ? null : new ClientTable(ch, marshallers, topologyCache, r.in().unpackInt(), name)); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java similarity index 61% rename from modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java rename to modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java index 5bade5447d..28f7bb9b57 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.ignite.internal.table; +package org.apache.ignite.internal.table.partition; import static java.util.concurrent.CompletableFuture.allOf; import static org.apache.ignite.internal.SessionUtils.executeUpdate; @@ -25,16 +25,21 @@ import static org.apache.ignite.internal.table.TableRow.tuple; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.SchemaRegistry; -import org.apache.ignite.internal.table.partition.HashPartition; +import org.apache.ignite.internal.table.InternalTable; +import org.apache.ignite.internal.table.TableViewInternal; +import org.apache.ignite.network.ClusterNode; import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.partition.Partition; import org.apache.ignite.table.partition.PartitionManager; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -42,19 +47,25 @@ import org.junit.jupiter.api.Test; /** * Test suite for {@link PartitionManager}. */ -public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest { - private static final String TABLE_NAME = "tableName"; +public abstract class ItAbstractPartitionManagerTest extends ClusterPerTestIntegrationTest { + protected static final String TABLE_NAME = "tableName"; + + protected static final String ZONE_NAME = "TEST_ZONE"; private static final int PARTITIONS = 3; + protected abstract PartitionManager partitionManager(); + + protected abstract Partition toPartition(int i); + @BeforeEach public void setup() { - String zoneSql = "create zone test_zone with" + String zoneSql = "create zone " + ZONE_NAME + " with" + " partitions=" + PARTITIONS + "," + " replicas=3," + " storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'"; - String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20)) with primary_zone='TEST_ZONE'"; + String sql = "create table " + TABLE_NAME + " (key int primary key, val varchar(20)) with primary_zone='" + ZONE_NAME + "'"; cluster.doInSession(0, session -> { executeUpdate(zoneSql, session); @@ -67,21 +78,49 @@ public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest { } } + @Test + public void primaryPartitions() { + TableViewInternal tableViewInternal = unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME)); + + verifyPrimaryPartition(tableViewInternal, PARTITIONS); + + executeSql("ALTER TABLE " + TABLE_NAME + " ADD COLUMN val1 VARCHAR DEFAULT 'newDefault'"); + verifyPrimaryPartition(tableViewInternal, PARTITIONS); + verifyAllKeys(tableViewInternal, PARTITIONS); + } + @Test public void partitionsForAllKeys() { - PartitionManager partitionManager = cluster.aliveNode().tables().table(TABLE_NAME).partitionManager(); TableViewInternal tableViewInternal = unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME)); + + verifyAllKeys(tableViewInternal, PARTITIONS); + } + + private void verifyPrimaryPartition(TableViewInternal tableViewInternal, int partitions) { + InternalTable internalTable = tableViewInternal.internalTable(); + + for (int i = 0; i < partitions; i++) { + CompletableFuture<ClusterNode> clusterNodeCompletableFuture = internalTable.partitionLocation( + new TablePartitionId(internalTable.tableId(), i)); + + CompletableFuture<ClusterNode> clusterNodeCompletableFuture1 = partitionManager().primaryReplicaAsync(toPartition(i)); + + assertThat(clusterNodeCompletableFuture.join().id(), equalTo(clusterNodeCompletableFuture1.join().id())); + } + } + + private void verifyAllKeys(TableViewInternal tableViewInternal, int partitions) { InternalTable internalTable = tableViewInternal.internalTable(); - CompletableFuture<?>[] futures = new CompletableFuture<?>[PARTITIONS]; - for (int i = 0; i < PARTITIONS; i++) { + CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions]; + for (int i = 0; i < partitions; i++) { CompletableFuture<Object> future = new CompletableFuture<>(); futures[i] = future; Publisher<BinaryRow> scan = internalTable.scan(i, null); - HashPartition value = new HashPartition(i); + Partition value = toPartition(i); scan.subscribe(new Subscriber<>() { @Override @@ -95,7 +134,13 @@ public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest { Tuple tuple = tuple(registry.resolve(item, registry.lastKnownSchemaVersion())); Tuple key = Tuple.create().set("key", tuple.intValue("key")); - assertThat(partitionManager.partitionAsync(key), willBe(value)); + try { + assertThat(partitionManager().partitionAsync(key), willBe(value)); + } catch (AssertionError e) { + InternalTable internalTable1 = internalTable; + System.out.println(e); + throw e; + } } @Override diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java new file mode 100644 index 0000000000..1e46ab8962 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java @@ -0,0 +1,16 @@ +package org.apache.ignite.internal.table.partition; + +import org.apache.ignite.table.partition.Partition; +import org.apache.ignite.table.partition.PartitionManager; + +public class ItStandalonePartitionManagerTest extends ItAbstractPartitionManagerTest { + @Override + protected PartitionManager partitionManager() { + return cluster.aliveNode().tables().table(TABLE_NAME).partitionManager(); + } + + @Override + protected Partition toPartition(int i) { + return new HashPartition(i); + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java new file mode 100644 index 0000000000..a9f97a740f --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java @@ -0,0 +1,29 @@ +package org.apache.ignite.internal.table.partition; + +import org.apache.ignite.client.IgniteClient; +import org.apache.ignite.internal.client.table.ClientPartitionManager.ClientHashPartition; +import org.apache.ignite.table.partition.Partition; +import org.apache.ignite.table.partition.PartitionManager; +import org.junit.jupiter.api.BeforeEach; + +public class ItThinClientPartitionManagerTest extends ItAbstractPartitionManagerTest { + private IgniteClient client; + + @BeforeEach + public void startClient() { + client = IgniteClient.builder() + .addresses("localhost") + .reconnectThrottlingPeriod(0) + .build(); + } + + @Override + protected PartitionManager partitionManager() { + return client.tables().table(TABLE_NAME).partitionManager(); + } + + @Override + protected Partition toPartition(int i) { + return new ClientHashPartition(i); + } +}
