This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8314beacd1 IGNITE-18900 Sql. Remove hash function creation from
ExecutionContext (#1756)
8314beacd1 is described below
commit 8314beacd11ef54690d2ca350a8a043bbf21e262
Author: korlov42 <[email protected]>
AuthorDate: Tue Mar 7 11:42:24 2023 +0200
IGNITE-18900 Sql. Remove hash function creation from ExecutionContext
(#1756)
---
.../ignite/client/fakes/FakeInternalTable.java | 6 ++
.../internal/sql/engine/exec/ExecutionContext.java | 65 +--------------------
.../sql/engine/exec/ExecutionServiceImpl.java | 3 +-
.../sql/engine/schema/IgniteTableImpl.java | 66 +++++++++-------------
.../sql/engine/exec/RuntimeSortedIndexTest.java | 1 -
.../sql/engine/exec/rel/AbstractExecutionTest.java | 3 +-
.../engine/exec/rel/MergeJoinExecutionTest.java | 2 +-
.../sql/engine/framework/TestBuilders.java | 3 +-
.../ignite/internal/table/InternalTable.java | 8 +++
.../distributed/storage/InternalTableImpl.java | 27 ++++-----
10 files changed, 56 insertions(+), 128 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index 50036afd99..ba232b6580 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -100,6 +100,12 @@ public class FakeInternalTable implements InternalTable {
return tableName;
}
+ /** {@inheritDoc} */
+ @Override
+ public int partitionId(BinaryRowEx row) {
+ return 0;
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable
InternalTransaction tx) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
index 053c921b97..3a8770f1ea 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionContext.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.sql.engine.exec;
import static org.apache.ignite.lang.ErrorGroups.Common.UNEXPECTED_ERR;
import java.lang.reflect.Type;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -44,8 +42,6 @@ import
org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.AbstractQueryContext;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
-import org.apache.ignite.internal.sql.engine.util.HashFunctionFactory;
-import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
@@ -82,10 +78,7 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
private final ExpressionFactory<RowT> expressionFactory;
- private final HashFunctionFactory<RowT> hashFunctionFactory;
-
private final AtomicBoolean cancelFlag = new AtomicBoolean();
- private final Map<HashFunctionCacheKey, RowHashFunction<RowT>>
hashFunctionCache = new HashMap<>();
/**
* Need to store timestamp, since SQL standard says that functions such as
CURRENT_TIMESTAMP return the same value throughout the
@@ -117,8 +110,7 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
FragmentDescription fragmentDesc,
RowHandler<RowT> handler,
Map<String, Object> params,
- TxAttributes txAttributes,
- HashFunctionFactory<RowT> hashFunctionFactory
+ TxAttributes txAttributes
) {
super(qctx);
@@ -130,7 +122,6 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
this.params = params;
this.localNode = localNode;
this.originatingNodeName = originatingNodeName;
- this.hashFunctionFactory = hashFunctionFactory;
this.txAttributes = txAttributes;
expressionFactory = new ExpressionFactoryImpl<>(
@@ -221,28 +212,6 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
return localNode;
}
- /**
- * Returns the function to compute colocation hash for specified table.
- *
- * @param tableId An identifier of a table. This identifier will be used
to acquire expected types
- * of the colocation fields.
- * @param fields Indexes of the fields representing colocation columns.
This is a projection of
- * the colocation fields of specified table on actual row. For
example, type of the row to insert
- * equals to the type of table's row, thus passed fields should match
the colocation columns of the table.
- * But row for delete may contain the primary fields only, thus we
need to project colocation fields
- * on this trimmed row.
- * @return A hash function.
- */
- // this is more like workaround to limit scope of the refactoring,
- // but it definitely need to be fixed
- // TODO: https://issues.apache.org/jira/browse/IGNITE-18900
- public RowHashFunction<RowT> hashFunction(UUID tableId, int[] fields) {
- return hashFunctionCache.computeIfAbsent(
- new HashFunctionCacheKey(tableId, fields),
- k -> hashFunctionFactory.create(fields, tableId)
- );
- }
-
/** {@inheritDoc} */
@Override
public SchemaPlus getRootSchema() {
@@ -431,36 +400,4 @@ public class ExecutionContext<RowT> extends
AbstractQueryContext implements Data
public int hashCode() {
return Objects.hash(qryId, fragmentDesc.fragmentId());
}
-
- private static class HashFunctionCacheKey {
- private final UUID tableId;
- private final int[] fields;
-
- private HashFunctionCacheKey(UUID tableId, int[] fields) {
- this.tableId = tableId;
- this.fields = fields;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- HashFunctionCacheKey that = (HashFunctionCacheKey) o;
-
- if (!tableId.equals(that.tableId)) {
- return false;
- }
- return Arrays.equals(fields, that.fields);
- }
-
- @Override
- public int hashCode() {
- return tableId.hashCode();
- }
- }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index edc278be88..817259d19f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -550,8 +550,7 @@ public class ExecutionServiceImpl<RowT> implements
ExecutionService, TopologyEve
desc,
handler,
Commons.parametersMap(ctx.parameters()),
- txAttributes,
- new HashFunctionFactoryImpl<>(sqlSchemaManager, handler)
+ txAttributes
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index d019eddf6f..f77e9d4e70 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
@@ -53,6 +52,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.NativeTypeSpec;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -72,14 +72,12 @@ import
org.apache.ignite.internal.sql.engine.schema.IgniteIndex.Type;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-import
org.apache.ignite.internal.sql.engine.util.HashFunctionFactory.RowHashFunction;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import
org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
-import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.sql.SqlException;
import org.jetbrains.annotations.Nullable;
@@ -110,8 +108,7 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
private final List<ColumnDescriptor> columnsOrderedByPhysSchema;
- private final int[] deleteRowHashFields;
- private final int[] upsertRowHashFields;
+ private final PartitionExtractor partitionExtractor;
/**
* Constructor.
@@ -133,28 +130,19 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
this.clock = clock;
this.schemaRegistry = schemaRegistry;
this.schemaDescriptor = schemaRegistry.schema();
+ this.partitionExtractor = table::partitionId;
assert schemaDescriptor != null;
- BitSet keyFields = new BitSet();
List<ColumnDescriptor> tmp = new ArrayList<>(desc.columnsCount());
for (int i = 0; i < desc.columnsCount(); i++) {
- ColumnDescriptor descriptor = desc.columnDescriptor(i);
-
- tmp.add(descriptor);
-
- if (descriptor.key()) {
- keyFields.set(descriptor.logicalIndex());
- }
+ tmp.add(desc.columnDescriptor(i));
}
tmp.sort(Comparator.comparingInt(ColumnDescriptor::physicalIndex));
columnsOrderedByPhysSchema = tmp;
- upsertRowHashFields = desc.distribution().getKeys().toIntArray();
- deleteRowHashFields = project(desc.columnsCount(),
upsertRowHashFields, keyFields);
-
statistic = new StatisticsImpl();
}
@@ -168,8 +156,7 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
this.schemaDescriptor = t.schemaDescriptor;
this.statistic = t.statistic;
this.columnsOrderedByPhysSchema = t.columnsOrderedByPhysSchema;
- this.upsertRowHashFields = t.upsertRowHashFields;
- this.deleteRowHashFields = t.deleteRowHashFields;
+ this.partitionExtractor = t.partitionExtractor;
this.indexes.putAll(t.indexes);
}
@@ -349,23 +336,20 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
assert commitPartitionId != null;
UUID tableId = table.tableId();
- RowHashFunction<RowT> hashFunction = ectx.hashFunction(tableId,
upsertRowHashFields);
- ToIntFunction<RowT> partitionExtractor = row ->
IgniteUtils.safeAbs(hashFunction.hashOf(row) % table.partitions());
-
- Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new
Int2ObjectOpenHashMap<>();
+ Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new
Int2ObjectOpenHashMap<>();
for (RowT row : rows) {
- BinaryRow binaryRow = convertRow(row, ectx, false);
+ BinaryRowEx binaryRow = convertRow(row, ectx, false);
-
keyRowsByPartition.computeIfAbsent(partitionExtractor.applyAsInt(row), k -> new
ArrayList<>()).add(binaryRow);
+
rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new
ArrayList<>()).add(binaryRow);
}
- CompletableFuture<List<RowT>>[] futures = new
CompletableFuture[keyRowsByPartition.size()];
+ CompletableFuture<List<RowT>>[] futures = new
CompletableFuture[rowsByPartition.size()];
int batchNum = 0;
- for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows :
keyRowsByPartition.int2ObjectEntrySet()) {
+ for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows :
rowsByPartition.int2ObjectEntrySet()) {
TablePartitionId partGroupId = new TablePartitionId(tableId,
partToRows.getIntKey());
NodeWithTerm nodeWithTerm =
ectx.description().mapping().updatingTableAssignments().get(partToRows.getIntKey());
@@ -399,23 +383,20 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
RowHandler<RowT> handler = ectx.rowHandler();
UUID tableId = table.tableId();
- RowHashFunction<RowT> hashFunction = ectx.hashFunction(tableId,
upsertRowHashFields);
- ToIntFunction<RowT> partitionExtractor = row ->
IgniteUtils.safeAbs(hashFunction.hashOf(row) % table.partitions());
-
- Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new
Int2ObjectOpenHashMap<>();
+ Int2ObjectOpenHashMap<List<BinaryRow>> rowsByPartition = new
Int2ObjectOpenHashMap<>();
for (RowT row : rows) {
- BinaryRow binaryRow = convertRow(row, ectx, false);
+ BinaryRowEx binaryRow = convertRow(row, ectx, false);
-
keyRowsByPartition.computeIfAbsent(partitionExtractor.applyAsInt(row), k -> new
ArrayList<>()).add(binaryRow);
+
rowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k -> new
ArrayList<>()).add(binaryRow);
}
- CompletableFuture<List<RowT>>[] futures = new
CompletableFuture[keyRowsByPartition.size()];
+ CompletableFuture<List<RowT>>[] futures = new
CompletableFuture[rowsByPartition.size()];
int batchNum = 0;
- for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows :
keyRowsByPartition.int2ObjectEntrySet()) {
+ for (Int2ObjectMap.Entry<List<BinaryRow>> partToRows :
rowsByPartition.int2ObjectEntrySet()) {
TablePartitionId partGroupId = new TablePartitionId(tableId,
partToRows.getIntKey());
NodeWithTerm nodeWithTerm =
ectx.description().mapping().updatingTableAssignments().get(partToRows.getIntKey());
@@ -467,16 +448,13 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
assert commitPartitionId != null;
UUID tableId = table.tableId();
- RowHashFunction<RowT> hashFunction = ectx.hashFunction(tableId,
deleteRowHashFields);
-
- ToIntFunction<RowT> partitionExtractor = row ->
IgniteUtils.safeAbs(hashFunction.hashOf(row) % table.partitions());
Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new
Int2ObjectOpenHashMap<>();
for (RowT row : rows) {
- BinaryRow binaryRow = convertRow(row, ectx, true);
+ BinaryRowEx binaryRow = convertRow(row, ectx, true);
-
keyRowsByPartition.computeIfAbsent(partitionExtractor.applyAsInt(row), k -> new
ArrayList<>()).add(binaryRow);
+
keyRowsByPartition.computeIfAbsent(partitionExtractor.fromRow(binaryRow), k ->
new ArrayList<>()).add(binaryRow);
}
CompletableFuture<List<RowT>>[] futures = new
CompletableFuture[keyRowsByPartition.size()];
@@ -503,7 +481,7 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
return CompletableFuture.allOf(futures);
}
- private <RowT> BinaryRow convertRow(RowT row, ExecutionContext<RowT> ectx,
boolean keyOnly) {
+ private <RowT> BinaryRowEx convertRow(RowT row, ExecutionContext<RowT>
ectx, boolean keyOnly) {
RowHandler<RowT> hnd = ectx.rowHandler();
boolean hasNulls = false;
@@ -659,4 +637,12 @@ public class IgniteTableImpl extends AbstractTable
implements IgniteTable, Updat
return new SqlException(ErrorGroups.Sql.DUPLICATE_KEYS_ERR, "PK unique
constraint is violated");
}
+
+ /**
+ * Extracts an identifier of partition from a given row.
+ */
+ @FunctionalInterface
+ private interface PartitionExtractor {
+ int fromRow(BinaryRowEx row);
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 5768b4a207..af6d1daf2b 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -118,7 +118,6 @@ public class RuntimeSortedIndexTest extends
IgniteAbstractTest {
null,
ArrayRowHandler.INSTANCE,
Map.of(),
- null,
null
),
RelCollations.of(ImmutableIntList.copyOf(idxCols)),
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
index 35ebd03a82..8b1e7bc413 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/AbstractExecutionTest.java
@@ -111,8 +111,7 @@ public class AbstractExecutionTest extends
IgniteAbstractTest {
fragmentDesc,
ArrayRowHandler.INSTANCE,
Map.of(),
- TxAttributes.fromTx(new NoOpTransaction("fake-test-node")),
- null
+ TxAttributes.fromTx(new NoOpTransaction("fake-test-node"))
);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
index ac78a05703..ee4680ed19 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/MergeJoinExecutionTest.java
@@ -488,7 +488,7 @@ public class MergeJoinExecutionTest extends
AbstractExecutionTest {
ExecutionContext<Object[]> ectx =
new
ExecutionContext<>(BaseQueryContext.builder().logger(log).build(), null, null,
null,
- null, null, ArrayRowHandler.INSTANCE, null, null,
null);
+ null, null, ArrayRowHandler.INSTANCE, null, null);
ExpressionFactoryImpl<Object[]> expFactory = new
ExpressionFactoryImpl<>(ectx, typeFactory, SqlConformanceEnum.DEFAULT);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 3d8e552231..868c712cbf 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -219,8 +219,7 @@ public class TestBuilders {
description,
ArrayRowHandler.INSTANCE,
Map.of(),
- TxAttributes.fromTx(new NoOpTransaction(node.name())),
- null
+ TxAttributes.fromTx(new NoOpTransaction(node.name()))
);
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index e7b176636a..8b97f5d05a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -66,6 +66,14 @@ public interface InternalTable extends ManuallyCloseable {
*/
String name();
+ /**
+ * Extracts an identifier of a partition from a given row.
+ *
+ * @param row A row to extract partition from.
+ * @return An identifier of a partition the row belongs to.
+ */
+ int partitionId(BinaryRowEx row);
+
/**
* Asynchronously gets a row with same key columns values as given one
from the table.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 61b9d5cad0..deb430c1e8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -77,6 +77,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
@@ -227,7 +228,7 @@ public class InternalTableImpl implements InternalTable {
final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
- int partId = partId(row);
+ int partId = partitionId(row);
TablePartitionId partGroupId = new TablePartitionId(tableId, partId);
@@ -497,7 +498,7 @@ public class InternalTableImpl implements InternalTable {
@Override
public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow,
InternalTransaction tx) {
if (tx != null && tx.isReadOnly()) {
- return evaluateReadOnlyRecipientNode(partId(keyRow))
+ return evaluateReadOnlyRecipientNode(partitionId(keyRow))
.thenCompose(recipientNode -> get(keyRow,
tx.readTimestamp(), recipientNode));
} else {
return enlistInTx(
@@ -522,7 +523,7 @@ public class InternalTableImpl implements InternalTable {
@NotNull HybridTimestamp readTimestamp,
@NotNull ClusterNode recipientNode
) {
- int partId = partId(keyRow);
+ int partId = partitionId(keyRow);
ReplicationGroupId partGroupId = partitionMap.get(partId).groupId();
return replicaSvc.invoke(recipientNode,
tableMessagesFactory.readOnlySingleRowReplicaRequest()
@@ -543,7 +544,7 @@ public class InternalTableImpl implements InternalTable {
if (firstRow == null) {
return
CompletableFuture.completedFuture(Collections.emptyList());
} else {
- return evaluateReadOnlyRecipientNode(partId(firstRow))
+ return evaluateReadOnlyRecipientNode(partitionId(firstRow))
.thenCompose(recipientNode -> getAll(keyRows,
tx.readTimestamp(), recipientNode));
}
} else {
@@ -1049,7 +1050,7 @@ public class InternalTableImpl implements InternalTable {
Int2ObjectOpenHashMap<List<BinaryRow>> keyRowsByPartition = new
Int2ObjectOpenHashMap<>();
for (BinaryRowEx keyRow : rows) {
- keyRowsByPartition.computeIfAbsent(partId(keyRow), k -> new
ArrayList<>()).add(keyRow);
+ keyRowsByPartition.computeIfAbsent(partitionId(keyRow), k -> new
ArrayList<>()).add(keyRow);
}
return keyRowsByPartition;
@@ -1141,7 +1142,7 @@ public class InternalTableImpl implements InternalTable {
@TestOnly
@Override
public int partition(BinaryRowEx keyRow) {
- return partId(keyRow);
+ return partitionId(keyRow);
}
/**
@@ -1162,16 +1163,10 @@ public class InternalTableImpl implements InternalTable
{
}));
}
- /**
- * Get partition id by key row.
- *
- * @param row Key row.
- * @return partition id.
- */
- private int partId(BinaryRowEx row) {
- int partId = row.colocationHash() % partitions;
-
- return (partId < 0) ? -partId : partId;
+ /** {@inheritDoc} */
+ @Override
+ public int partitionId(BinaryRowEx row) {
+ return IgniteUtils.safeAbs(row.colocationHash()) % partitions;
}
/**