This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a6cc4f293 IGNITE-22065 Introduce partition public API (#3673)
2a6cc4f293 is described below

commit 2a6cc4f2930db3c260e9b5b014bfa22896af7759
Author: Mikhail <[email protected]>
AuthorDate: Fri May 10 13:58:40 2024 +0300

    IGNITE-22065 Introduce partition public API (#3673)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../lang/UnsupportedPartitionTypeException.java    |  51 ++++++++
 .../main/java/org/apache/ignite/table/Table.java   |   8 ++
 .../ignite/table/criteria/CriteriaVisitor.java     |   9 ++
 .../ignite/table/criteria/PartitionCriteria.java   |  32 +++++
 .../apache/ignite/table/partition/Partition.java   |  27 +++++
 .../ignite/table/partition/PartitionManager.java   |  65 +++++++++++
 .../ignite/internal/client/table/ClientTable.java  |   7 ++
 .../ignite/client/fakes/FakeInternalTable.java     |   6 +
 .../internal/table/criteria/ColumnValidator.java   |   6 +
 .../internal/table/criteria/SqlSerializer.java     |   7 ++
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 .../internal/table/ItPartitionManagerTest.java     | 115 ++++++++++++++++++
 .../ignite/internal/table/InternalTable.java       |   9 ++
 .../apache/ignite/internal/table/TableImpl.java    |  14 ++-
 .../table/distributed/PublicApiThreadingTable.java |   6 +
 .../distributed/storage/InternalTableImpl.java     |  50 ++++----
 .../internal/table/partition/HashPartition.java    |  56 +++++++++
 .../table/partition/HashPartitionManagerImpl.java  | 129 +++++++++++++++++++++
 21 files changed, 580 insertions(+), 25 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 217eac851e..6ef49bb962 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -172,6 +172,9 @@ public class ErrorGroups {
 
         /** Schema version mismatch. */
         public static final int SCHEMA_VERSION_MISMATCH_ERR = 
TABLE_ERR_GROUP.registerErrorCode((short) 7);
+
+        /** Unsupported partition type. */
+        public static final int UNSUPPORTED_PARTITION_TYPE_ERR = 
TABLE_ERR_GROUP.registerErrorCode((short) 8);
     }
 
     /** Client error group. */
diff --git 
a/modules/api/src/main/java/org/apache/ignite/lang/UnsupportedPartitionTypeException.java
 
b/modules/api/src/main/java/org/apache/ignite/lang/UnsupportedPartitionTypeException.java
new file mode 100644
index 0000000000..f9321b6acf
--- /dev/null
+++ 
b/modules/api/src/main/java/org/apache/ignite/lang/UnsupportedPartitionTypeException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import static 
org.apache.ignite.lang.ErrorGroups.Table.UNSUPPORTED_PARTITION_TYPE_ERR;
+
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This exception is thrown when a table does not support the specified partition type.
+ *
+ * <p>When created with only a message, the exception carries the {@code UNSUPPORTED_PARTITION_TYPE_ERR} error code.
+ */
+public class UnsupportedPartitionTypeException extends IgniteException {
+    private static final long serialVersionUID = 6875586826126580903L;
+
+    /**
+     * Creates a new exception with the given message and the {@code UNSUPPORTED_PARTITION_TYPE_ERR} error code.
+     *
+     * @param message Exception message.
+     */
+    public UnsupportedPartitionTypeException(String message) {
+        super(UNSUPPORTED_PARTITION_TYPE_ERR, message);
+    }
+
+    /**
+     * Creates a new exception with the given trace id, error code, detail message and cause.
+     * All arguments are passed unchanged to the {@link IgniteException} constructor.
+     *
+     * @param traceId Unique identifier of this exception.
+     * @param code Full error code.
+     * @param message Detail message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public UnsupportedPartitionTypeException(UUID traceId, int code, String 
message, @Nullable Throwable cause) {
+        super(traceId, code, message, cause);
+    }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/table/Table.java 
b/modules/api/src/main/java/org/apache/ignite/table/Table.java
index 2af898f65d..3f39df5e74 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/Table.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/Table.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.table;
 
 import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
 
 /**
  * Table provides the different views (key-value vs record) and approaches 
(mapped-object vs binary) to access the data.
@@ -37,6 +38,13 @@ public interface Table {
      */
     String name();
 
+    /**
+     * Gets a partition manager of a table.
+     *
+     * @return Partition manager.
+     */
+    PartitionManager partitionManager();
+
     /**
      * Creates a record view of a table for the record class mapper provided.
      *
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaVisitor.java
 
b/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaVisitor.java
index a40742b0cb..d8fe77f404 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaVisitor.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/table/criteria/CriteriaVisitor.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.table.criteria;
 
+import org.apache.ignite.table.partition.Partition;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -49,6 +50,14 @@ public interface CriteriaVisitor<C> {
      */
     <T> void visit(Expression expression, @Nullable C context);
 
+    /**
+     * Visit a {@link PartitionCriteria} instance, i.e. a {@link Partition} reference used as a criterion, with the given context.
+     *
+     * @param partition Partition criterion to visit.
+     * @param context context of the visit or {@code null}, if not used.
+     */
+    void visit(PartitionCriteria partition, @Nullable C context);
+
     /**
      * Visit a {@link Criteria} instance with the given context.
      *
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/criteria/PartitionCriteria.java
 
b/modules/api/src/main/java/org/apache/ignite/table/criteria/PartitionCriteria.java
new file mode 100644
index 0000000000..3f6ff0e5ef
--- /dev/null
+++ 
b/modules/api/src/main/java/org/apache/ignite/table/criteria/PartitionCriteria.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.criteria;
+
+import org.apache.ignite.table.partition.Partition;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Represents a partition reference for criteria query.
+ *
+ * <p>The class is both a {@link Partition} and a {@link Criteria}. As written here it declares no state and
+ * only dispatches itself to the supplied {@link CriteriaVisitor}.
+ */
+// TODO: IGNITE-22153
+public class PartitionCriteria implements Partition, Criteria {
+    /**
+     * Passes this instance to {@code CriteriaVisitor.visit(PartitionCriteria, C)}.
+     *
+     * @param v Visitor to dispatch to.
+     * @param context Context forwarded to the visitor unchanged, can be {@code null}.
+     * @param <C> Context type.
+     */
+    @Override
+    public <C> void accept(CriteriaVisitor<C> v, @Nullable C context) {
+        v.visit(this, context);
+    }
+}
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/partition/Partition.java 
b/modules/api/src/main/java/org/apache/ignite/table/partition/Partition.java
new file mode 100644
index 0000000000..27333d15b9
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/table/partition/Partition.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.partition;
+
+import java.io.Serializable;
+
+/**
+ * Marker interface which represents a partition reference.
+ */
+public interface Partition extends Serializable {
+
+}
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java
 
b/modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java
new file mode 100644
index 0000000000..bace22bf19
--- /dev/null
+++ 
b/modules/api/src/main/java/org/apache/ignite/table/partition/PartitionManager.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.partition;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+
+/**
+ * The partition manager provides the ability to obtain information about table partitions.
+ * This interface can be used to get all partitions of a table,
+ * the location of the primary replica of a partition,
+ * and the partition for a specific table key.
+ *
+ * <p>Every method returns a {@link CompletableFuture} that carries the result.
+ */
+public interface PartitionManager {
+    /**
+     * Returns location of primary replica for provided partition.
+     *
+     * @param partition Partition instance.
+     * @return Future with the cluster node where primary replica of provided partition is located.
+     */
+    CompletableFuture<ClusterNode> primaryReplicaAsync(Partition partition);
+
+    /**
+     * Returns map with all partitions and their locations.
+     *
+     * @return Future with the map from partition to cluster node where primary replica of the partition is located.
+     */
+    CompletableFuture<Map<Partition, ClusterNode>> primaryReplicasAsync();
+
+    /**
+     * Returns partition instance for provided table key.
+     *
+     * @param key Table key.
+     * @param mapper Table key mapper.
+     * @param <K> Key type.
+     * @return Future with the partition instance which contains provided key.
+     */
+    <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K> mapper);
+
+    /**
+     * Returns partition instance for provided table key.
+     *
+     * @param key Table key tuple.
+     * @return Future with the partition instance which contains provided key.
+     */
+    CompletableFuture<Partition> partitionAsync(Tuple key);
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index d8d6d85c18..70956196a6 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -54,6 +54,7 @@ import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -131,6 +132,12 @@ public class ClientTable implements Table {
         return name;
     }
 
+    @Override
+    // TODO: IGNITE-22149
+    public PartitionManager partitionManager() {
+        throw new UnsupportedOperationException("This operation doesn't 
implemented yet.");
+    }
+
     /** {@inheritDoc} */
     @Override
     public <R> RecordView<R> recordView(Mapper<R> recMapper) {
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 8016953e97..6fbcbe5f98 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -37,6 +37,7 @@ import java.util.function.BiConsumer;
 import javax.naming.OperationNotSupportedException;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -498,4 +499,9 @@ public class FakeInternalTable implements InternalTable {
     public ScheduledExecutorService streamerFlushExecutor() {
         throw new UnsupportedOperationException("Not implemented");
     }
+
+    @Override
+    public CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId 
partition) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/ColumnValidator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/ColumnValidator.java
index 3ae839861e..b94e116de9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/ColumnValidator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/ColumnValidator.java
@@ -25,6 +25,7 @@ import org.apache.ignite.table.criteria.Criteria;
 import org.apache.ignite.table.criteria.CriteriaVisitor;
 import org.apache.ignite.table.criteria.Expression;
 import org.apache.ignite.table.criteria.Parameter;
+import org.apache.ignite.table.criteria.PartitionCriteria;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -57,6 +58,11 @@ class ColumnValidator implements 
CriteriaVisitor<Collection<String>> {
         }
     }
 
+    @Override
+    public void visit(PartitionCriteria partition, @Nullable 
Collection<String> context) {
+        // No-op.
+    }
+
     @Override
     public <T> void visit(Criteria criteria, @Nullable Collection<String> 
context) {
         criteria.accept(this, context);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
index 9eedbc603e..ea50c90e22 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
@@ -37,6 +37,7 @@ import org.apache.ignite.table.criteria.CriteriaVisitor;
 import org.apache.ignite.table.criteria.Expression;
 import org.apache.ignite.table.criteria.Operator;
 import org.apache.ignite.table.criteria.Parameter;
+import org.apache.ignite.table.criteria.PartitionCriteria;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -124,6 +125,12 @@ public class SqlSerializer implements 
CriteriaVisitor<Void> {
         }
     }
 
+    @Override
+    // TODO: IGNITE-22153
+    public void visit(PartitionCriteria partition, @Nullable Void context) {
+        throw new UnsupportedOperationException("This operation doesn't 
implemented yet.");
+    }
+
     /** {@inheritDoc} */
     @Override
     public <T> void visit(Criteria criteria, @Nullable Void context) {
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index 4c0a079b66..10ff90e3fc 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -77,6 +77,7 @@ enum class code : underlying_t {
     TABLE_STOPPING = 0x20005,
     TABLE_DEFINITION = 0x20006,
     SCHEMA_VERSION_MISMATCH = 0x20007,
+    UNSUPPORTED_PARTITION_TYPE = 0x20008,
 
     // Client group. Group code: 3
     CONNECTION = 0x30001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index e19562a319..e76d18a938 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -135,6 +135,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::TABLE_STOPPING:
         case error::code::TABLE_DEFINITION:
         case error::code::SCHEMA_VERSION_MISMATCH:
+        case error::code::UNSUPPORTED_PARTITION_TYPE:
             return sql_state::SHY000_GENERAL_ERROR;
 
         // Client group. Group code: 3
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index b96e5a793d..ea86d3885d 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -116,6 +116,9 @@ namespace Apache.Ignite
 
             /// <summary> SchemaVersionMismatch error. </summary>
             public const int SchemaVersionMismatch = (GroupCode << 16) | (7 & 
0xFFFF);
+
+            /// <summary> UnsupportedPartitionType error. </summary>
+            public const int UnsupportedPartitionType = (GroupCode << 16) | (8 
& 0xFFFF);
         }
 
         /// <summary> Client errors. </summary>
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
new file mode 100644
index 0000000000..5bade5447d
--- /dev/null
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItPartitionManagerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static org.apache.ignite.internal.table.TableRow.tuple;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test suite for {@link PartitionManager}.
+ */
+public class ItPartitionManagerTest extends ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "tableName";
+
+    // Partition count used in the zone definition; also the number of per-partition scans started by the test.
+    private static final int PARTITIONS = 3;
+
+    /**
+     * Creates a zone with {@link #PARTITIONS} partitions and 3 replicas, creates the test table in that zone
+     * and inserts 1000 rows with keys 0..999.
+     */
+    @BeforeEach
+    public void setup() {
+        String zoneSql = "create zone test_zone with"
+                + " partitions=" + PARTITIONS + ","
+                + " replicas=3,"
+                + " storage_profiles='" + DEFAULT_STORAGE_PROFILE + "'";
+
+        String sql = "create table " + TABLE_NAME + " (key int primary key, 
val varchar(20)) with primary_zone='TEST_ZONE'";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(zoneSql, session);
+            executeUpdate(sql, session);
+        });
+
+
+        for (int i = 0; i < 1000; i++) {
+            executeSql("INSERT INTO " + TABLE_NAME + " (key, val) VALUES (" + 
i + ", 'one')");
+        }
+    }
+
+    /**
+     * Scans every partition of the table and checks, for each row returned, that
+     * {@link PartitionManager#partitionAsync(Tuple)} resolves the key of that row to a {@link HashPartition}
+     * created with the index of the partition being scanned.
+     */
+    @Test
+    public void partitionsForAllKeys() {
+        PartitionManager partitionManager = 
cluster.aliveNode().tables().table(TABLE_NAME).partitionManager();
+        TableViewInternal tableViewInternal = 
unwrapTableViewInternal(cluster.aliveNode().tables().table(TABLE_NAME));
+        InternalTable internalTable = tableViewInternal.internalTable();
+
+        CompletableFuture<?>[] futures = new CompletableFuture<?>[PARTITIONS];
+        for (int i = 0; i < PARTITIONS; i++) {
+            // Completed by the subscriber below: normally from onComplete, exceptionally from onError.
+            CompletableFuture<Object> future = new CompletableFuture<>();
+
+            futures[i] = future;
+
+            Publisher<BinaryRow> scan = internalTable.scan(i, null);
+
+            // Expected result of the partition lookup for every key read from partition i.
+            HashPartition value = new HashPartition(i);
+
+            scan.subscribe(new Subscriber<>() {
+                @Override
+                public void onSubscribe(Subscription subscription) {
+                    subscription.request(Long.MAX_VALUE);
+                }
+
+                @Override
+                public void onNext(BinaryRow item) {
+                    SchemaRegistry registry = tableViewInternal.schemaView();
+                    Tuple tuple = tuple(registry.resolve(item, 
registry.lastKnownSchemaVersion()));
+
+                    Tuple key = Tuple.create().set("key", 
tuple.intValue("key"));
+                    // NOTE(review): a failed assertion is thrown from this callback and is not routed to the future here;
+                    // whether it fails the test depends on how the publisher handles exceptions from onNext - confirm.
+                    assertThat(partitionManager.partitionAsync(key), 
willBe(value));
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    future.completeExceptionally(throwable);
+                }
+
+                @Override
+                public void onComplete() {
+                    future.complete(null);
+                }
+            });
+        }
+
+        // Waits for all per-partition scans to finish.
+        assertThat(allOf(futures), willCompleteSuccessfully());
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index a59ebba8fc..180ea3f7cd 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -486,4 +487,12 @@ public interface InternalTable extends ManuallyCloseable {
      * @return Streamer flush executor.
      */
     ScheduledExecutorService streamerFlushExecutor();
+
+    /**
+     * Returns {@link ClusterNode} where primary replica of replication group is located.
+     *
+     * @param partition Replication group identifier.
+     * @return Future with the cluster node that holds the primary replica.
+     */
+    CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId 
partition);
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 578de2d09f..d8e966fcbe 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
-import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.marshaller.MarshallerException;
 import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
@@ -44,6 +43,7 @@ import 
org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableIndexStoragesSupplier;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.table.partition.HashPartitionManagerImpl;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.sql.IgniteSql;
@@ -51,6 +51,7 @@ import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -141,6 +142,11 @@ public class TableImpl implements TableViewInternal {
         return tbl;
     }
 
+    @Override
+    public PartitionManager partitionManager() {
+        return new HashPartitionManagerImpl(tbl, schemaReg, marshallers);
+    }
+
     @Override public String name() {
         return tbl.name();
     }
@@ -166,7 +172,7 @@ public class TableImpl implements TableViewInternal {
 
     @Override
     public <R> RecordView<R> recordView(Mapper<R> recMapper) {
-        return new RecordViewImpl<R>(tbl, schemaReg, schemaVersions, sql, 
marshallers, recMapper);
+        return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, sql, 
marshallers, recMapper);
     }
 
     @Override
@@ -195,7 +201,7 @@ public class TableImpl implements TableViewInternal {
 
             return tbl.partition(keyRow);
         } catch (TupleMarshallerException e) {
-            throw new IgniteInternalException(e);
+            throw new org.apache.ignite.lang.MarshallerException(e);
         }
     }
 
@@ -209,7 +215,7 @@ public class TableImpl implements TableViewInternal {
         try {
             keyRow = marshaller.marshal(key);
         } catch (MarshallerException e) {
-            throw new IgniteInternalException("Cannot marshal key", e);
+            throw new org.apache.ignite.lang.MarshallerException(e);
         }
 
         return tbl.partition(keyRow);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
index 3524ecb6ca..d7731f26e9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
@@ -28,6 +28,7 @@ import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
 
 /**
  * Wrapper around {@link Table} that maintains public API invariants relating 
to threading.
@@ -53,6 +54,11 @@ class PublicApiThreadingTable implements Table, Wrapper {
         return table.name();
     }
 
+    @Override
+    public PartitionManager partitionManager() {
+        return table.partitionManager();
+    }
+
     @Override
     public <R> RecordView<R> recordView(Mapper<R> recMapper) {
         return new PublicApiThreadingRecordView<>(table.recordView(recMapper), 
asyncContinuationExecutor);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 4987f375f1..8ea498f868 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -743,12 +743,7 @@ public class InternalTableImpl implements InternalTable {
 
         CompletableFuture<R> fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
             try {
-                ClusterNode node = 
clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
-
-                if (node == null) {
-                    throw new TransactionException(REPLICA_UNAVAILABLE_ERR, 
"Failed to resolve the primary replica node [consistentId="
-                            + primaryReplica.getLeaseholder() + ']');
-                }
+                ClusterNode node = 
getClusterNode(primaryReplica.getLeaseholder());
 
                 return replicaSvc.invoke(node, op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue()));
             } catch (Throwable e) {
@@ -794,12 +789,7 @@ public class InternalTableImpl implements InternalTable {
 
         CompletableFuture<R> fut = 
primaryReplicaFuture.thenCompose(primaryReplica -> {
             try {
-                ClusterNode node = 
clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
-
-                if (node == null) {
-                    throw new TransactionException(REPLICA_UNAVAILABLE_ERR, 
"Failed to resolve the primary replica node [consistentId="
-                            + primaryReplica.getLeaseholder() + ']');
-                }
+                ClusterNode node = 
getClusterNode(primaryReplica.getLeaseholder());
 
                 return replicaSvc.invoke(node, op.apply(tablePartitionId, 
primaryReplica.getStartTime().longValue()));
             } catch (Throwable e) {
@@ -1867,6 +1857,22 @@ public class InternalTableImpl implements InternalTable {
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
         tx.assignCommitPartition(tablePartitionId);
 
+        return partitionMeta(tablePartitionId).thenApply(meta -> {
+            TablePartitionId partGroupId = new TablePartitionId(tableId, 
partId);
+
+            return tx.enlist(partGroupId, new IgniteBiTuple<>(
+                    getClusterNode(meta.getLeaseholder()),
+                    meta.getStartTime().longValue())
+            );
+        });
+    }
+
+    @Override
+    public CompletableFuture<ClusterNode> partitionLocation(ReplicationGroupId 
tablePartitionId) {
+        return partitionMeta(tablePartitionId).thenApply(meta -> 
getClusterNode(meta.getLeaseholder()));
+    }
+
+    private CompletableFuture<ReplicaMeta> partitionMeta(ReplicationGroupId 
tablePartitionId) {
         HybridTimestamp now = clock.now();
 
         CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(
@@ -1882,17 +1888,19 @@ public class InternalTableImpl implements InternalTable 
{
                         + " [tablePartitionId=" + tablePartitionId + ", 
awaitTimestamp=" + now + ']', e);
             }
 
-            ClusterNode node = 
clusterNodeResolver.getByConsistentId(primaryReplica.getLeaseholder());
+            return primaryReplica;
+        });
+    }
 
-            if (node == null) {
-                throw new TransactionException(REPLICA_UNAVAILABLE_ERR, 
"Failed to resolve the primary replica node [consistentId="
-                        + primaryReplica.getLeaseholder() + ']');
-            }
+    private ClusterNode getClusterNode(@Nullable String leaserHolder) {
+        ClusterNode node = clusterNodeResolver.getByConsistentId(leaserHolder);
 
-            TablePartitionId partGroupId = new TablePartitionId(tableId, 
partId);
+        if (node == null) {
+            throw new TransactionException(REPLICA_UNAVAILABLE_ERR, "Failed to 
resolve the primary replica node [consistentId="
+                    + leaserHolder + ']');
+        }
 
-            return tx.enlist(partGroupId, new IgniteBiTuple<>(node, 
primaryReplica.getStartTime().longValue()));
-        });
+        return node;
     }
 
     /**
@@ -2145,7 +2153,7 @@ public class InternalTableImpl implements InternalTable {
                         if (res == null) {
                             throw withCause(TransactionException::new, 
REPLICA_UNAVAILABLE_ERR, e);
                         } else {
-                            return 
clusterNodeResolver.getByConsistentId(res.getLeaseholder());
+                            return getClusterNode(res.getLeaseholder());
                         }
                     }
                 });
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
new file mode 100644
index 0000000000..da622a7d08
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartition.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import org.apache.ignite.table.partition.Partition;
+
+/**
+ * Hash partition representation.
+ */
+public class HashPartition implements Partition {
+    private static final long serialVersionUID = 1717320056615864614L;
+
+    private final int partitionId;
+
+    public HashPartition(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public int partitionId() {
+        return partitionId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        HashPartition that = (HashPartition) o;
+
+        return partitionId == that.partitionId;
+    }
+
+    @Override
+    public int hashCode() {
+        return partitionId;
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java
new file mode 100644
index 0000000000..a185be1e33
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/HashPartitionManagerImpl.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.MarshallersProvider;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.lang.UnsupportedPartitionTypeException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+
+/**
+ * Implementation of {@link PartitionManager} for tables with hash partitions.
+ */
+public class HashPartitionManagerImpl implements PartitionManager {
+    private final InternalTable table;
+
+    private final SchemaRegistry schemaReg;
+
+    private final MarshallersProvider marshallers;
+
+    /**
+     * Constructor.
+     *
+     * @param table Internal table.
+     * @param schemaReg Schema registry.
+     * @param marshallers Marshallers.
+     */
+    public HashPartitionManagerImpl(
+            InternalTable table,
+            SchemaRegistry schemaReg,
+            MarshallersProvider marshallers
+    ) {
+        this.table = table;
+        this.schemaReg = schemaReg;
+        this.marshallers = marshallers;
+    }
+
+    @Override
+    public CompletableFuture<ClusterNode> primaryReplicaAsync(Partition 
partition) {
+        if (!(partition instanceof HashPartition)) {
+            throw new UnsupportedPartitionTypeException("Table " + table.name()
+                    + " doesn't support any other type of partition except 
hash partition.");
+        }
+        HashPartition hashPartition = (HashPartition) partition;
+        return table.partitionLocation(new TablePartitionId(table.tableId(), 
hashPartition.partitionId()));
+    }
+
+    @Override
+    public CompletableFuture<Map<Partition, ClusterNode>> 
primaryReplicasAsync() {
+        int partitions = table.partitions();
+        CompletableFuture<?>[] futures = new CompletableFuture<?>[partitions];
+
+        for (int i = 0; i < partitions; i++) {
+            futures[i] = table.partitionLocation(new 
TablePartitionId(table.tableId(), i));
+        }
+
+        return allOf(futures)
+                .thenApply(unused -> {
+                    Map<Partition, ClusterNode> result = new 
HashMap<>(partitions);
+                    for (int i = 0; i < partitions; i++) {
+                        result.put(new HashPartition(i), (ClusterNode) 
futures[i].join());
+                    }
+                    return result;
+                });
+    }
+
+    @Override
+    public <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K> 
mapper) {
+        Objects.requireNonNull(key);
+        Objects.requireNonNull(mapper);
+
+        var marshaller = new KvMarshallerImpl<>(schemaReg.lastKnownSchema(), 
marshallers, mapper, mapper);
+        try {
+            BinaryRowEx keyRow = marshaller.marshal(key);
+
+            return completedFuture(new HashPartition(table.partition(keyRow)));
+        } catch (MarshallerException e) {
+            throw new org.apache.ignite.lang.MarshallerException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Partition> partitionAsync(Tuple key) {
+        Objects.requireNonNull(key);
+
+        try {
+            // Taking the latest schema version for the marshaller here because it's 
only used to calculate the colocation hash, and colocation
+            // columns never change (so they are the same for all schema 
versions of the table).
+            Row keyRow = new 
TupleMarshallerImpl(schemaReg.lastKnownSchema()).marshalKey(key);
+
+            return completedFuture(new HashPartition(table.partition(keyRow)));
+        } catch (TupleMarshallerException e) {
+            throw new org.apache.ignite.lang.MarshallerException(e);
+        }
+    }
+}


Reply via email to