This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 1ae5d75b97 IGNITE-22149 Java thin: Implement partition API (#3764)
1ae5d75b97 is described below
commit 1ae5d75b973ad8069af886e1dcfcfa2f9c801ab4
Author: Mikhail <[email protected]>
AuthorDate: Wed May 22 17:24:01 2024 +0300
IGNITE-22149 Java thin: Implement partition API (#3764)
---
.../org/apache/ignite/network/NodeMetadata.java | 9 ++
.../ignite/internal/client/proto/ClientOp.java | 3 +
.../handler/ClientInboundMessageHandler.java | 3 +
.../cluster/ClientClusterGetNodesRequest.java | 25 ++-
...blePartitionPrimaryReplicasNodesGetRequest.java | 65 ++++++++
.../apache/ignite/client/ClientOperationType.java | 8 +-
.../org/apache/ignite/client/RetryReadPolicy.java | 1 +
.../apache/ignite/internal/client/ClientUtils.java | 3 +
.../ignite/internal/client/TcpIgniteClient.java | 27 +++-
.../client/table/ClientPartitionManager.java | 169 +++++++++++++++++++++
.../ignite/internal/client/table/ClientTable.java | 13 +-
.../internal/table/partition/HashPartition.java | 0
.../ItAbstractPartitionManagerTest.java} | 59 +++++--
.../ItStandalonePartitionManagerTest.java} | 36 +----
.../ItThinClientPartitionManagerTest.java} | 44 ++----
15 files changed, 378 insertions(+), 87 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
b/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
index 154c602d2c..5291973b75 100644
--- a/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
+++ b/modules/api/src/main/java/org/apache/ignite/network/NodeMetadata.java
@@ -75,4 +75,13 @@ public class NodeMetadata implements Serializable {
public int hashCode() {
return Objects.hash(restHost, httpPort, httpsPort);
}
+
+ @Override
+ public String toString() {
+ return "NodeMetadata{"
+ + "restHost='" + restHost + '\''
+ + ", httpPort=" + httpPort
+ + ", httpsPort=" + httpsPort
+ + '}';
+ }
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index bb371faaed..dd670e89ae 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -175,4 +175,7 @@ public class ClientOp {
* Execute MapReduce task.
*/
public static final int COMPUTE_EXECUTE_MAPREDUCE = 64;
+
+ /** Get all primary replicas mapping to cluster nodes. */
+ public static final int PRIMARY_REPLICAS_GET = 65;
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index a9a75b8405..360e80c6cb 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -89,6 +89,7 @@ import
org.apache.ignite.client.handler.requests.table.ClientTupleReplaceExactRe
import
org.apache.ignite.client.handler.requests.table.ClientTupleReplaceRequest;
import
org.apache.ignite.client.handler.requests.table.ClientTupleUpsertAllRequest;
import
org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
+import
org.apache.ignite.client.handler.requests.table.partition.ClientTablePartitionPrimaryReplicasNodesGetRequest;
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginRequest;
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
@@ -778,6 +779,8 @@ public class ClientInboundMessageHandler extends
ChannelInboundHandlerAdapter im
case ClientOp.STREAMER_BATCH_SEND:
return ClientStreamerBatchSendRequest.process(in, out,
igniteTables);
+ case ClientOp.PRIMARY_REPLICAS_GET:
+ return
ClientTablePartitionPrimaryReplicasNodesGetRequest.process(in, out,
igniteTables);
default:
throw new IgniteException(PROTOCOL_ERR, "Unexpected operation
code: " + opCode);
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
index f0c8fa7b8b..dfe562444d 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/cluster/ClientClusterGetNodesRequest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.NetworkAddress;
/**
* Cluster nodes request.
@@ -42,15 +43,27 @@ public class ClientClusterGetNodesRequest {
out.packInt(nodes.size());
for (ClusterNode node : nodes) {
- out.packInt(4);
-
- out.packString(node.id());
- out.packString(node.name());
- out.packString(node.address().host());
- out.packInt(node.address().port());
+ packClusterNode(node, out);
}
// Null future indicates synchronous completion.
return null;
}
+
+ /**
+ * Pack {@link ClusterNode} instance to client message.
+ *
+ * @param clusterNode Cluster node.
+ * @param out Client message packer.
+ */
+ public static void packClusterNode(ClusterNode clusterNode,
ClientMessagePacker out) {
+ out.packInt(4);
+
+ out.packString(clusterNode.id());
+ out.packString(clusterNode.name());
+
+ NetworkAddress address = clusterNode.address();
+ out.packString(address.host());
+ out.packInt(address.port());
+ }
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientTablePartitionPrimaryReplicasNodesGetRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientTablePartitionPrimaryReplicasNodesGetRequest.java
new file mode 100644
index 0000000000..dc01d07815
--- /dev/null
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/partition/ClientTablePartitionPrimaryReplicasNodesGetRequest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table.partition;
+
+import static
org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRequest.packClusterNode;
+import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTableAsync;
+
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.table.partition.Partition;
+
+/**
+ * Client primary replicas with node info retrieval request.
+ * See also {@link ClientTablePartitionPrimaryReplicasGetRequest}.
+ */
+public class ClientTablePartitionPrimaryReplicasNodesGetRequest {
+
+ /**
+ * Process the request.
+ *
+ * @param in Unpacker.
+ * @param out Packer.
+ * @param tables Ignite tables.
+ * @return Future which will complete after request process finished.
+ */
+ public static CompletableFuture<Void> process(
+ ClientMessageUnpacker in,
+ ClientMessagePacker out,
+ IgniteTables tables
+ ) {
+ return readTableAsync(in, tables).thenCompose(table ->
table.partitionManager()
+ .primaryReplicasAsync()
+ .thenAccept(partitions -> {
+ out.packInt(partitions.size());
+ for (Entry<Partition, ClusterNode> e :
partitions.entrySet()) {
+ HashPartition partition = (HashPartition) e.getKey();
+
+ out.packInt(partition.partitionId());
+
+ packClusterNode(e.getValue(), out);
+ }
+ }));
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index 820e111de4..916621f3d5 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -26,6 +26,7 @@ import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.table.DataStreamerTarget;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.manager.IgniteTables;
+import org.apache.ignite.table.partition.PartitionManager;
import org.apache.ignite.tx.Transaction;
/**
@@ -175,5 +176,10 @@ public enum ClientOperationType {
/**
* SQL Execute batch ({@link IgniteSql#executeBatchAsync(Transaction,
String, BatchedArguments)}).
*/
- SQL_EXECUTE_BATCH
+ SQL_EXECUTE_BATCH,
+
+ /**
+ * Get all primary replicas mapping to cluster nodes ({@link
PartitionManager#primaryReplicasAsync()}).
+ */
+ PRIMARY_REPLICAS_GET
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
index 4912f2a2db..eec5bbb004 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
@@ -60,6 +60,7 @@ public class RetryReadPolicy extends RetryLimitPolicy {
case SQL_CURSOR_NEXT_PAGE:
case SQL_EXECUTE_SCRIPT:
case STREAMER_BATCH_SEND:
+ case PRIMARY_REPLICAS_GET:
return false;
default:
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index bc76849b24..1d1d70709a 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -257,6 +257,9 @@ public class ClientUtils {
case ClientOp.SQL_EXEC_BATCH:
return ClientOperationType.SQL_EXECUTE_BATCH;
+ case ClientOp.PRIMARY_REPLICAS_GET:
+ return ClientOperationType.PRIMARY_REPLICAS_GET;
+
// Do not return null from default arm intentionally, so we don't
forget to update this when new ClientOp values are added.
default:
throw new UnsupportedOperationException("Invalid op code: " +
opCode);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 591c83fcbc..4a6adf9208 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -30,6 +30,7 @@ import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.catalog.sql.IgniteCatalogSqlImpl;
import org.apache.ignite.internal.client.compute.ClientCompute;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.client.table.ClientTables;
@@ -195,13 +196,9 @@ public class TcpIgniteClient implements IgniteClient {
List<ClusterNode> res = new ArrayList<>(cnt);
for (int i = 0; i < cnt; i++) {
- int fieldCnt = r.in().unpackInt();
- assert fieldCnt == 4;
+ ClusterNode clusterNode = unpackClusterNode(r);
- res.add(new ClientClusterNode(
- r.in().unpackString(),
- r.in().unpackString(),
- new NetworkAddress(r.in().unpackString(),
r.in().unpackInt())));
+ res.add(clusterNode);
}
return res;
@@ -266,4 +263,22 @@ public class TcpIgniteClient implements IgniteClient {
public <T extends ClientMessage> CompletableFuture<T> sendRequestAsync(int
opCode, PayloadWriter writer, PayloadReader<T> reader) {
return ch.serviceAsync(opCode, writer, reader);
}
+
+ /**
+ * Tries to unpack {@link ClusterNode} instance from input channel.
+ *
+ * @param r Payload input channel.
+ * @return Cluster node or {@code null} if message doesn't contain cluster
node.
+ */
+ public static ClusterNode unpackClusterNode(PayloadInputChannel r) {
+ ClientMessageUnpacker in = r.in();
+
+ int fieldCnt = r.in().unpackInt();
+ assert fieldCnt == 4;
+
+ return new ClientClusterNode(
+ in.unpackString(),
+ in.unpackString(),
+ new NetworkAddress(in.unpackString(), in.unpackInt()));
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
new file mode 100644
index 0000000000..ad0494985d
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientPartitionManager.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.client.TcpIgniteClient.unpackClusterNode;
+import static
org.apache.ignite.internal.client.table.ClientTupleSerializer.getPartitionAwarenessProvider;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client partition manager implementation.
+ */
+public class ClientPartitionManager implements PartitionManager {
+ private final ClientTable tbl;
+
+ private final Lock lock = new ReentrantLock();
+
+ private final Map<Partition, ClusterNode> cache = new HashMap<>();
+
+ private @Nullable Instant aliveUntil;
+
+ ClientPartitionManager(ClientTable clientTable) {
+ this.tbl = clientTable;
+ }
+
+ @Override
+ public CompletableFuture<ClusterNode> primaryReplicaAsync(Partition
partition) {
+ if (!(partition instanceof HashPartition)) {
+ throw new IllegalArgumentException("Unsupported partition type: "
+ partition);
+ }
+
+ ClusterNode clusterNode = getClusterNode(partition);
+
+ if (clusterNode != null) {
+ return completedFuture(clusterNode);
+ }
+
+ return primaryReplicasAsync()
+ .thenApply(map -> map.get(partition));
+ }
+
+ @Override
+ @SuppressWarnings("resource")
+ public CompletableFuture<Map<Partition, ClusterNode>>
primaryReplicasAsync() {
+ Map<Partition, ClusterNode> cache = lookupCache();
+
+ if (cache != null) {
+ return completedFuture(cache);
+ }
+
+ return tbl.channel().serviceAsync(ClientOp.PRIMARY_REPLICAS_GET,
+ w -> w.out().packInt(tbl.tableId()),
+ r -> {
+ ClientMessageUnpacker in = r.in();
+
+ if (in.tryUnpackNil()) {
+ return Collections.<Partition, ClusterNode>emptyMap();
+ }
+
+ int size = in.unpackInt();
+
+ Map<Partition, ClusterNode> res = new HashMap<>(size);
+
+ for (int i = 0; i < size; i++) {
+ int partition = in.unpackInt();
+ res.put(new HashPartition(partition),
unpackClusterNode(r));
+ }
+
+ return res;
+ })
+ .thenApply(this::updateCache);
+ }
+
+ @Override
+ public <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K>
mapper) {
+ Objects.requireNonNull(key, "Key is null.");
+ Objects.requireNonNull(mapper, "Mapper is null.");
+
+ return getPartition(getPartitionAwarenessProvider(null, mapper, key));
+ }
+
+ @Override
+ public CompletableFuture<Partition> partitionAsync(Tuple key) {
+ Objects.requireNonNull(key, "Key is null.");
+
+ return getPartition(getPartitionAwarenessProvider(null, key));
+ }
+
+ private @Nullable ClusterNode getClusterNode(Partition partition) {
+ lock.lock();
+ try {
+ if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) {
+ cache.clear();
+ aliveUntil = null;
+ return null;
+ }
+ return cache.get(partition);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private @Nullable Map<Partition, ClusterNode> lookupCache() {
+ lock.lock();
+ try {
+ if (aliveUntil == null || Instant.now().isAfter(aliveUntil)) {
+ cache.clear();
+ aliveUntil = null;
+ return null;
+ }
+ return Map.copyOf(cache);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private Map<Partition, ClusterNode> updateCache(Map<Partition,
ClusterNode> map) {
+ lock.lock();
+ try {
+ cache.putAll(map);
+ aliveUntil = Instant.now().plus(1, ChronoUnit.MINUTES);
+ return map;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private CompletableFuture<Partition>
getPartition(PartitionAwarenessProvider partitionAwarenessProvider) {
+ return tbl.getPartitionAssignment()
+ .thenCompose(partitions ->
tbl.getLatestSchema().thenApply(schema -> {
+ Integer hash =
partitionAwarenessProvider.getObjectHashCode(schema);
+
+ return new HashPartition(Math.abs(hash %
partitions.size()));
+ }));
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 70956196a6..9818661563 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -87,6 +87,8 @@ public class ClientTable implements Table {
private volatile PartitionAssignment partitionAssignment = null;
+ private final ClientPartitionManager clientPartitionManager;
+
/**
* Constructor.
*
@@ -95,7 +97,12 @@ public class ClientTable implements Table {
* @param id Table id.
* @param name Table name.
*/
- public ClientTable(ReliableChannel ch, MarshallersProvider marshallers,
int id, String name) {
+ public ClientTable(
+ ReliableChannel ch,
+ MarshallersProvider marshallers,
+ int id,
+ String name
+ ) {
assert ch != null;
assert marshallers != null;
assert name != null && !name.isEmpty();
@@ -106,6 +113,7 @@ public class ClientTable implements Table {
this.name = name;
this.log = ClientUtils.logger(ch.configuration(), ClientTable.class);
this.sql = new ClientSql(ch, marshallers);
+ clientPartitionManager = new ClientPartitionManager(this);
}
/**
@@ -133,9 +141,8 @@ public class ClientTable implements Table {
}
@Override
- // TODO: IGNITE-22149
public PartitionManager partitionManager() {
- throw new UnsupportedOperationException("This operation doesn't
implemented yet.");
+ return clientPartitionManager;
}
/** {@inheritDoc} */
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
similarity index 100%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
similarity index 64%
rename from
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
rename to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
index 5bade5447d..8ca8096487 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItAbstractPartitionManagerTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.table.partition;
import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
@@ -25,16 +25,21 @@ import static
org.apache.ignite.internal.table.TableRow.tuple;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaRegistry;
-import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.Partition;
import org.apache.ignite.table.partition.PartitionManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -42,19 +47,23 @@ import org.junit.jupiter.api.Test;
/**
* Test suite for {@link PartitionManager}.
*/
-public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest {
- private static final String TABLE_NAME = "tableName";
+public abstract class ItAbstractPartitionManagerTest extends
ClusterPerTestIntegrationTest {
+ protected static final String TABLE_NAME = "tableName";
+
+ protected static final String ZONE_NAME = "TEST_ZONE";
private static final int PARTITIONS = 3;
+ protected abstract PartitionManager partitionManager();
+
@BeforeEach
public void setup() {
- String zoneSql = "create zone test_zone with"
+ String zoneSql = "create zone " + ZONE_NAME + " with"
+ " partitions=" + PARTITIONS + ","
+ " replicas=3,"
+ " storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
- String sql = "create table " + TABLE_NAME + " (key int primary key,
val varchar(20)) with primary_zone='TEST_ZONE'";
+ String sql = "create table " + TABLE_NAME + " (key int primary key,
val varchar(20)) with primary_zone='" + ZONE_NAME + "'";
cluster.doInSession(0, session -> {
executeUpdate(zoneSql, session);
@@ -67,21 +76,49 @@ public class ItPartitionManagerTest extends
ClusterPerTestIntegrationTest {
}
}
+ @Test
+ public void primaryPartitions() {
+ TableViewInternal tableViewInternal =
unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+
+ verifyPrimaryPartition(tableViewInternal, PARTITIONS);
+
+ executeSql("ALTER TABLE " + TABLE_NAME + " ADD COLUMN val1 VARCHAR
DEFAULT 'newDefault'");
+ verifyPrimaryPartition(tableViewInternal, PARTITIONS);
+ verifyAllKeys(tableViewInternal, PARTITIONS);
+ }
+
@Test
public void partitionsForAllKeys() {
- PartitionManager partitionManager =
cluster.aliveNode().tables().table(TABLE_NAME).partitionManager();
TableViewInternal tableViewInternal =
unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+
+ verifyAllKeys(tableViewInternal, PARTITIONS);
+ }
+
+ private void verifyPrimaryPartition(TableViewInternal tableViewInternal,
int partitions) {
+ InternalTable internalTable = tableViewInternal.internalTable();
+
+ for (int i = 0; i < partitions; i++) {
+ CompletableFuture<ClusterNode> clusterNodeCompletableFuture =
internalTable.partitionLocation(
+ new TablePartitionId(internalTable.tableId(), i));
+
+ CompletableFuture<ClusterNode> clusterNodeCompletableFuture1 =
partitionManager().primaryReplicaAsync(new HashPartition(i));
+
+ assertThat(clusterNodeCompletableFuture.join().id(),
equalTo(clusterNodeCompletableFuture1.join().id()));
+ }
+ }
+
+ private void verifyAllKeys(TableViewInternal tableViewInternal, int
partitions) {
InternalTable internalTable = tableViewInternal.internalTable();
- CompletableFuture<?>[] futures = new CompletableFuture<?>[PARTITIONS];
- for (int i = 0; i < PARTITIONS; i++) {
+ CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+ for (int i = 0; i < partitions; i++) {
CompletableFuture<Object> future = new CompletableFuture<>();
futures[i] = future;
Publisher<BinaryRow> scan = internalTable.scan(i, null);
- HashPartition value = new HashPartition(i);
+ Partition value = new HashPartition(i);
scan.subscribe(new Subscriber<>() {
@Override
@@ -95,7 +132,7 @@ public class ItPartitionManagerTest extends
ClusterPerTestIntegrationTest {
Tuple tuple = tuple(registry.resolve(item,
registry.lastKnownSchemaVersion()));
Tuple key = Tuple.create().set("key",
tuple.intValue("key"));
- assertThat(partitionManager.partitionAsync(key),
willBe(value));
+ assertThat(partitionManager().partitionAsync(key),
willBe(value));
}
@Override
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java
similarity index 53%
copy from
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
copy to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java
index da622a7d08..b3fee3fee3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItStandalonePartitionManagerTest.java
@@ -17,40 +17,14 @@
package org.apache.ignite.internal.table.partition;
-import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
/**
- * Hash partition representation.
+ * Embedded mode implementation of partition management test suite.
*/
-public class HashPartition implements Partition {
- private static final long serialVersionUID = 1717320056615864614L;
-
- private final int partitionId;
-
- public HashPartition(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public int partitionId() {
- return partitionId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- HashPartition that = (HashPartition) o;
-
- return partitionId == that.partitionId;
- }
-
+public class ItStandalonePartitionManagerTest extends
ItAbstractPartitionManagerTest {
@Override
- public int hashCode() {
- return partitionId;
+ protected PartitionManager partitionManager() {
+ return
cluster.aliveNode().tables().table(TABLE_NAME).partitionManager();
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java
similarity index 54%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
rename to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java
index da622a7d08..6a08ffdb7b 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/partition/ItThinClientPartitionManagerTest.java
@@ -17,40 +17,26 @@
package org.apache.ignite.internal.table.partition;
-import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.BeforeEach;
/**
- * Hash partition representation.
+ * Thin client implementation of partition management test suite.
*/
-public class HashPartition implements Partition {
- private static final long serialVersionUID = 1717320056615864614L;
-
- private final int partitionId;
-
- public HashPartition(int partitionId) {
- this.partitionId = partitionId;
- }
-
- public int partitionId() {
- return partitionId;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- HashPartition that = (HashPartition) o;
-
- return partitionId == that.partitionId;
+public class ItThinClientPartitionManagerTest extends
ItAbstractPartitionManagerTest {
+ private IgniteClient client;
+
+ @BeforeEach
+ public void startClient() {
+ client = IgniteClient.builder()
+ .addresses("localhost")
+ .reconnectThrottlingPeriod(0)
+ .build();
}
@Override
- public int hashCode() {
- return partitionId;
+ protected PartitionManager partitionManager() {
+ return client.tables().table(TABLE_NAME).partitionManager();
}
}