This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5f1ac53b45 IGNITE-20396 Correct implementation of PartitionReplicaListener#resolvePk (#2578)
5f1ac53b45 is described below
commit 5f1ac53b45ed0b39dd5f6ff9261a6004c07028d4
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Sep 13 10:54:18 2023 +0300
IGNITE-20396 Correct implementation of PartitionReplicaListener#resolvePk (#2578)
---
.../distributed/ItTxDistributedTestSingleNode.java | 18 +-----
.../ignite/distributed/ItTxStateLocalMapTest.java | 2 +-
.../distributed/TableSchemaAwareIndexStorage.java | 22 +++++++
.../replicator/PartitionReplicaListener.java | 10 +--
.../internal/table/distributed/IndexBaseTest.java | 6 +-
.../raft/PartitionCommandListenerTest.java | 7 ++-
.../PartitionReplicaListenerIndexLockingTest.java | 11 +++-
.../replication/PartitionReplicaListenerTest.java | 72 +++++++++++-----------
.../apache/ignite/distributed/ItTxTestCluster.java | 12 +++-
.../table/impl/DummyInternalTableImpl.java | 38 ++++--------
10 files changed, 103 insertions(+), 95 deletions(-)
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 912e4bfb68..d4c271e310 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -46,7 +46,6 @@ import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.apache.ignite.utils.ClusterServiceTestUtils;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -57,9 +56,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith(ConfigurationExtension.class)
public class ItTxDistributedTestSingleNode extends TxAbstractTest {
- protected static int ACC_TABLE_ID;
+ protected static int ACC_TABLE_ID = 0;
- protected static int CUST_TABLE_ID;
+ protected static int CUST_TABLE_ID = 1;
protected static final String ACC_TABLE_NAME = "accounts";
@@ -72,12 +71,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
@InjectConfiguration
private static GcConfiguration gcConfig;
- @InjectConfiguration(
- "mock.tables { "
- + "accounts {id = 0, primaryKey.columns: [accountNumber]},
"
- + "customers {id = 1, primaryKey.columns: [accountNumber]}"
- + "}"
- )
+ @InjectConfiguration
private static TablesConfiguration tablesConfig;
/**
@@ -120,12 +114,6 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
this.testInfo = testInfo;
}
- @BeforeAll
- static void initTableIds() {
- ACC_TABLE_ID = tablesConfig.value().tables().get(ACC_TABLE_NAME).id();
- CUST_TABLE_ID =
tablesConfig.value().tables().get(CUST_TABLE_NAME).id();
- }
-
/**
* Initialize the test state.
*/
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index 6cb7c47d70..97dadae646 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -67,7 +67,7 @@ public class ItTxStateLocalMapTest extends IgniteAbstractTest
{
@InjectConfiguration
private static GcConfiguration gcConfig;
- @InjectConfiguration("mock.tables.foo.primaryKey.columns: [col1]")
+ @InjectConfiguration
private static TablesConfiguration tablesConfig;
private final TestInfo testInfo;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
index e4a094646d..6198a9bbfb 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -17,13 +17,16 @@
package org.apache.ignite.internal.table.distributed;
+import java.nio.ByteBuffer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.util.Cursor;
/**
@@ -35,6 +38,8 @@ public class TableSchemaAwareIndexStorage {
private final IndexStorage storage;
private final ColumnsExtractor indexRowResolver;
+ private final int columnCount;
+
/** Constructs the object. */
public TableSchemaAwareIndexStorage(
int indexId,
@@ -44,6 +49,14 @@ public class TableSchemaAwareIndexStorage {
this.indexId = indexId;
this.storage = storage;
this.indexRowResolver = indexRowResolver;
+
+ if (storage instanceof HashIndexStorage) {
+ columnCount = ((HashIndexStorage)
storage).indexDescriptor().columns().size();
+ } else if (storage instanceof SortedIndexStorage) {
+ columnCount = ((SortedIndexStorage)
storage).indexDescriptor().columns().size();
+ } else {
+ throw new IllegalArgumentException("Unknown index type: " +
storage);
+ }
}
/** Returns an identifier of the index. */
@@ -93,4 +106,13 @@ public class TableSchemaAwareIndexStorage {
public IndexStorage storage() {
return storage;
}
+
+ /**
+ * Creates the binary tuple buffer according to the index.
+ *
+ * @param buffer Buffer with a binary tuple.
+ */
+ public BinaryTuple resolve(ByteBuffer buffer) {
+ return new BinaryTuple(columnCount, buffer);
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 7bb2587837..8b785381c4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -258,8 +258,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final TablesConfiguration tablesConfig;
- private final int pkLength;
-
/** Placement driver. */
private final PlacementDriver placementDriver;
@@ -338,12 +336,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
cursors = new
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
schemaCompatValidator = new SchemaCompatValidator(schemas,
catalogTables);
-
- TableView tableConfig = findTableView(tablesConfig.tables().value(),
tableId);
-
- assert tableConfig != null;
-
- pkLength = tableConfig.primaryKey().columns().length;
}
@Override
@@ -2207,7 +2199,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
private BinaryTuple resolvePk(ByteBuffer bytes) {
- return new BinaryTuple(pkLength, bytes);
+ return pkIndexStorage.get().resolve(bytes);
}
private List<BinaryTuple> resolvePks(List<ByteBuffer> bytesList) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index be3002e67e..ad2d717695 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -91,7 +91,6 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
GcUpdateHandler gcUpdateHandler;
-
public static UUID getTxId() {
return TX_ID;
}
@@ -102,7 +101,10 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
int sortedIndexId = 2;
int hashIndexId = 3;
- pkInnerStorage = new TestHashIndexStorage(PARTITION_ID, null);
+ pkInnerStorage = new TestHashIndexStorage(PARTITION_ID, new
StorageHashIndexDescriptor(pkIndexId, List.of(
+ new StorageHashIndexColumnDescriptor("INTKEY",
NativeTypes.INT32, false),
+ new StorageHashIndexColumnDescriptor("STRKEY",
NativeTypes.STRING, false)
+ )));
TableSchemaAwareIndexStorage pkStorage = new
TableSchemaAwareIndexStorage(
pkIndexId,
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index c0dc80fc35..c9aa2201c8 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -82,6 +82,8 @@ import
org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.distributed.LowWatermark;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
@@ -146,7 +148,10 @@ public class PartitionCommandListenerTest extends
BaseIgniteAbstractTest {
/** Primary index. */
private final TableSchemaAwareIndexStorage pkStorage = new
TableSchemaAwareIndexStorage(
1,
- new TestHashIndexStorage(PARTITION_ID, null),
+ new TestHashIndexStorage(
+ PARTITION_ID,
+ new StorageHashIndexDescriptor(1, List.of(new
StorageHashIndexColumnDescriptor("key", NativeTypes.INT32, false)))
+ ),
BinaryRowConverter.keyExtractor(SCHEMA)
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index 2aeb6e1fd4..baeac418a4 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -69,6 +69,8 @@ import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
@@ -137,7 +139,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
@BeforeAll
public static void beforeAll(
@InjectConfiguration GcConfiguration gcConfig,
- @InjectConfiguration("mock.tables.foo.primaryKey.columns: [id]")
TablesConfiguration tablesConfig
+ @InjectConfiguration TablesConfiguration tablesConfig
) {
RaftGroupService mockRaftClient = mock(RaftGroupService.class);
@@ -154,9 +156,14 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
row2HashKeyConverter =
BinaryRowConverter.keyExtractor(schemaDescriptor);
+ StorageHashIndexDescriptor pkIndexDescriptor = new
StorageHashIndexDescriptor(
+ PK_INDEX_ID,
+ List.of(new StorageHashIndexColumnDescriptor("ID",
NativeTypes.INT32, false))
+ );
+
TableSchemaAwareIndexStorage hashIndexStorage = new
TableSchemaAwareIndexStorage(
PK_INDEX_ID,
- new TestHashIndexStorage(PART_ID, null),
+ new TestHashIndexStorage(PART_ID, pkIndexDescriptor),
row2HashKeyConverter
);
pkStorage = new Lazy<>(() -> hashIndexStorage);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 460826eadb..cd79c5a519 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -201,8 +201,7 @@ import org.mockito.quality.Strictness;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class PartitionReplicaListenerTest extends IgniteAbstractTest {
- /** Partition id. */
- private static final int partId = 0;
+ private static final int PART_ID = 0;
private static final int CURRENT_SCHEMA_VERSION = 1;
@@ -212,13 +211,12 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private static final int FUTURE_SCHEMA_ROW_INDEXED_VALUE = 0;
- /** Table id. */
- private final int tblId = 1;
+ private static final int TABLE_ID = 1;
private final Map<UUID, Set<RowId>> pendingRows = new
ConcurrentHashMap<>();
/** The storage stores partition data. */
- private final TestMvPartitionStorage testMvPartitionStorage = new
TestMvPartitionStorage(partId);
+ private final TestMvPartitionStorage testMvPartitionStorage = new
TestMvPartitionStorage(PART_ID);
private final LockManager lockManager = new HeapLockManager();
@@ -242,7 +240,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
v = new HashSet<>();
}
- RowId rowId = new RowId(partId, ((UpdateCommand)
cmd).rowUuid());
+ RowId rowId = new RowId(PART_ID, ((UpdateCommand)
cmd).rowUuid());
v.add(rowId);
return v;
@@ -259,7 +257,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new
TableMessagesFactory();
/** Partition group id. */
- private final TablePartitionId grpId = new TablePartitionId(tblId, partId);
+ private final TablePartitionId grpId = new TablePartitionId(TABLE_ID,
PART_ID);
/** Hybrid clock. */
private final HybridClock clock = new HybridClockImpl();
@@ -313,7 +311,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private KvMarshaller<TestKey, TestValue> kvMarshallerVersion2;
private final CatalogTableDescriptor tableDescriptor = new
CatalogTableDescriptor(
- tblId, 1, "table", 1, CURRENT_SCHEMA_VERSION,
+ TABLE_ID, 1, "table", 1, CURRENT_SCHEMA_VERSION,
List.of(
new CatalogTableColumnDescriptor("intKey",
ColumnType.INT32, false, 0, 0, 0, null),
new CatalogTableColumnDescriptor("strKey",
ColumnType.STRING, false, 0, 0, 0, null),
@@ -359,7 +357,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
@BeforeEach
public void beforeTest(
@InjectConfiguration GcConfiguration gcConfig,
- @InjectConfiguration("mock.tables.foo.primaryKey.columns: [foo,
bar]") TablesConfiguration tablesConfig
+ @InjectConfiguration TablesConfiguration tablesConfig
) {
when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock
-> {
if (!localLeader) {
@@ -420,11 +418,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
pkStorageSupplier = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
pkIndexId,
- new TestHashIndexStorage(partId,
mock(StorageHashIndexDescriptor.class)),
+ new TestHashIndexStorage(PART_ID,
mock(StorageHashIndexDescriptor.class)),
row2Tuple
));
- SortedIndexStorage indexStorage = new TestSortedIndexStorage(partId,
new StorageSortedIndexDescriptor(sortedIndexId, List.of(
+ SortedIndexStorage indexStorage = new TestSortedIndexStorage(PART_ID,
new StorageSortedIndexDescriptor(sortedIndexId, List.of(
new StorageSortedIndexColumnDescriptor("intVal",
NativeTypes.INT32, false, true)
)));
@@ -435,14 +433,14 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
hashIndexStorage = new TableSchemaAwareIndexStorage(
hashIndexId,
- new TestHashIndexStorage(partId, new
StorageHashIndexDescriptor(hashIndexId, List.of(
+ new TestHashIndexStorage(PART_ID, new
StorageHashIndexDescriptor(hashIndexId, List.of(
new StorageHashIndexColumnDescriptor("intVal",
NativeTypes.INT32, false)
))),
columnsExtractor
);
IndexLocker pkLocker = new HashIndexLocker(pkIndexId, true,
lockManager, row2Tuple);
- IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId,
partId, lockManager, indexStorage, row2Tuple);
+ IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId,
PART_ID, lockManager, indexStorage, row2Tuple);
IndexLocker hashIndexLocker = new HashIndexLocker(hashIndexId, false,
lockManager, row2Tuple);
IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler(
@@ -463,8 +461,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
txManager,
lockManager,
Runnable::run,
- partId,
- tblId,
+ PART_ID,
+ TABLE_ID,
() -> Map.of(pkLocker.id(), pkLocker, sortedIndexId,
sortedIndexLocker, hashIndexId, hashIndexLocker),
pkStorageSupplier,
() -> Map.of(sortedIndexId, sortedIndexStorage, hashIndexId,
hashIndexStorage),
@@ -473,7 +471,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
txStateStorage,
transactionStateResolver,
new StorageUpdateHandler(
- partId,
+ PART_ID,
partitionDataStorage,
gcConfig,
mock(LowWatermark.class),
@@ -482,7 +480,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
),
schemas,
localNode,
- new TestMvTableStorage(tblId, DEFAULT_PARTITION_COUNT),
+ new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT),
mock(IndexBuilder.class),
schemaSyncService,
catalogService,
@@ -604,10 +602,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID txId = newTxId();
BinaryRow testBinaryKey = nextBinaryKey();
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new
TestValue(1, "v1"));
- var rowId = new RowId(partId);
+ var rowId = new RowId(PART_ID);
pkStorage().put(testBinaryRow, rowId);
- testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId,
partId);
+ testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
testMvPartitionStorage.commitWrite(rowId, clock.now());
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
@@ -627,11 +625,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID txId = newTxId();
BinaryRow testBinaryKey = nextBinaryKey();
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new
TestValue(1, "v1"));
- var rowId = new RowId(partId);
+ var rowId = new RowId(PART_ID);
txState = TxState.COMMITED;
pkStorage().put(testBinaryRow, rowId);
- testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId,
partId);
+ testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(grpId)
@@ -650,10 +648,10 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID txId = newTxId();
BinaryRow testBinaryKey = nextBinaryKey();
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new
TestValue(1, "v1"));
- var rowId = new RowId(partId);
+ var rowId = new RowId(PART_ID);
pkStorage().put(testBinaryRow, rowId);
- testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId,
partId);
+ testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(grpId)
@@ -672,11 +670,11 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
UUID txId = newTxId();
BinaryRow testBinaryKey = nextBinaryKey();
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new
TestValue(1, "v1"));
- var rowId = new RowId(partId);
+ var rowId = new RowId(PART_ID);
txState = TxState.ABORTED;
pkStorage().put(testBinaryRow, rowId);
- testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId,
partId);
+ testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, TABLE_ID,
PART_ID);
CompletableFuture<?> fut =
partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest()
.groupId(grpId)
@@ -696,7 +694,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
int sortedIndexId = sortedIndexStorage.id();
IntStream.range(0, 6).forEach(i -> {
- RowId rowId = new RowId(partId);
+ RowId rowId = new RowId(PART_ID);
int indexedVal = i % 5; // Non-uniq index.
TestValue testValue = new TestValue(indexedVal, "val" + i);
@@ -704,7 +702,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
- testMvPartitionStorage.addWrite(rowId, storeRow, txId, tblId,
partId);
+ testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID,
PART_ID);
sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue,
rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
@@ -803,7 +801,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
int sortedIndexId = sortedIndexStorage.id();
IntStream.range(0, 6).forEach(i -> {
- RowId rowId = new RowId(partId);
+ RowId rowId = new RowId(PART_ID);
int indexedVal = i % 5; // Non-uniq index.
TestValue testValue = new TestValue(indexedVal, "val" + i);
@@ -811,7 +809,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
- testMvPartitionStorage.addWrite(rowId, storeRow, txId, tblId,
partId);
+ testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID,
PART_ID);
sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue,
rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
@@ -905,7 +903,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
int hashIndexId = hashIndexStorage.id();
IntStream.range(0, 7).forEach(i -> {
- RowId rowId = new RowId(partId);
+ RowId rowId = new RowId(PART_ID);
int indexedVal = i % 2; // Non-uniq index.
TestValue testValue = new TestValue(indexedVal, "val" + i);
@@ -913,7 +911,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new BinaryTupleBuilder(1).appendInt(indexedVal).build());
BinaryRow storeRow = binaryRow(key(nextBinaryKey()), testValue);
- testMvPartitionStorage.addWrite(rowId, storeRow, txId, tblId,
partId);
+ testMvPartitionStorage.addWrite(rowId, storeRow, txId, TABLE_ID,
PART_ID);
hashIndexStorage.storage().put(new IndexRowImpl(indexedValue,
rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now());
});
@@ -1118,8 +1116,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
private TablePartitionIdMessage commitPartitionId() {
return TABLE_MESSAGES_FACTORY.tablePartitionIdMessage()
- .partitionId(partId)
- .tableId(tblId)
+ .partitionId(PART_ID)
+ .tableId(TABLE_ID)
.build();
}
@@ -1592,14 +1590,14 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
TestKey key = nextKey();
BinaryRow futureSchemaVersionRow = binaryRow(key, new TestValue(2,
"v2"), kvMarshallerVersion2);
- var rowId = new RowId(partId);
+ var rowId = new RowId(PART_ID);
BinaryTuple indexedValue = new BinaryTuple(1,
new
BinaryTupleBuilder(1).appendInt(FUTURE_SCHEMA_ROW_INDEXED_VALUE).build()
);
pkStorage().put(futureSchemaVersionRow, rowId);
- testMvPartitionStorage.addWrite(rowId, futureSchemaVersionRow,
futureSchemaVersionTxId, tblId, partId);
+ testMvPartitionStorage.addWrite(rowId, futureSchemaVersionRow,
futureSchemaVersionTxId, TABLE_ID, PART_ID);
sortedIndexStorage.storage().put(new IndexRowImpl(indexedValue,
rowId));
testMvPartitionStorage.commitWrite(rowId, clock.now());
@@ -1875,8 +1873,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
- when(catalogTables.table(tblId,
txBeginTs.longValue())).thenReturn(tableVersion1);
- when(catalogTables.table(eq(tblId),
gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+ when(catalogTables.table(TABLE_ID,
txBeginTs.longValue())).thenReturn(tableVersion1);
+ when(catalogTables.table(eq(TABLE_ID),
gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index e69a6fdf90..05a96afec7 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -35,6 +35,7 @@ import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -84,6 +85,8 @@ import
org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.impl.TestMvTableStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
@@ -413,9 +416,16 @@ public class ItTxTestCluster {
ColumnsExtractor row2Tuple =
BinaryRowConverter.keyExtractor(schemaDescriptor);
+ StorageHashIndexDescriptor pkIndexDescriptor =
mock(StorageHashIndexDescriptor.class);
+
+ when(pkIndexDescriptor.columns()).then(invocation ->
Collections.nCopies(
+ schemaDescriptor.keyColumns().columns().length,
+ mock(StorageHashIndexColumnDescriptor.class)
+ ));
+
Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(()
-> new TableSchemaAwareIndexStorage(
indexId,
- new TestHashIndexStorage(partId, null),
+ new TestHashIndexStorage(partId, pkIndexDescriptor),
row2Tuple
));
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index a777af6887..bb49f42910 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -21,14 +21,13 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.io.Serializable;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -36,16 +35,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
-import java.util.stream.Stream;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.configuration.ConfigurationValue;
-import org.apache.ignite.configuration.NamedConfigurationTree;
-import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.distributed.TestPartitionDataStorage;
import org.apache.ignite.internal.TestHybridClock;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
-import org.apache.ignite.internal.configuration.NamedListConfiguration;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -69,13 +64,12 @@ import org.apache.ignite.internal.schema.ColumnsExtractor;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
-import org.apache.ignite.internal.schema.configuration.TableChange;
-import org.apache.ignite.internal.schema.configuration.TableConfiguration;
-import org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
+import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.distributed.HashIndexLocker;
import org.apache.ignite.internal.table.distributed.IndexLocker;
@@ -315,9 +309,15 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schema);
+ StorageHashIndexDescriptor pkIndexDescriptor =
mock(StorageHashIndexDescriptor.class);
+
+ when(pkIndexDescriptor.columns()).then(
+ invocation ->
Collections.nCopies(schema.keyColumns().columns().length,
mock(StorageHashIndexColumnDescriptor.class))
+ );
+
Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new
TableSchemaAwareIndexStorage(
indexId,
- new TestHashIndexStorage(PART_ID, null),
+ new TestHashIndexStorage(PART_ID, pkIndexDescriptor),
row2Tuple
));
@@ -352,22 +352,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
lenient().when(catalogTables.table(anyInt(),
anyLong())).thenReturn(tableDescriptor);
lenient().when(tableDescriptor.tableVersion()).thenReturn(1);
- TablesConfiguration tablesConfig = mock(TablesConfiguration.class);
- NamedConfigurationTree<TableConfiguration, TableView, TableChange>
tablesTree = mock(NamedListConfiguration.class);
- NamedListView<TableView> tablesList = mock(NamedListView.class);
- TableView tableConfig = mock(TableView.class, RETURNS_DEEP_STUBS);
-
- when(tablesConfig.tables()).thenReturn(tablesTree);
- when(tablesTree.value()).thenReturn(tablesList);
- when(tablesList.stream()).thenReturn(Stream.of(tableConfig));
-
- String[] primaryKeyColumns =
Arrays.stream(schema.keyColumns().columns())
- .map(Column::name)
- .toArray(String[]::new);
-
- when(tableConfig.id()).thenReturn(tableId);
- when(tableConfig.primaryKey().columns()).thenReturn(primaryKeyColumns);
-
replicaListener = new PartitionReplicaListener(
mvPartStorage,
raftGroupServiceByPartitionId.get(PART_ID),
@@ -391,7 +375,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
mock(SchemaSyncService.class, invocation ->
completedFuture(null)),
mock(CatalogService.class),
catalogTables,
- tablesConfig,
+ mock(TablesConfiguration.class),
new TestPlacementDriver(LOCAL_NODE.name())
);