This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9bc629a998 IGNITE-22618 Remove MvPartitionStorage#rowsCount (#4020)
9bc629a998 is described below
commit 9bc629a9982ab0bcc0d3c8588b191e40e663aaac
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Mon Jul 1 10:57:52 2024 +0300
IGNITE-22618 Remove MvPartitionStorage#rowsCount (#4020)
---
...IndexNodeFinishedRwTransactionsCheckerTest.java | 15 ++++-
.../internal/storage/MvPartitionStorage.java | 11 ----
.../storage/ThreadAssertingMvPartitionStorage.java | 7 --
.../storage/AbstractMvTableStorageTest.java | 3 -
.../storage/impl/TestMvPartitionStorage.java | 9 +--
.../mv/AbstractPageMemoryMvPartitionStorage.java | 13 ----
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 20 ------
.../rebalance/ItRebalanceRecoveryTest.java | 7 +-
.../rebalance/ItRebalanceTriggersRecoveryTest.java | 19 +++---
.../table/distributed/IndexCleanupTest.java | 17 +----
.../table/distributed/StorageCleanupTest.java | 76 +++++-----------------
.../distributed/StorageUpdateHandlerTest.java | 26 +++-----
12 files changed, 56 insertions(+), 167 deletions(-)
diff --git
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexNodeFinishedRwTransactionsCheckerTest.java
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexNodeFinishedRwTransactionsCheckerTest.java
index 8120d9b4a3..a64ea61f73 100644
---
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexNodeFinishedRwTransactionsCheckerTest.java
+++
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexNodeFinishedRwTransactionsCheckerTest.java
@@ -34,10 +34,12 @@ import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.index.message.IndexMessagesFactory;
import
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeResponse;
import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import
org.apache.ignite.internal.storage.impl.schema.TestProfileConfigurationSchema;
@@ -228,15 +230,22 @@ public class ItIndexNodeFinishedRwTransactionsCheckerTest
extends ClusterPerClas
InternalTable table = tableImpl().internalTable();
return IntStream.range(0, table.partitions())
- .mapToLong(partitionId ->
table.storage().getMvPartition(partitionId).rowsCount())
+ .mapToObj(table.storage()::getMvPartition)
+
.mapToLong(ItIndexNodeFinishedRwTransactionsCheckerTest::partitionSize)
.toArray();
});
}
- private static int differences(long[] partitionSizes0, long[]
partitionsSizes1) {
+ private static long partitionSize(MvPartitionStorage partitionStorage) {
+ try (PartitionTimestampCursor cursor =
partitionStorage.scan(HybridTimestamp.MAX_VALUE)) {
+ return cursor.stream().count();
+ }
+ }
+
+ private static long differences(long[] partitionSizes0, long[]
partitionsSizes1) {
assertEquals(partitionSizes0.length, partitionsSizes1.length);
- return (int) IntStream.range(0, partitionSizes0.length)
+ return IntStream.range(0, partitionSizes0.length)
.filter(i -> partitionSizes0[i] != partitionsSizes1[i])
.count();
}
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index 3c624a8a7b..51a8dbddbb 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -260,17 +260,6 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
*/
@Nullable BinaryRow vacuum(GcEntry entry);
- /**
- * Returns rows count belongs to current storage.
- *
- * @return Rows count.
- * @throws StorageException If failed to obtain size.
- * @deprecated It's not yet defined what a "count" is. This value is not
easily defined for multi-versioned storages.
- * TODO IGNITE-16769 Implement correct PartitionStorage rows count
calculation.
- */
- @Deprecated
- long rowsCount() throws StorageException;
-
/**
* Updates the current lease start time in the storage.
*
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 8ef1b14aff..50308b7122 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -154,13 +154,6 @@ public class ThreadAssertingMvPartitionStorage implements
MvPartitionStorage, Wr
return partitionStorage.vacuum(entry);
}
- @Override
- public long rowsCount() throws StorageException {
- assertThreadAllowsToRead();
-
- return partitionStorage.rowsCount();
- }
-
@Override
public void updateLease(long leaseStartTime) {
assertThreadAllowsToWrite();
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 7b66179283..05fc2a551a 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -614,8 +614,6 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
assertThrows(StorageDestroyedException.class, () ->
storage.scanVersions(rowId));
assertThrows(StorageDestroyedException.class, () ->
storage.closestRowId(rowId));
-
- assertThrows(StorageDestroyedException.class, storage::rowsCount);
}
@Test
@@ -1472,7 +1470,6 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
assertThrows(StorageRebalanceException.class, () ->
storage.scanVersions(rowId));
assertThrows(StorageRebalanceException.class, () ->
storage.scan(clock.now()));
assertThrows(StorageRebalanceException.class, () ->
storage.closestRowId(rowId));
- assertThrows(StorageRebalanceException.class, storage::rowsCount);
return null;
});
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 0ca9ed4aea..33aa0ab287 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -333,7 +333,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public ReadResult read(RowId rowId, @Nullable HybridTimestamp timestamp) {
+ public ReadResult read(RowId rowId, HybridTimestamp timestamp) {
checkStorageClosedOrInProcessOfRebalance();
if (rowId.partitionId() != partitionId) {
@@ -620,13 +620,6 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
return versionChainToRemove.row;
}
- @Override
- public long rowsCount() {
- checkStorageClosedOrInProcessOfRebalance();
-
- return map.size();
- }
-
@Override
public void updateLease(long leaseStartTime) {
checkStorageClosed();
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index e3e5fbd4bb..fe393ffe04 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -573,19 +573,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage
implements MvPartitio
});
}
- @Override
- public long rowsCount() {
- return busy(() -> {
- throwExceptionIfStorageNotInRunnableState();
-
- try {
- return renewableState.versionChainTree().size();
- } catch (IgniteInternalCheckedException e) {
- throw new StorageException("Error occurred while fetching the
size.", e);
- }
- });
- }
-
@Override
public void close() {
if (!transitionToTerminalState(StorageState.CLOSED)) {
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 0502432ff3..e3253aa205 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -976,26 +976,6 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
return helper.getRowId(keyBuffer, ROW_ID_OFFSET);
}
- @Override
- public long rowsCount() {
- return busy(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
-
- try (RocksIterator it = db.newIterator(helper.partCf,
helper.scanReadOpts)) {
- it.seek(helper.partitionStartPrefix());
-
- long size = 0;
-
- while (it.isValid()) {
- ++size;
- it.next();
- }
-
- return size;
- }
- });
- }
-
@Override
public void updateLease(long leaseStartTime) {
busy(() -> {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
index 8cbec068b2..0205f4f557 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceRecoveryTest.java
@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.junit.jupiter.api.Test;
@@ -35,6 +36,8 @@ import org.junit.jupiter.api.Test;
* Tests for recovery of the rebalance procedure.
*/
public class ItRebalanceRecoveryTest extends ClusterPerTestIntegrationTest {
+ private static final int PARTITION_ID = 0;
+
@Override
protected int initialNodes() {
return 2;
@@ -72,8 +75,8 @@ public class ItRebalanceRecoveryTest extends
ClusterPerTestIntegrationTest {
MvPartitionStorage storage = tableManager.tableView("TEST")
.internalTable()
.storage()
- .getMvPartition(0);
+ .getMvPartition(PARTITION_ID);
- return bypassingThreadAssertions(() -> storage.rowsCount() != 0);
+ return storage != null && bypassingThreadAssertions(() ->
storage.closestRowId(RowId.lowestRowId(PARTITION_ID))) != null;
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
index 24357590de..a0361ca4ab 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTriggersRecoveryTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.junit.jupiter.api.Disabled;
@@ -49,6 +50,8 @@ import org.junit.jupiter.api.Test;
* Tests for recovery of the rebalance procedure.
*/
public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTest {
+ private static final int PARTITION_ID = 0;
+
private static final String US_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"
+ " network: {\n"
+ " port: {},\n"
@@ -109,7 +112,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
// Check that metastore node schedule the rebalance procedure.
assertTrue(waitForCondition(
- (() -> getPartitionPendingClusterNodes(node(0),
0).equals(Set.of(
+ (() -> getPartitionPendingClusterNodes(node(0),
PARTITION_ID).equals(Set.of(
Assignment.forPeer(node(2).name()),
Assignment.forPeer(node(1).name())))),
10_000));
@@ -118,7 +121,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
Integer tableId = getTableId(node(0).catalogManager(), "TEST", new
HybridClockImpl().nowLong());
node(0)
.metaStorageManager()
- .remove(pendingPartAssignmentsKey(new
TablePartitionId(tableId, 0))).join();
+ .remove(pendingPartAssignmentsKey(new
TablePartitionId(tableId, PARTITION_ID))).join();
restartNode(1);
restartNode(2);
@@ -154,7 +157,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
// Check that metastore node schedule the rebalance procedure.
assertTrue(waitForCondition(
- (() -> getPartitionPendingClusterNodes(node(0),
0).equals(Set.of(
+ (() -> getPartitionPendingClusterNodes(node(0),
PARTITION_ID).equals(Set.of(
Assignment.forPeer(node(2).name()),
Assignment.forPeer(node(1).name())))),
10_000));
@@ -163,7 +166,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
Integer tableId = getTableId(node(0).catalogManager(), "TEST", new
HybridClockImpl().nowLong());
node(0)
.metaStorageManager()
- .remove(pendingPartAssignmentsKey(new
TablePartitionId(tableId, 0))).join();
+ .remove(pendingPartAssignmentsKey(new
TablePartitionId(tableId, PARTITION_ID))).join();
restartNode(1);
restartNode(2);
@@ -199,7 +202,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
// Check that new replica from 'global' zone received the data and
rebalance really happened.
assertTrue(waitForCondition(() -> containsPartition(cluster.node(2)),
10_000));
assertTrue(waitForCondition(
- (() -> getPartitionPendingClusterNodes(node(0),
0).equals(Set.of())),
+ (() -> getPartitionPendingClusterNodes(node(0),
PARTITION_ID).equals(Set.of())),
10_000));
TablePartitionId tablePartitionId =
@@ -207,7 +210,7 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
getTableId(node(0).catalogManager(),
"TEST",
new HybridClockImpl().nowLong()),
- 0
+ PARTITION_ID
);
long pendingsKeysRevisionBeforeRecovery = node(0).metaStorageManager()
.get(pendingPartAssignmentsKey(tablePartitionId))
@@ -246,8 +249,8 @@ public class ItRebalanceTriggersRecoveryTest extends
ClusterPerTestIntegrationTe
MvPartitionStorage storage = tableManager.tableView("TEST")
.internalTable()
.storage()
- .getMvPartition(0);
+ .getMvPartition(PARTITION_ID);
- return storage != null &&
bypassingThreadAssertions(storage::rowsCount) != 0;
+ return storage != null && bypassingThreadAssertions(() ->
storage.closestRowId(RowId.lowestRowId(PARTITION_ID))) != null;
}
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
index ece8deb0ae..c2081156ce 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexCleanupTest.java
@@ -20,11 +20,11 @@ package org.apache.ignite.internal.table.distributed;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.not;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Set;
import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.RowId;
import org.junit.jupiter.params.ParameterizedTest;
@@ -42,14 +42,12 @@ public class IndexCleanupTest extends IndexBaseTest {
writer.addWrite(storageUpdateHandler, rowUuid, row);
- assertEquals(1, storage.rowsCount());
assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
storageUpdateHandler.switchWriteIntents(getTxId(), false, null, null);
- assertEquals(0, storage.rowsCount());
assertTrue(pkInnerStorage.allRowsIds().isEmpty());
assertTrue(sortedInnerStorage.allRowsIds().isEmpty());
assertTrue(hashInnerStorage.allRowsIds().isEmpty());
@@ -66,7 +64,7 @@ public class IndexCleanupTest extends IndexBaseTest {
writer.addWrite(storageUpdateHandler, rowUuid, row);
// Write intent is in the storage.
- assertEquals(1, storage.rowsCount());
+ assertTrue(storage.read(rowId,
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Indexes are already in the storage.
assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
@@ -76,7 +74,7 @@ public class IndexCleanupTest extends IndexBaseTest {
writer.addWrite(storageUpdateHandler, rowUuid, null);
// Write intent is in the storage.
- assertEquals(1, storage.rowsCount());
+ assertTrue(storage.read(rowId,
HybridTimestamp.MAX_VALUE).isWriteIntent());
// But indexes are removed.
assertTrue(pkInnerStorage.allRowsIds().isEmpty());
@@ -96,7 +94,6 @@ public class IndexCleanupTest extends IndexBaseTest {
storageUpdateHandler.switchWriteIntents(getTxId(), false, null, null);
- assertEquals(0, storage.rowsCount());
assertTrue(pkInnerStorage.allRowsIds().isEmpty());
assertTrue(sortedInnerStorage.allRowsIds().isEmpty());
assertTrue(hashInnerStorage.allRowsIds().isEmpty());
@@ -121,8 +118,6 @@ public class IndexCleanupTest extends IndexBaseTest {
storageUpdateHandler.switchWriteIntents(getTxId(), false, null, null);
- assertEquals(1, storage.rowsCount());
-
Set<RowId> pkRows = pkInnerStorage.allRowsIds();
Set<RowId> sortedRows = sortedInnerStorage.allRowsIds();
Set<RowId> hashRows = hashInnerStorage.allRowsIds();
@@ -154,8 +149,6 @@ public class IndexCleanupTest extends IndexBaseTest {
storageUpdateHandler.switchWriteIntents(getTxId(), false, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(inAllIndexes(row1));
assertTrue(inIndexes(row2, true, false));
}
@@ -175,7 +168,6 @@ public class IndexCleanupTest extends IndexBaseTest {
storageUpdateHandler.switchWriteIntents(getTxId(), false, null, null);
- assertEquals(1, storage.rowsCount());
assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
@@ -196,7 +188,6 @@ public class IndexCleanupTest extends IndexBaseTest {
storageUpdateHandler.switchWriteIntents(getTxId(), false, null, null);
- assertEquals(1, storage.rowsCount());
assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
@@ -216,7 +207,6 @@ public class IndexCleanupTest extends IndexBaseTest {
writer.addWrite(storageUpdateHandler, rowUuid, row);
writer.addWrite(storageUpdateHandler, rowUuid, null);
- assertEquals(1, storage.rowsCount());
assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
@@ -239,7 +229,6 @@ public class IndexCleanupTest extends IndexBaseTest {
writer.addWrite(storageUpdateHandler, rowUuid, row);
writer.addWrite(storageUpdateHandler, rowUuid, null);
- assertEquals(1, storage.rowsCount());
assertThat(pkInnerStorage.allRowsIds(), contains(rowId));
assertThat(sortedInnerStorage.allRowsIds(), contains(rowId));
assertThat(hashInnerStorage.allRowsIds(), contains(rowId));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 239c8e33bd..3f509ad3fc 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.ColumnsExtractor;
import
org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import org.apache.ignite.internal.storage.BaseMvStoragesTest;
+import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
@@ -86,9 +87,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
private static final ColumnsExtractor USER_INDEX_BINARY_TUPLE_CONVERTER =
new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
- private TestHashIndexStorage pkInnerStorage;
- private TestSortedIndexStorage sortedInnerStorage;
- private TestHashIndexStorage hashInnerStorage;
private TestMvPartitionStorage storage;
private StorageUpdateHandler storageUpdateHandler;
private IndexUpdateHandler indexUpdateHandler;
@@ -103,7 +101,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
int sortedIndexId = 3;
int hashIndexId = 4;
- pkInnerStorage = new TestHashIndexStorage(
+ var pkInnerStorage = new TestHashIndexStorage(
PARTITION_ID,
new StorageHashIndexDescriptor(
pkIndexId,
@@ -115,13 +113,13 @@ public class StorageCleanupTest extends
BaseMvStoragesTest {
)
);
- TableSchemaAwareIndexStorage pkStorage = new
TableSchemaAwareIndexStorage(
+ var pkStorage = new TableSchemaAwareIndexStorage(
pkIndexId,
pkInnerStorage,
PK_INDEX_BINARY_TUPLE_CONVERTER
);
- sortedInnerStorage = new TestSortedIndexStorage(
+ var sortedInnerStorage = new TestSortedIndexStorage(
PARTITION_ID,
new StorageSortedIndexDescriptor(
sortedIndexId,
@@ -133,13 +131,13 @@ public class StorageCleanupTest extends
BaseMvStoragesTest {
)
);
- TableSchemaAwareIndexStorage sortedIndexStorage = new
TableSchemaAwareIndexStorage(
+ var sortedIndexStorage = new TableSchemaAwareIndexStorage(
sortedIndexId,
sortedInnerStorage,
USER_INDEX_BINARY_TUPLE_CONVERTER
);
- hashInnerStorage = new TestHashIndexStorage(
+ var hashInnerStorage = new TestHashIndexStorage(
PARTITION_ID,
new StorageHashIndexDescriptor(
hashIndexId,
@@ -151,7 +149,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
)
);
- TableSchemaAwareIndexStorage hashIndexStorage = new
TableSchemaAwareIndexStorage(
+ var hashIndexStorage = new TableSchemaAwareIndexStorage(
hashIndexId,
hashInnerStorage,
USER_INDEX_BINARY_TUPLE_CONVERTER
@@ -191,11 +189,17 @@ public class StorageCleanupTest extends
BaseMvStoragesTest {
storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(),
partitionId, row2, true, null, null, null, null);
storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(),
partitionId, row3, true, null, null, null, null);
- assertEquals(3, storage.rowsCount());
+ assertEquals(3, storageSize());
storageUpdateHandler.switchWriteIntents(txUuid, false, null, null);
- assertEquals(0, storage.rowsCount());
+ assertEquals(0, storageSize());
+ }
+
+ private long storageSize() {
+ try (PartitionTimestampCursor cursor =
storage.scan(HybridTimestamp.MAX_VALUE)) {
+ return cursor.stream().count();
+ }
}
@Test
@@ -214,13 +218,11 @@ public class StorageCleanupTest extends
BaseMvStoragesTest {
storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(),
partitionId, row2, true, null, null, null, null);
storageUpdateHandler.handleUpdate(txUuid, UUID.randomUUID(),
partitionId, row3, true, null, null, null, null);
- assertEquals(3, storage.rowsCount());
// We have three writes to the storage.
verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(),
anyInt());
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
- assertEquals(3, storage.rowsCount());
// Those writes resulted in three commits.
verify(storage, times(3)).commitWrite(any(), any());
@@ -230,7 +232,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
// And run cleanup again for the same transaction.
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
- assertEquals(3, storage.rowsCount());
// And no invocation after, meaning idempotence of the cleanup.
verify(storage, never()).commitWrite(any(), any());
}
@@ -262,13 +263,11 @@ public class StorageCleanupTest extends
BaseMvStoragesTest {
);
storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate,
partitionId, true, null, null, null);
- assertEquals(3, storage.rowsCount());
// We have three writes to the storage.
verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(),
anyInt());
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
- assertEquals(3, storage.rowsCount());
// Those writes resulted in three commits.
verify(storage, times(3)).commitWrite(any(), any());
@@ -278,7 +277,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
// And run cleanup again for the same transaction.
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
- assertEquals(3, storage.rowsCount());
// And no invocation after, meaning idempotence of the cleanup.
verify(storage, never()).commitWrite(any(), any());
@@ -338,8 +336,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2,
false, null, null, null, null);
storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3,
false, null, null, null, null);
- assertEquals(3, storage.rowsCount());
-
// Now run cleanup.
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
@@ -407,8 +403,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
// Do not track write intents to simulate the loss of a volatile state.
storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate,
partitionId, false, null, null, null);
- assertEquals(3, storage.rowsCount());
-
// Now run cleanup.
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
@@ -489,8 +483,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.switchWriteIntents(committedTx, true, commitTs,
null);
- assertEquals(1, storage.rowsCount());
-
assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Now create a new write intent over the committed data. No cleanup
should be triggered.
@@ -530,8 +522,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.switchWriteIntents(committedTx, true, commitTs,
null);
- assertEquals(1, storage.rowsCount());
-
assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Now create a new write intent over the committed data. No cleanup
should be triggered.
@@ -568,8 +558,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1,
true, null, null, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Then create another one for the same row in the same transaction.
The entry will be replaced.
@@ -580,8 +568,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row2,
true, null, null, commitTs, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, never()).commitWrite(any(), any());
@@ -609,8 +595,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate,
partitionId, true, null, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Then create another one for the same row in the same transaction.
The entry will be replaced.
@@ -625,8 +609,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
// Do not track write intents to simulate the loss of a volatile state.
storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate2,
partitionId, true, null, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, never()).commitWrite(any(), any());
@@ -650,8 +632,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1,
true, null, null, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Create another one and pass `last commit time`. The previous value
should be committed automatically.
@@ -665,8 +645,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
// Previous value will be committed even though the cleanup was not
called explicitly.
storageUpdateHandler.handleUpdate(runningTx2, rowId, partitionId,
row2, true, null, null, commitTs, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, times(1)).commitWrite(any(), any());
@@ -694,8 +672,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate,
partitionId, true, null, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Create another one and pass `last commit time`. The previous value
should be committed automatically.
@@ -713,8 +689,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdateAll(runningTx2, rowsToUpdate2,
partitionId, true, null, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, times(1)).commitWrite(any(), any());
@@ -740,8 +714,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.switchWriteIntents(committed1, true, commitTs,
null);
- assertEquals(1, storage.rowsCount());
-
assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Now add a new write intent.
@@ -765,8 +737,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
// Last commit time equal to the time of the previously committed
value => previous write intent will be reverted.
storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row3,
true, null, null, commitTs, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, never()).commitWrite(any(), any());
@@ -796,8 +766,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.switchWriteIntents(committed1, true, commitTs,
null);
- assertEquals(1, storage.rowsCount());
-
assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Now add a new write intent.
@@ -829,8 +797,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate3,
partitionId, true, null, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, never()).commitWrite(any(), any());
@@ -856,7 +822,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.switchWriteIntents(committed1, true, commitTs,
null);
- assertEquals(1, storage.rowsCount());
assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
@@ -883,8 +848,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
// Last commit time is after the time of the previously committed
value => previous write intent will be committed.
storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row3,
true, null, null, lastCommitTs, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, times(1)).commitWrite(any(), any());
@@ -914,8 +877,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.switchWriteIntents(committed1, true, commitTs,
null);
- assertEquals(1, storage.rowsCount());
-
assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Now add a new write intent.
@@ -949,8 +910,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.handleUpdateAll(runningTx, rowsToUpdate3,
partitionId, true, null, null, null);
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, times(1)).commitWrite(any(), any());
@@ -976,8 +935,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
storageUpdateHandler.switchWriteIntents(committed1, true, commitTs,
null);
- assertEquals(1, storage.rowsCount());
-
assertFalse(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
// Now add a new write intent.
@@ -1005,9 +962,6 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
assertThrows(AssertionError.class, () ->
storageUpdateHandler.handleUpdate(runningTx, rowId,
partitionId, row3, true, null, null, lastCommitTs, null));
-
- assertEquals(1, storage.rowsCount());
-
assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
verify(storage, never()).commitWrite(any(), any());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
index 68427fae5e..e14c1c9350 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandlerTest.java
@@ -61,7 +61,7 @@ import org.junit.jupiter.api.Test;
public class StorageUpdateHandlerTest extends BaseMvStoragesTest {
private static final HybridClock CLOCK = new HybridClockImpl();
- protected static final int PARTITION_ID = 0;
+ private static final int PARTITION_ID = 0;
private static final BinaryTupleSchema TUPLE_SCHEMA =
BinaryTupleSchema.createRowSchema(SCHEMA_DESCRIPTOR);
@@ -78,12 +78,8 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
private static final ColumnsExtractor USER_INDEX_BINARY_TUPLE_CONVERTER =
new BinaryRowConverter(TUPLE_SCHEMA, USER_INDEX_SCHEMA);
- private TestHashIndexStorage pkInnerStorage;
- private TestSortedIndexStorage sortedInnerStorage;
- private TestHashIndexStorage hashInnerStorage;
private TestMvPartitionStorage storage;
private StorageUpdateHandler storageUpdateHandler;
- private IndexUpdateHandler indexUpdateHandler;
private LockByRowId lock;
@InjectConfiguration
@@ -96,7 +92,7 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
int sortedIndexId = 3;
int hashIndexId = 4;
- pkInnerStorage = new TestHashIndexStorage(
+ var pkInnerStorage = new TestHashIndexStorage(
PARTITION_ID,
new StorageHashIndexDescriptor(
pkIndexId,
@@ -108,13 +104,13 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
)
);
- TableSchemaAwareIndexStorage pkStorage = new
TableSchemaAwareIndexStorage(
+ var pkStorage = new TableSchemaAwareIndexStorage(
pkIndexId,
pkInnerStorage,
PK_INDEX_BINARY_TUPLE_CONVERTER
);
- sortedInnerStorage = new TestSortedIndexStorage(
+ var sortedInnerStorage = new TestSortedIndexStorage(
PARTITION_ID,
new StorageSortedIndexDescriptor(
sortedIndexId,
@@ -126,13 +122,13 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
)
);
- TableSchemaAwareIndexStorage sortedIndexStorage = new
TableSchemaAwareIndexStorage(
+ var sortedIndexStorage = new TableSchemaAwareIndexStorage(
sortedIndexId,
sortedInnerStorage,
USER_INDEX_BINARY_TUPLE_CONVERTER
);
- hashInnerStorage = new TestHashIndexStorage(
+ var hashInnerStorage = new TestHashIndexStorage(
PARTITION_ID,
new StorageHashIndexDescriptor(
hashIndexId,
@@ -144,7 +140,7 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
)
);
- TableSchemaAwareIndexStorage hashIndexStorage = new
TableSchemaAwareIndexStorage(
+ var hashIndexStorage = new TableSchemaAwareIndexStorage(
hashIndexId,
hashInnerStorage,
USER_INDEX_BINARY_TUPLE_CONVERTER
@@ -159,9 +155,9 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
hashIndexId, hashIndexStorage
);
- TestPartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(tableId, PARTITION_ID, storage);
+ var partitionDataStorage = new TestPartitionDataStorage(tableId,
PARTITION_ID, storage);
- indexUpdateHandler = spy(new
IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes)));
+ var indexUpdateHandler = new
IndexUpdateHandler(DummyInternalTableImpl.createTableIndexStoragesSupplier(indexes));
storageUpdateHandler = new StorageUpdateHandler(
PARTITION_ID,
@@ -201,8 +197,6 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate,
partitionId, true, null, null, null);
- assertEquals(3, storage.rowsCount());
-
// We have three writes to the storage.
verify(storage, times(3)).addWrite(any(), any(), any(), anyInt(),
anyInt());
@@ -212,8 +206,6 @@ public class StorageUpdateHandlerTest extends
BaseMvStoragesTest {
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
- assertEquals(3, storage.rowsCount());
-
// Those writes resulted in three commits.
verify(storage, times(3)).commitWrite(any(), any());