This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2a6cc4f293 IGNITE-22065 Introduce partition public API (#3673)
2a6cc4f293 is described below
commit 2a6cc4f2930db3c260e9b5b014bfa22896af7759
Author: Mikhail <[email protected]>
AuthorDate: Fri May 10 13:58:40 2024 +0300
IGNITE-22065 Introduce partition public API (#3673)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../lang/UnsupportedPartitionTypeException.java | 51 ++++++++
.../main/java/org/apache/ignite/table/Table.java | 8 ++
.../ignite/table/criteria/CriteriaVisitor.java | 9 ++
.../ignite/table/criteria/PartitionCriteria.java | 32 +++++
.../apache/ignite/table/partition/Partition.java | 27 +++++
.../ignite/table/partition/PartitionManager.java | 65 +++++++++++
.../ignite/internal/client/table/ClientTable.java | 7 ++
.../ignite/client/fakes/FakeInternalTable.java | 6 +
.../internal/table/criteria/ColumnValidator.java | 6 +
.../internal/table/criteria/SqlSerializer.java | 7 ++
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
.../internal/table/ItPartitionManagerTest.java | 115 ++++++++++++++++++
.../ignite/internal/table/InternalTable.java | 9 ++
.../apache/ignite/internal/table/TableImpl.java | 14 ++-
.../table/distributed/PublicApiThreadingTable.java | 6 +
.../distributed/storage/InternalTableImpl.java | 50 ++++----
.../internal/table/partition/HashPartition.java | 56 +++++++++
.../table/partition/HashPartitionManagerImpl.java | 129 +++++++++++++++++++++
21 files changed, 580 insertions(+), 25 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 217eac851e..6ef49bb962 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -172,6 +172,9 @@ public class ErrorGroups {
/** Schema version mismatch. */
public static final int SCHEMA_VERSION_MISMATCH_ERR =
TABLE_ERR_GROUP.registerErrorCode((short) 7);
+
+ /** Unsupported partition type. */
+ public static final int UNSUPPORTED_PARTITION_TYPE_ERR =
TABLE_ERR_GROUP.registerErrorCode((short) 8);
}
/** Client error group. */
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/UnsupportedPartitionTypeException.java
b/modules/api/src/main/java/org/apache/ignite/lang/UnsupportedPartitionTypeException.java
new file mode 100644
index 0000000000..f9321b6acf
--- /dev/null
+++
b/modules/api/src/main/java/org/apache/ignite/lang/UnsupportedPartitionTypeException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import static
org.apache.ignite.lang.ErrorGroups.Table.UNSUPPORTED_PARTITION_TYPE_ERR;
+
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This exception is thrown when a table does not support the specified
partition type.
+ */
+public class UnsupportedPartitionTypeException extends IgniteException {
+ private static final long serialVersionUID = 6875586826126580903L;
+
+ /**
+ * Creates a new exception with the given message.
+ *
+ * @param message Exception message.
+ */
+ public UnsupportedPartitionTypeException(String message) {
+ super(UNSUPPORTED_PARTITION_TYPE_ERR, message);
+ }
+
+ /**
+ * Creates a new exception with the given trace id, error code, detail
message and cause.
+ *
+ * @param traceId Unique identifier of this exception.
+ * @param code Full error code.
+ * @param message Detail message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public UnsupportedPartitionTypeException(UUID traceId, int code, String
message, @Nullable Throwable cause) {
+ super(traceId, code, message, cause);
+ }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/table/Table.java
b/modules/api/src/main/java/org/apache/ignite/table/Table.java
index 2af898f65d..3f39df5e74 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/Table.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/Table.java
@@ -18,6 +18,7 @@
package org.apache.ignite.table;
import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
/**
* Table provides the different views (key-value vs record) and approaches
(mapped-object vs binary) to access the data.
@@ -37,6 +38,13 @@ public interface Table {
*/
String name();
+ /**
+ * Gets a partition manager of a table.
+ *
+ * @return Partition manager.
+ */
+ PartitionManager partitionManager();
+
/**
* Creates a record view of a table for the record class mapper provided.
*
diff --git
a/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaVisitor.java
b/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaVisitor.java
index a40742b0cb..d8fe77f404 100644
---
a/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaVisitor.java
+++
b/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaVisitor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.table.criteria;
+import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;
/**
@@ -49,6 +50,14 @@ public interface CriteriaVisitor<C> {
*/
<T> void visit(Expression expression, @Nullable C context);
+ /**
+ * Visit a {@link Partition} instance with the given context.
+ *
+ * @param partition Partition to visit.
+ * @param context context of the visit or {@code null}, if not used.
+ */
+ void visit(PartitionCriteria partition, @Nullable C context);
+
/**
* Visit a {@link Criteria} instance with the given context.
*
diff --git
a/modules/api/src/main/java/org/apache/ignite/table/criteria/PartitionCriteria.java
b/modules/api/src/main/java/org/apache/ignite/table/criteria/PartitionCriteria.java
new file mode 100644
index 0000000000..3f6ff0e5ef
--- /dev/null
+++
b/modules/api/src/main/java/org/apache/ignite/table/criteria/PartitionCriteria.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.criteria;
+
+import org.apache.ignite.table.partition.Partition;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a partition reference for criteria query.
+ */
+// TODO: IGNITE-22153
+public class PartitionCriteria implements Partition, Criteria {
+ @Override
+ public <C> void accept(CriteriaVisitor<C> v, @Nullable C context) {
+ v.visit(this, context);
+ }
+}
diff --git
a/modules/api/src/main/java/org/apache/ignite/table/partition/Partition.java
b/modules/api/src/main/java/org/apache/ignite/table/partition/Partition.java
new file mode 100644
index 0000000000..27333d15b9
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/table/partition/Partition.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.partition;
+
+import java.io.Serializable;
+
+/**
+ * Marker interface which represents a partition reference.
+ */
+public interface Partition extends Serializable {
+
+}
diff --git
a/modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java
b/modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java
new file mode 100644
index 0000000000..bace22bf19
--- /dev/null
+++
b/modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.partition;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+
+/**
+ * The partition manager provides the ability to obtain information about
table partitions.
+ * This interface can be used to get all partitions of a table,
+ * the location of the primary replica of a partition,
+ * the partition for a specific table key.
+ */
+public interface PartitionManager {
+ /**
+ * Returns location of primary replica for provided partition.
+ *
+ * @param partition Partition instance.
+ * @return Cluster node where primary replica of provided partition is
located.
+ */
+ CompletableFuture<ClusterNode> primaryReplicaAsync(Partition partition);
+
+ /**
+ * Returns map with all partitions and their locations.
+ *
+ * @return Map from partition to cluster node where primary replica of the
partition is located.
+ */
+ CompletableFuture<Map<Partition, ClusterNode>> primaryReplicasAsync();
+
+ /**
+ * Returns partition instance for provided table key.
+ *
+ * @param key Table key.
+ * @param mapper Table key mapper.
+ * @param <K> Key type.
+ * @return Partition instance which contains provided key.
+ */
+ <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K> mapper);
+
+ /**
+ * Returns partition instance for provided table key.
+ *
+ * @param key Table key tuple.
+ * @return Partition instance which contains provided key.
+ */
+ CompletableFuture<Partition> partitionAsync(Tuple key);
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index d8d6d85c18..70956196a6 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -54,6 +54,7 @@ import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -131,6 +132,12 @@ public class ClientTable implements Table {
return name;
}
+ @Override
+ // TODO: IGNITE-22149
+ public PartitionManager partitionManager() {
+ throw new UnsupportedOperationException("This operation doesn't
implemented yet.");
+ }
+
/** {@inheritDoc} */
@Override
public <R> RecordView<R> recordView(Mapper<R> recMapper) {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 8016953e97..6fbcbe5f98 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -37,6 +37,7 @@ import java.util.function.BiConsumer;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -498,4 +499,9 @@ public class FakeInternalTable implements InternalTable {
public ScheduledExecutorService streamerFlushExecutor() {
throw new UnsupportedOperationException("Not implemented");
}
+
+ @Override
+ public CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId
partition) {
+ throw new UnsupportedOperationException("Not implemented yet");
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/ColumnValidator.java
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/ColumnValidator.java
index 3ae839861e..b94e116de9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/ColumnValidator.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/ColumnValidator.java
@@ -25,6 +25,7 @@ import org.apache.ignite.table.criteria.Criteria;
import org.apache.ignite.table.criteria.CriteriaVisitor;
import org.apache.ignite.table.criteria.Expression;
import org.apache.ignite.table.criteria.Parameter;
+import org.apache.ignite.table.criteria.PartitionCriteria;
import org.jetbrains.annotations.Nullable;
/**
@@ -57,6 +58,11 @@ class ColumnValidator implements
CriteriaVisitor<Collection<String>> {
}
}
+ @Override
+ public void visit(PartitionCriteria partition, @Nullable
Collection<String> context) {
+ // No-op.
+ }
+
@Override
public <T> void visit(Criteria criteria, @Nullable Collection<String>
context) {
criteria.accept(this, context);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
index 9eedbc603e..ea50c90e22 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
@@ -37,6 +37,7 @@ import org.apache.ignite.table.criteria.CriteriaVisitor;
import org.apache.ignite.table.criteria.Expression;
import org.apache.ignite.table.criteria.Operator;
import org.apache.ignite.table.criteria.Parameter;
+import org.apache.ignite.table.criteria.PartitionCriteria;
import org.jetbrains.annotations.Nullable;
/**
@@ -124,6 +125,12 @@ public class SqlSerializer implements
CriteriaVisitor<Void> {
}
}
+ @Override
+ // TODO: IGNITE-22153
+ public void visit(PartitionCriteria partition, @Nullable Void context) {
+ throw new UnsupportedOperationException("This operation doesn't
implemented yet.");
+ }
+
/** {@inheritDoc} */
@Override
public <T> void visit(Criteria criteria, @Nullable Void context) {
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 4c0a079b66..10ff90e3fc 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -77,6 +77,7 @@ enum class code : underlying_t {
TABLE_STOPPING = 0x20005,
TABLE_DEFINITION = 0x20006,
SCHEMA_VERSION_MISMATCH = 0x20007,
+ UNSUPPORTED_PARTITION_TYPE = 0x20008,
// Client group. Group code: 3
CONNECTION = 0x30001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index e19562a319..e76d18a938 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -135,6 +135,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::TABLE_STOPPING:
case error::code::TABLE_DEFINITION:
case error::code::SCHEMA_VERSION_MISMATCH:
+ case error::code::UNSUPPORTED_PARTITION_TYPE:
return sql_state::SHY000_GENERAL_ERROR;
// Client group. Group code: 3
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index b96e5a793d..ea86d3885d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -116,6 +116,9 @@ namespace Apache.Ignite
/// <summary> SchemaVersionMismatch error. </summary>
public const int SchemaVersionMismatch = (GroupCode << 16) | (7 &
0xFFFF);
+
+ /// <summary> UnsupportedPartitionType error. </summary>
+ public const int UnsupportedPartitionType = (GroupCode << 16) | (8
& 0xFFFF);
}
/// <summary> Client errors. </summary>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
new file mode 100644
index 0000000000..5bade5447d
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.table.TableRow.tuple;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test suite for {@link PartitionManager}.
+ */
+public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest {
+ private static final String TABLE_NAME = "tableName";
+
+ private static final int PARTITIONS = 3;
+
+ @BeforeEach
+ public void setup() {
+ String zoneSql = "create zone test_zone with"
+ + " partitions=" + PARTITIONS + ","
+ + " replicas=3,"
+ + " storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
+
+ String sql = "create table " + TABLE_NAME + " (key int primary key,
val varchar(20)) with primary_zone='TEST_ZONE'";
+
+ cluster.doInSession(0, session -> {
+ executeUpdate(zoneSql, session);
+ executeUpdate(sql, session);
+ });
+
+
+ for (int i = 0; i < 1000; i++) {
+ executeSql("INSERT INTO " + TABLE_NAME + " (key, val) VALUES (" +
i + ", 'one')");
+ }
+ }
+
+ @Test
+ public void partitionsForAllKeys() {
+ PartitionManager partitionManager =
cluster.aliveNode().tables().table(TABLE_NAME).partitionManager();
+ TableViewInternal tableViewInternal =
unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+ InternalTable internalTable = tableViewInternal.internalTable();
+
+ CompletableFuture<?>[] futures = new CompletableFuture<?>[PARTITIONS];
+ for (int i = 0; i < PARTITIONS; i++) {
+ CompletableFuture<Object> future = new CompletableFuture<>();
+
+ futures[i] = future;
+
+ Publisher<BinaryRow> scan = internalTable.scan(i, null);
+
+ HashPartition value = new HashPartition(i);
+
+ scan.subscribe(new Subscriber<>() {
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ subscription.request(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void onNext(BinaryRow item) {
+ SchemaRegistry registry = tableViewInternal.schemaView();
+ Tuple tuple = tuple(registry.resolve(item,
registry.lastKnownSchemaVersion()));
+
+ Tuple key = Tuple.create().set("key",
tuple.intValue("key"));
+ assertThat(partitionManager.partitionAsync(key),
willBe(value));
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+
+ @Override
+ public void onComplete() {
+ future.complete(null);
+ }
+ });
+ }
+
+ assertThat(allOf(futures), willCompleteSuccessfully());
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index a59ebba8fc..180ea3f7cd 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -486,4 +487,12 @@ public interface InternalTable extends ManuallyCloseable {
* @return Streamer flush executor.
*/
ScheduledExecutorService streamerFlushExecutor();
+
+ /**
+ * Returns {@link ClusterNode} where primary replica of replication group
is located.
+ *
+ * @param partition Replication group identifier.
+ * @return Cluster node with primary replica.
+ */
+ CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId
partition);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 578de2d09f..d8e966fcbe 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
-import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.marshaller.MarshallerException;
import org.apache.ignite.internal.marshaller.MarshallersProvider;
import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
@@ -44,6 +43,7 @@ import
org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.IgniteSql;
@@ -51,6 +51,7 @@ import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
import org.jetbrains.annotations.TestOnly;
/**
@@ -141,6 +142,11 @@ public class TableImpl implements TableViewInternal {
return tbl;
}
+ @Override
+ public PartitionManager partitionManager() {
+ return new HashPartitionManagerImpl(tbl, schemaReg, marshallers);
+ }
+
@Override public String name() {
return tbl.name();
}
@@ -166,7 +172,7 @@ public class TableImpl implements TableViewInternal {
@Override
public <R> RecordView<R> recordView(Mapper<R> recMapper) {
- return new RecordViewImpl<R>(tbl, schemaReg, schemaVersions, sql,
marshallers, recMapper);
+ return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, sql,
marshallers, recMapper);
}
@Override
@@ -195,7 +201,7 @@ public class TableImpl implements TableViewInternal {
return tbl.partition(keyRow);
} catch (TupleMarshallerException e) {
- throw new IgniteInternalException(e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
}
@@ -209,7 +215,7 @@ public class TableImpl implements TableViewInternal {
try {
keyRow = marshaller.marshal(key);
} catch (MarshallerException e) {
- throw new IgniteInternalException("Cannot marshal key", e);
+ throw new org.apache.ignite.lang.MarshallerException(e);
}
return tbl.partition(keyRow);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
index 3524ecb6ca..d7731f26e9 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
@@ -28,6 +28,7 @@ import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
/**
* Wrapper around {@link Table} that maintains public API invariants relating
to threading.
@@ -53,6 +54,11 @@ class PublicApiThreadingTable implements Table, Wrapper {
return table.name();
}
+ @Override
+ public PartitionManager partitionManager() {
+ return table.partitionManager();
+ }
+
@Override
public <R> RecordView<R> recordView(Mapper<R> recMapper) {
return new PublicApiThreadingRecordView<>(table.recordView(recMapper),
asyncContinuationExecutor);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 4987f375f1..8ea498f868 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -743,12 +743,7 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<R> fut =
primaryReplicaFuture.thenCompose(primaryReplica -> {
try {
- ClusterNode node =
clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
-
- if (node == null) {
- throw new TransactionException(REPLICA_UNAVAILABLE_ERR,
"Failed to resolve the primary replica node [consistentId="
- + primaryReplica.getLeaseholder() + ']');
- }
+ ClusterNode node =
getClusterNode(primaryReplica.getLeaseholder());
return replicaSvc.invoke(node, op.apply(tablePartitionId,
primaryReplica.getStartTime().longValue()));
} catch (Throwable e) {
@@ -794,12 +789,7 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<R> fut =
primaryReplicaFuture.thenCompose(primaryReplica -> {
try {
- ClusterNode node =
clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
-
- if (node == null) {
- throw new TransactionException(REPLICA_UNAVAILABLE_ERR,
"Failed to resolve the primary replica node [consistentId="
- + primaryReplica.getLeaseholder() + ']');
- }
+ ClusterNode node =
getClusterNode(primaryReplica.getLeaseholder());
return replicaSvc.invoke(node, op.apply(tablePartitionId,
primaryReplica.getStartTime().longValue()));
} catch (Throwable e) {
@@ -1867,6 +1857,22 @@ public class InternalTableImpl implements InternalTable {
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
tx.assignCommitPartition(tablePartitionId);
+ return partitionMeta(tablePartitionId).thenApply(meta -> {
+ TablePartitionId partGroupId = new TablePartitionId(tableId,
partId);
+
+ return tx.enlist(partGroupId, new IgniteBiTuple<>(
+ getClusterNode(meta.getLeaseholder()),
+ meta.getStartTime().longValue())
+ );
+ });
+ }
+
+ @Override
+ public CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId
tablePartitionId) {
+ return partitionMeta(tablePartitionId).thenApply(meta ->
getClusterNode(meta.getLeaseholder()));
+ }
+
+ private CompletableFuture<ReplicaMeta> partitionMeta(ReplicationGroupId
tablePartitionId) {
HybridTimestamp now = clock.now();
CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(
@@ -1882,17 +1888,19 @@ public class InternalTableImpl implements InternalTable
{
+ " [tablePartitionId=" + tablePartitionId + ",
awaitTimestamp=" + now + ']', e);
}
- ClusterNode node =
clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
+ return primaryReplica;
+ });
+ }
- if (node == null) {
- throw new TransactionException(REPLICA_UNAVAILABLE_ERR,
"Failed to resolve the primary replica node [consistentId="
- + primaryReplica.getLeaseholder() + ']');
- }
+ private ClusterNode getClusterNode(@Nullable String leaserHolder) {
+ ClusterNode node = clusterNodeResolver.getByConsistentId(leaserHolder);
- TablePartitionId partGroupId = new TablePartitionId(tableId,
partId);
+ if (node == null) {
+ throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to
resolve the primary replica node [consistentId="
+ + leaserHolder + ']');
+ }
- return tx.enlist(partGroupId, new IgniteBiTuple<>(node,
primaryReplica.getStartTime().longValue()));
- });
+ return node;
}
/**
@@ -2145,7 +2153,7 @@ public class InternalTableImpl implements InternalTable {
if (res == null) {
throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
} else {
- return
clusterNodeResolver.getByConsistentId(res.getLeaseholder());
+ return getClusterNode(res.getLeaseholder());
}
}
});
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
new file mode 100644
index 0000000000..da622a7d08
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import org.apache.ignite.table.partition.Partition;
+
+/**
+ * Hash partition representation.
+ */
+public class HashPartition implements Partition {
+ private static final long serialVersionUID = 1717320056615864614L;
+
+ private final int partitionId;
+
+ public HashPartition(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public int partitionId() {
+ return partitionId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ HashPartition that = (HashPartition) o;
+
+ return partitionId == that.partitionId;
+ }
+
+ @Override
+ public int hashCode() {
+ return partitionId;
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java
new file mode 100644
index 0000000000..a185be1e33
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.lang.UnsupportedPartitionTypeException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+
+/**
+ * Implementation of {@link PartitionManager} for tables with hash partitions.
+ */
+public class HashPartitionManagerImpl implements PartitionManager {
+ private final InternalTable table;
+
+ private final SchemaRegistry schemaReg;
+
+ private final MarshallersProvider marshallers;
+
+ /**
+ * Constructor.
+ *
+ * @param table Internal table.
+ * @param schemaReg Schema registry.
+ * @param marshallers Marshallers.
+ */
+ public HashPartitionManagerImpl(
+ InternalTable table,
+ SchemaRegistry schemaReg,
+ MarshallersProvider marshallers
+ ) {
+ this.table = table;
+ this.schemaReg = schemaReg;
+ this.marshallers = marshallers;
+ }
+
+ @Override
+ public CompletableFuture<ClusterNode> primaryReplicaAsync(Partition
partition) {
+ if (!(partition instanceof HashPartition)) {
+ throw new UnsupportedPartitionTypeException("Table " + table.name()
+ + " doesn't support any other type of partition except
hash partition.");
+ }
+ HashPartition hashPartition = (HashPartition) partition;
+ return table.partitionLocation(new TablePartitionId(table.tableId(),
hashPartition.partitionId()));
+ }
+
+ @Override
+ public CompletableFuture<Map<Partition, ClusterNode>>
primaryReplicasAsync() {
+ int partitions = table.partitions();
+ CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+
+ for (int i = 0; i < partitions; i++) {
+ futures[i] = table.partitionLocation(new
TablePartitionId(table.tableId(), i));
+ }
+
+ return allOf(futures)
+ .thenApply(unused -> {
+ Map<Partition, ClusterNode> result = new
HashMap<>(partitions);
+ for (int i = 0; i < partitions; i++) {
+ result.put(new HashPartition(i), (ClusterNode)
futures[i].join());
+ }
+ return result;
+ });
+ }
+
+ @Override
+ public <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K>
mapper) {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(mapper);
+
+ var marshaller = new KvMarshallerImpl<>(schemaReg.lastKnownSchema(),
marshallers, mapper, mapper);
+ try {
+ BinaryRowEx keyRow = marshaller.marshal(key);
+
+ return completedFuture(new HashPartition(table.partition(keyRow)));
+ } catch (MarshallerException e) {
+ throw new org.apache.ignite.lang.MarshallerException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Partition> partitionAsync(Tuple key) {
+ Objects.requireNonNull(key);
+
+ try {
+ // Taking latest schema version for marshaller here because it's
only used to calculate colocation hash, and colocation
+ // columns never change (so they are the same for all schema
versions of the table),
+ Row keyRow = new
TupleMarshallerImpl(schemaReg.lastKnownSchema()).marshalKey(key);
+
+ return completedFuture(new HashPartition(table.partition(keyRow)));
+ } catch (TupleMarshallerException e) {
+ throw new org.apache.ignite.lang.MarshallerException(e);
+ }
+ }
+}