This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 315d95369da IGNITE-24931 Move schema aware related tests from PartitionReplicaListenerTest to ZonePartitionReplicaListenerTest (#5597) 315d95369da is described below commit 315d95369da264fbfcf49b47f26d1c58f5510115 Author: Alexander Lapin <lapin1...@gmail.com> AuthorDate: Thu Apr 10 11:26:34 2025 +0300 IGNITE-24931 Move schema aware related tests from PartitionReplicaListenerTest to ZonePartitionReplicaListenerTest (#5597) --- .../reset/ItResetPartitionsCommandTest.java | 6 - .../partitions/reset/ItResetPartitionsTest.java | 4 + ...asterRecoveryControllerResetPartitionsTest.java | 4 + .../replication/PartitionReplicaListenerTest.java | 24 + .../ZonePartitionReplicaListenerTest.java | 1410 ++++++++++++++++++++ .../tx/distributed/ItTransactionRecoveryTest.java | 82 +- 6 files changed, 1505 insertions(+), 25 deletions(-) diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ItResetPartitionsCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ItResetPartitionsCommandTest.java index b9471cc61e9..d3f526dd1bd 100644 --- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ItResetPartitionsCommandTest.java +++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ItResetPartitionsCommandTest.java @@ -17,13 +17,7 @@ package org.apache.ignite.internal.cli.commands.recovery.partitions.reset; -import static org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG; - -import org.apache.ignite.internal.testframework.WithSystemProperty; - /** Test class for {@link ResetPartitionsCommand}. 
*/ -// TODO https://issues.apache.org/jira/browse/IGNITE-24332 -@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") public class ItResetPartitionsCommandTest extends ItResetPartitionsTest { @Override protected Class<?> getCommandClass() { diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ItResetPartitionsTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ItResetPartitionsTest.java index f4212f45810..2608247f23d 100644 --- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ItResetPartitionsTest.java +++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ItResetPartitionsTest.java @@ -22,13 +22,17 @@ import static org.apache.ignite.internal.cli.commands.Options.Constants.CLUSTER_ import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_PARTITION_IDS_OPTION; import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_TABLE_NAME_OPTION; import static org.apache.ignite.internal.cli.commands.Options.Constants.RECOVERY_ZONE_NAME_OPTION; +import static org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG; import static org.apache.ignite.lang.util.IgniteNameUtils.canonicalName; import org.apache.ignite.internal.cli.CliIntegrationTest; +import org.apache.ignite.internal.testframework.WithSystemProperty; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; /** Base test class for Cluster Recovery reset partitions commands. 
*/ +// TODO https://issues.apache.org/jira/browse/IGNITE-24332 +@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") public abstract class ItResetPartitionsTest extends CliIntegrationTest { private static final String ZONE = "first_ZONE"; diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java index 2262a902f83..f5b156fb9e1 100644 --- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/recovery/ItDisasterRecoveryControllerResetPartitionsTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.rest.recovery; import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME; +import static org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG; import static org.apache.ignite.internal.rest.constants.HttpCode.OK; import static org.apache.ignite.internal.rest.recovery.ItDisasterRecoveryControllerTest.RESET_PARTITIONS_ENDPOINT; import static org.apache.ignite.lang.util.IgniteNameUtils.canonicalName; @@ -35,11 +36,14 @@ import java.util.Set; import org.apache.ignite.internal.ClusterConfiguration; import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.rest.api.recovery.ResetPartitionsRequest; +import org.apache.ignite.internal.testframework.WithSystemProperty; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; /** Test for disaster recovery reset partitions command, positive cases. 
*/ @MicronautTest +// TODO https://issues.apache.org/jira/browse/IGNITE-24332 +@WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") public class ItDisasterRecoveryControllerResetPartitionsTest extends ClusterPerClassIntegrationTest { private static final String NODE_URL = "http://localhost:" + ClusterConfiguration.DEFAULT_BASE_HTTP_PORT; diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java index 35eefd31a95..b22c17feb72 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java @@ -2552,6 +2552,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @CartesianTest @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory") + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void singleRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) { RwListenerInvocation invocation = null; @@ -2602,6 +2604,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @CartesianTest @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory") + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. 
+ @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void multiRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) { RwListenerInvocation invocation = null; @@ -2619,6 +2623,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void replaceRequestFailsIfTableWasDropped( @Values(booleans = {false, true}) boolean onExistingRow, @Values(booleans = {false, true}) boolean full @@ -2634,6 +2640,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void rwScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) { testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> { return doRwScanRetrieveBatchRequest(targetTxId); @@ -2653,6 +2661,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void singleRowRoGetFailsIfTableWasDropped( @Values(booleans = {false, true}) boolean direct, @Values(booleans = {false, true}) boolean onExistingRow @@ -2686,6 +2696,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. 
+ @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void multiRowRoGetFailsIfTableWasDropped( @Values(booleans = {false, true}) boolean direct, @Values(booleans = {false, true}) boolean onExistingRow @@ -2700,6 +2712,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void roScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) { testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> { return doRoScanRetrieveBatchRequest(targetTxId, readTimestamp); @@ -2771,6 +2785,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @CartesianTest @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory") + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void singleRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType requestType, boolean onExistingRow, boolean full) { RwListenerInvocation invocation = null; @@ -2811,6 +2827,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { @CartesianTest @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory") + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void multiRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType requestType, boolean onExistingRow, boolean full) { RwListenerInvocation invocation = null; @@ -2828,6 +2846,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. 
+ @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void replaceRequestFailsIfSchemaVersionMismatchesTx( @Values(booleans = {false, true}) boolean onExistingRow, @Values(booleans = {false, true}) boolean full @@ -2843,6 +2863,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void singleRowRoGetFailsIfSchemaVersionMismatchesTx( @Values(booleans = {false, true}) boolean direct, @Values(booleans = {false, true}) boolean onExistingRow @@ -2874,6 +2896,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest { } @CartesianTest + // TODO: IGNITE-22522 Remove the test. There's a counterpart in ZonePartitionReplicaListenerTest. + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "false") void multiRowRoGetFailsIfSchemaVersionMismatchesTx( @Values(booleans = {false, true}) boolean direct, @Values(booleans = {false, true}) boolean onExistingRow diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java new file mode 100644 index 00000000000..cb4d6a8b568 --- /dev/null +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java @@ -0,0 +1,1410 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.replication; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; +import static org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation; +import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RO_GET; +import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RO_GET_ALL; +import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_REPLACE; +import static org.apache.ignite.internal.partition.replicator.network.replication.RequestType.RW_UPSERT; +import static org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage; +import static org.apache.ignite.internal.testframework.asserts.CompletableFutureAssert.assertWillThrowFast; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; +import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp; +import static org.apache.ignite.internal.tx.TxState.ABORTED; +import static org.apache.ignite.internal.tx.TxState.COMMITTED; +import static org.apache.ignite.internal.tx.TxState.FINISHING; 
+import static org.apache.ignite.internal.tx.TxState.checkTransitionCorrectness; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.AdditionalMatchers.gt; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Stream; +import org.apache.ignite.distributed.TestPartitionDataStorage; +import org.apache.ignite.distributed.replicator.action.RequestTypes; +import org.apache.ignite.internal.catalog.Catalog; +import org.apache.ignite.internal.catalog.CatalogService; +import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor; +import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; +import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; +import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.failure.NoOpFailureManager; +import org.apache.ignite.internal.hlc.ClockService; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; +import org.apache.ignite.internal.hlc.HybridTimestamp; +import org.apache.ignite.internal.hlc.TestClockService; +import org.apache.ignite.internal.lowwatermark.LowWatermark; +import org.apache.ignite.internal.network.ClusterNodeImpl; +import org.apache.ignite.internal.network.ClusterNodeResolver; +import org.apache.ignite.internal.network.MessagingService; +import org.apache.ignite.internal.network.SingleClusterNodeResolver; +import org.apache.ignite.internal.network.TopologyService; +import org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener; +import org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory; +import org.apache.ignite.internal.partition.replicator.network.command.FinishTxCommand; +import org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand; +import org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand; +import org.apache.ignite.internal.partition.replicator.network.command.WriteIntentSwitchCommand; +import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectMultiRowReplicaRequest; +import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyDirectSingleRowReplicaRequest; +import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyMultiRowPkReplicaRequest; +import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlyScanRetrieveBatchReplicaRequest; +import org.apache.ignite.internal.partition.replicator.network.replication.ReadOnlySingleRowPkReplicaRequest; 
+import org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest; +import org.apache.ignite.internal.partition.replicator.network.replication.RequestType; +import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; +import org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource; +import org.apache.ignite.internal.partition.replicator.schemacompat.IncompatibleSchemaVersionException; +import org.apache.ignite.internal.partition.replicator.schemacompat.InternalSchemaVersionMismatchException; +import org.apache.ignite.internal.placementdriver.PlacementDriver; +import org.apache.ignite.internal.placementdriver.TestPlacementDriver; +import org.apache.ignite.internal.raft.Command; +import org.apache.ignite.internal.raft.Peer; +import org.apache.ignite.internal.raft.service.LeaderWithTerm; +import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.replicator.ReplicaResult; +import org.apache.ignite.internal.replicator.ReplicaService; +import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ZonePartitionId; +import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration; +import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory; +import org.apache.ignite.internal.replicator.message.ReplicaRequest; +import org.apache.ignite.internal.replicator.message.TablePartitionIdMessage; +import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.BinaryRowConverter; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.ColumnsExtractor; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.schema.SchemaSyncService; +import 
org.apache.ignite.internal.schema.marshaller.KvMarshaller; +import org.apache.ignite.internal.schema.marshaller.MarshallerFactory; +import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory; +import org.apache.ignite.internal.schema.row.Row; +import org.apache.ignite.internal.storage.RowId; +import org.apache.ignite.internal.storage.TestStorageUtils; +import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage; +import org.apache.ignite.internal.storage.index.HashIndexStorage; +import org.apache.ignite.internal.storage.index.IndexStorage; +import org.apache.ignite.internal.storage.index.SortedIndexStorage; +import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor; +import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor; +import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor; +import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor.StorageSortedIndexColumnDescriptor; +import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage; +import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage; +import org.apache.ignite.internal.table.distributed.HashIndexLocker; +import org.apache.ignite.internal.table.distributed.IndexLocker; +import org.apache.ignite.internal.table.distributed.SortedIndexLocker; +import org.apache.ignite.internal.table.distributed.StorageUpdateHandler; +import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage; +import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage; +import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler; +import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener; +import org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver; +import org.apache.ignite.internal.table.impl.DummyInternalTableImpl; +import 
org.apache.ignite.internal.table.impl.DummySchemaManagerImpl; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.TransactionMeta; +import org.apache.ignite.internal.tx.TransactionResult; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.internal.tx.TxState; +import org.apache.ignite.internal.tx.TxStateMeta; +import org.apache.ignite.internal.tx.UpdateCommandResult; +import org.apache.ignite.internal.tx.impl.HeapLockManager; +import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; +import org.apache.ignite.internal.tx.impl.TxMessageSender; +import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy; +import org.apache.ignite.internal.tx.message.TransactionMetaMessage; +import org.apache.ignite.internal.tx.message.TxMessagesFactory; +import org.apache.ignite.internal.tx.message.TxStateCoordinatorRequest; +import org.apache.ignite.internal.tx.message.TxStateResponse; +import org.apache.ignite.internal.tx.storage.state.test.TestTxStatePartitionStorage; +import org.apache.ignite.internal.tx.test.TestTransactionIds; +import org.apache.ignite.internal.type.NativeTypes; +import org.apache.ignite.internal.util.Lazy; +import org.apache.ignite.internal.util.PendingComparableValuesTracker; +import org.apache.ignite.lang.ErrorGroups.Transactions; +import org.apache.ignite.network.ClusterNode; +import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.sql.ColumnType; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junitpioneer.jupiter.cartesian.ArgumentSets; +import org.junitpioneer.jupiter.cartesian.CartesianTest; +import 
org.junitpioneer.jupiter.cartesian.CartesianTest.Values; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +/** Tests for zone partition replica listener. */ +@ExtendWith(MockitoExtension.class) +@ExtendWith(ConfigurationExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class ZonePartitionReplicaListenerTest extends IgniteAbstractTest { + private static final int PART_ID = 0; + + private static final int CURRENT_SCHEMA_VERSION = 1; + + private static final int NEXT_SCHEMA_VERSION = 2; + + private static final int TABLE_ID = 1; + + private static final TablePartitionId commitPartitionId = new TablePartitionId(TABLE_ID, PART_ID); + + private static final long ANY_ENLISTMENT_CONSISTENCY_TOKEN = 1L; + private static final String TABLE_NAME = "test"; + private static final String TABLE_NAME_2 = "second_test"; + + private final Map<UUID, Set<RowId>> pendingRows = new ConcurrentHashMap<>(); + + /** The storage stores partition data. 
*/ + private final TestMvPartitionStorage testMvPartitionStorage = spy(new TestMvPartitionStorage(PART_ID)); + + /** Lock manager whose locks are released per transaction by the default Raft closure below. */ + private final LockManager lockManager = lockManager(); + + /** + * Default stand-in for Raft command execution, used as the initial value of the closure that the mocked Raft client delegates to. + * Dispatches on the command type: a write intent switch finalizes the transaction's pending rows and releases its locks, an update + * records the written row as pending, update-all and finish-tx commands only return a canned result. Any other command, as well as + * a write intent switch, yields a future completed with {@code null}. + */ + private final Function<Command, CompletableFuture<?>> defaultMockRaftFutureClosure = cmd -> { + if (cmd instanceof WriteIntentSwitchCommand) { + WriteIntentSwitchCommand switchCommand = (WriteIntentSwitchCommand) cmd; + UUID txId = switchCommand.txId(); + + /* Take ownership of the rows recorded for this transaction by earlier UpdateCommand invocations. */ + Set<RowId> rows = pendingRows.remove(txId); + + HybridTimestamp commitTimestamp = switchCommand.commitTimestamp(); + if (switchCommand.commit()) { + assertNotNull(commitTimestamp); + } + + /* NOTE(review): commitWrite is invoked for every pending row even when commit() is false - confirm this is intended for aborts. */ + if (rows != null) { + for (RowId row : rows) { + testMvPartitionStorage.commitWrite(row, commitTimestamp); + } + } + + lockManager.releaseAll(txId); + } else if (cmd instanceof UpdateCommand) { + UUID txId = ((UpdateCommand) cmd).txId(); + + /* Remember the row id under its transaction so that a later write intent switch can find it. 
*/ + pendingRows.compute(txId, (txId0, v) -> { + if (v == null) { + v = new HashSet<>(); + } + + RowId rowId = new RowId(PART_ID, ((UpdateCommand) cmd).rowUuid()); + v.add(rowId); + + return v; + }); + + return completedFuture(new UpdateCommandResult(true, true, 100)); + } else if (cmd instanceof UpdateAllCommand) { + /* Unlike UpdateCommand, no pending rows are recorded for this command type. */ + return completedFuture(new UpdateCommandResult(true, true, 100)); + } else if (cmd instanceof FinishTxCommand) { + FinishTxCommand command = (FinishTxCommand) cmd; + + return completedFuture(new TransactionResult(command.commit() ? COMMITTED : ABORTED, command.commitTimestamp())); + } + + return nullCompletedFuture(); + }; + + /** Tx messages factory. */ + private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory(); + + /** Table messages factory. */ + private static final PartitionReplicationMessagesFactory TABLE_MESSAGES_FACTORY = new PartitionReplicationMessagesFactory(); + + /** Replica messages factory. */ + private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory(); + + /** Partition group id. 
*/ + private final TablePartitionId grpId = new TablePartitionId(TABLE_ID, PART_ID); + + /** Hybrid clock. */ + private final HybridClock clock = new HybridClockImpl(); + + private final ClockService clockService = new TestClockService(clock); + + /** The storage stores transaction states. */ + private final TestTxStatePartitionStorage txStateStorage = new TestTxStatePartitionStorage(); + + /** Local cluster node. */ + private final ClusterNode localNode = new ClusterNodeImpl(nodeId(1), "node1", NetworkAddress.from("127.0.0.1:127")); + + /** Another (not local) cluster node. */ + private final ClusterNode anotherNode = new ClusterNodeImpl(nodeId(2), "node2", NetworkAddress.from("127.0.0.2:127")); + + private TransactionStateResolver transactionStateResolver; + + private final PartitionDataStorage partitionDataStorage = new TestPartitionDataStorage(TABLE_ID, PART_ID, testMvPartitionStorage); + + @Mock + private RaftGroupService mockRaftClient; + + @Mock + private TxManager txManager; + + @Mock + private TopologyService topologySrv; + + @Mock + private PendingComparableValuesTracker<HybridTimestamp, Void> safeTimeClock; + + @Mock + private ValidationSchemasSource validationSchemasSource; + + @Spy + private final SchemaSyncService schemaSyncService = new AlwaysSyncedSchemaSyncService(); + + @Mock + private CatalogService catalogService; + + @Mock + private Catalog catalog; + + private final TestCatalogServiceEventProducer catalogServiceEventProducer = new TestCatalogServiceEventProducer(); + + @Mock + private MessagingService messagingService; + + @Mock + private LowWatermark lowWatermark; + + @InjectConfiguration + private ReplicationConfiguration replicationConfiguration; + + /** Schema descriptor for tests. */ + private SchemaDescriptor schemaDescriptor; + + /** Schema descriptor, version 2. */ + private SchemaDescriptor schemaDescriptorVersion2; + + /** Key-value marshaller for tests. 
*/ + private KvMarshaller<TestKey, TestValue> kvMarshaller; + + private final CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor( + TABLE_ID, 1, 2, TABLE_NAME, 1, + List.of( + new CatalogTableColumnDescriptor("intKey", ColumnType.INT32, false, 0, 0, 0, null), + new CatalogTableColumnDescriptor("strKey", ColumnType.STRING, false, 0, 0, 0, null), + new CatalogTableColumnDescriptor("intVal", ColumnType.INT32, false, 0, 0, 0, null), + new CatalogTableColumnDescriptor("strVal", ColumnType.STRING, false, 0, 0, 0, null) + ), + List.of("intKey", "strKey"), + null, + DEFAULT_STORAGE_PROFILE + ); + + /** Placement driver. */ + private TestPlacementDriver placementDriver; + + private ZonePartitionReplicaListener zonePartitionReplicaListener; + + /** Partition replication listener to test. */ + private PartitionReplicaListener partitionReplicaListener; + + private HashIndexStorage pkIndexStorage; + + /** Primary index. */ + private Lazy<TableSchemaAwareIndexStorage> pkStorageSupplier; + + /** If true the local replica is considered leader, false otherwise. */ + private boolean localLeader; + + /** Secondary sorted index. */ + private TableSchemaAwareIndexStorage sortedIndexStorage; + + /** Secondary hash index. 
*/ + private TableSchemaAwareIndexStorage hashIndexStorage; + + private Function<Command, CompletableFuture<?>> raftClientFutureClosure = defaultMockRaftFutureClosure; + + private static final AtomicInteger nextMonotonicInt = new AtomicInteger(1); + + private final TestValue someValue = new TestValue(1, "v1"); + + @Mock + private IndexMetaStorage indexMetaStorage; + + private static UUID nodeId(int id) { + return new UUID(0, id); + } + + @BeforeEach + public void beforeTest() { + doAnswer(invocation -> { + catalogServiceEventProducer.listen(invocation.getArgument(0), invocation.getArgument(1)); + + return null; + }).when(catalogService).listen(any(), any()); + + doAnswer(invocation -> { + catalogServiceEventProducer.removeListener(invocation.getArgument(0), invocation.getArgument(1)); + + return null; + }).when(catalogService).removeListener(any(), any()); + + when(mockRaftClient.refreshAndGetLeaderWithTerm()).thenAnswer(invocationOnMock -> { + if (!localLeader) { + return completedFuture(new LeaderWithTerm(new Peer(anotherNode.name()), 1L)); + } + + return completedFuture(new LeaderWithTerm(new Peer(localNode.name()), 1L)); + }); + + when(mockRaftClient.run(any())) + .thenAnswer(invocationOnMock -> raftClientFutureClosure.apply(invocationOnMock.getArgument(0))); + + when(topologySrv.getByConsistentId(any())).thenAnswer(invocationOnMock -> { + String consistentId = invocationOnMock.getArgument(0); + if (consistentId.equals(anotherNode.name())) { + return anotherNode; + } else if (consistentId.equals(localNode.name())) { + return localNode; + } else { + return null; + } + }); + + when(topologySrv.localMember()).thenReturn(localNode); + + when(safeTimeClock.waitFor(any())).thenReturn(nullCompletedFuture()); + when(safeTimeClock.current()).thenReturn(HybridTimestamp.MIN_VALUE); + + when(validationSchemasSource.waitForSchemaAvailability(anyInt(), anyInt())).thenReturn(nullCompletedFuture()); + + lenient().when(catalogService.catalog(anyInt())).thenReturn(catalog); + 
lenient().when(catalogService.activeCatalog(anyLong())).thenReturn(catalog); + + lenient().when(catalog.table(anyInt())).thenReturn(tableDescriptor); + lenient().when(catalog.table(anyInt())).thenReturn(tableDescriptor); + + int pkIndexId = 1; + int sortedIndexId = 2; + int hashIndexId = 3; + + schemaDescriptor = schemaDescriptorWith(CURRENT_SCHEMA_VERSION); + schemaDescriptorVersion2 = schemaDescriptorWith(NEXT_SCHEMA_VERSION); + + ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor); + + pkIndexStorage = spy(new TestHashIndexStorage( + PART_ID, + new StorageHashIndexDescriptor(pkIndexId, List.of(), false) + )); + pkStorageSupplier = new Lazy<>(() -> new TableSchemaAwareIndexStorage(pkIndexId, pkIndexStorage, row2Tuple)); + + SortedIndexStorage indexStorage = new TestSortedIndexStorage( + PART_ID, + new StorageSortedIndexDescriptor( + sortedIndexId, + List.of(new StorageSortedIndexColumnDescriptor("intVal", NativeTypes.INT32, false, true)), + false + ) + ); + + // 2 is the index of "intVal" in the list of all columns. 
+ ColumnsExtractor columnsExtractor = BinaryRowConverter.columnsExtractor(schemaDescriptor, 2); + + sortedIndexStorage = new TableSchemaAwareIndexStorage(sortedIndexId, indexStorage, columnsExtractor); + + hashIndexStorage = new TableSchemaAwareIndexStorage( + hashIndexId, + new TestHashIndexStorage( + PART_ID, + new StorageHashIndexDescriptor( + hashIndexId, + List.of(new StorageHashIndexColumnDescriptor("intVal", NativeTypes.INT32, false)), + false + ) + ), + columnsExtractor + ); + + completeBuiltIndexes(sortedIndexStorage.storage(), hashIndexStorage.storage()); + + IndexLocker pkLocker = new HashIndexLocker(pkIndexId, true, lockManager, row2Tuple); + IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId, PART_ID, lockManager, indexStorage, row2Tuple, false); + IndexLocker hashIndexLocker = new HashIndexLocker(hashIndexId, false, lockManager, row2Tuple); + + IndexUpdateHandler indexUpdateHandler = new IndexUpdateHandler( + DummyInternalTableImpl.createTableIndexStoragesSupplier(Map.of(pkStorage().id(), pkStorage())) + ); + + CatalogIndexDescriptor indexDescriptor = mock(CatalogIndexDescriptor.class); + when(indexDescriptor.id()).thenReturn(pkIndexId); + + when(catalog.indexes(anyInt())).thenReturn(List.of(indexDescriptor)); + + configureTxManager(txManager); + + doAnswer(invocation -> { + Object argument = invocation.getArgument(1); + + if (argument instanceof TxStateCoordinatorRequest) { + TxStateCoordinatorRequest req = (TxStateCoordinatorRequest) argument; + + return completedFuture(toTxStateResponse(txManager.stateMeta(req.txId()))); + } + + return failedFuture(new Exception("Test exception")); + }).when(messagingService).invoke(any(ClusterNode.class), any(), anyLong()); + + doAnswer(invocation -> { + Object argument = invocation.getArgument(1); + + if (argument instanceof TxStateCoordinatorRequest) { + TxStateCoordinatorRequest req = (TxStateCoordinatorRequest) argument; + + return 
completedFuture(toTxStateResponse(txManager.stateMeta(req.txId()))); + } + + return failedFuture(new Exception("Test exception")); + }).when(messagingService).invoke(anyString(), any(), anyLong()); + + ClusterNodeResolver clusterNodeResolver = new ClusterNodeResolver() { + @Override + public ClusterNode getById(UUID id) { + return id.equals(localNode.id()) ? localNode : anotherNode; + } + + @Override + public ClusterNode getByConsistentId(String consistentId) { + return consistentId.equals(localNode.name()) ? localNode : anotherNode; + } + }; + + transactionStateResolver = new TransactionStateResolver( + txManager, + clockService, + clusterNodeResolver, + messagingService, + mock(PlacementDriver.class), + new TxMessageSender( + messagingService, + mock(ReplicaService.class), + clockService + ) + ); + + transactionStateResolver.start(); + + placementDriver = spy(new TestPlacementDriver(localNode)); + + zonePartitionReplicaListener = new ZonePartitionReplicaListener( + txStateStorage, + clockService, + txManager, + validationSchemasSource, + schemaSyncService, + catalogService, + placementDriver, + clusterNodeResolver, + mockRaftClient, + new NoOpFailureManager(), + localNode, + new ZonePartitionId(tableDescriptor.zoneId(), PART_ID) + ); + + partitionReplicaListener = new PartitionReplicaListener( + testMvPartitionStorage, + mockRaftClient, + txManager, + lockManager, + Runnable::run, + enabledColocation() ? 
new ZonePartitionId(tableDescriptor.zoneId(), PART_ID) : new TablePartitionId(TABLE_ID, PART_ID), + TABLE_ID, + () -> Map.of(pkLocker.id(), pkLocker, sortedIndexId, sortedIndexLocker, hashIndexId, hashIndexLocker), + pkStorageSupplier, + () -> Map.of(sortedIndexId, sortedIndexStorage, hashIndexId, hashIndexStorage), + clockService, + safeTimeClock, + txStateStorage, + transactionStateResolver, + new StorageUpdateHandler( + PART_ID, + partitionDataStorage, + indexUpdateHandler, + replicationConfiguration + ), + validationSchemasSource, + localNode, + schemaSyncService, + catalogService, + placementDriver, + new SingleClusterNodeResolver(localNode), + new RemotelyTriggeredResourceRegistry(), + new DummySchemaManagerImpl(schemaDescriptor, schemaDescriptorVersion2), + indexMetaStorage, + lowWatermark, + new NoOpFailureManager() + ); + + kvMarshaller = marshallerFor(schemaDescriptor); + + when(lowWatermark.tryLock(any(), any())).thenReturn(true); + + reset(); + } + + @AfterEach + public void clearMocks() { + Mockito.framework().clearInlineMocks(); + } + + private static LockManager lockManager() { + HeapLockManager lockManager = HeapLockManager.smallInstance(); + lockManager.start(new WaitDieDeadlockPreventionPolicy()); + return lockManager; + } + + private static SchemaDescriptor schemaDescriptorWith(int ver) { + return new SchemaDescriptor(ver, new Column[]{ + new Column("intKey".toUpperCase(Locale.ROOT), NativeTypes.INT32, false), + new Column("strKey".toUpperCase(Locale.ROOT), NativeTypes.STRING, false), + }, new Column[]{ + new Column("intVal".toUpperCase(Locale.ROOT), NativeTypes.INT32, false), + new Column("strVal".toUpperCase(Locale.ROOT), NativeTypes.STRING, false), + }); + } + + private static KvMarshaller<TestKey, TestValue> marshallerFor(SchemaDescriptor descriptor) { + MarshallerFactory marshallerFactory = new ReflectionMarshallerFactory(); + + return marshallerFactory.create(descriptor, TestKey.class, TestValue.class); + } + + private 
TableSchemaAwareIndexStorage pkStorage() { + return Objects.requireNonNull(pkStorageSupplier.get()); + } + + private void reset() { + localLeader = true; + ((TestHashIndexStorage) pkStorage().storage()).clear(); + ((TestHashIndexStorage) hashIndexStorage.storage()).clear(); + ((TestSortedIndexStorage) sortedIndexStorage.storage()).clear(); + testMvPartitionStorage.clear(); + pendingRows.clear(); + + completeBuiltIndexes(hashIndexStorage.storage(), sortedIndexStorage.storage()); + } + + private CompletableFuture<ReplicaResult> doReadOnlySingleGet(BinaryRow pk, HybridTimestamp readTimestamp) { + ReadOnlySingleRowPkReplicaRequest request = readOnlySingleRowPkReplicaRequest(pk, readTimestamp); + + return invokeListener(request); + } + + private CompletableFuture<ReplicaResult> invokeListener(ReplicaRequest request) { + return zonePartitionReplicaListener.invoke(request, localNode.id()); + } + + private ReadOnlySingleRowPkReplicaRequest readOnlySingleRowPkReplicaRequest(BinaryRow pk, HybridTimestamp readTimestamp) { + return readOnlySingleRowPkReplicaRequest(grpId, newTxId(), localNode.id(), pk, readTimestamp); + } + + private static ReadOnlySingleRowPkReplicaRequest readOnlySingleRowPkReplicaRequest( + TablePartitionId grpId, + UUID txId, + UUID coordinatorId, + BinaryRow pk, + HybridTimestamp readTimestamp + ) { + return TABLE_MESSAGES_FACTORY.readOnlySingleRowPkReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .readTimestamp(readTimestamp) + .schemaVersion(pk.schemaVersion()) + .primaryKey(pk.tupleSlice()) + .transactionId(txId) + .coordinatorId(coordinatorId) + .requestType(RO_GET) + .build(); + } + + private CompletableFuture<ReplicaResult> doReadOnlyDirectSingleGet(BinaryRow pk) { + ReadOnlyDirectSingleRowReplicaRequest request = readOnlyDirectSingleRowReplicaRequest(grpId, pk); + + return invokeListener(request); + } + + private static ReadOnlyDirectSingleRowReplicaRequest readOnlyDirectSingleRowReplicaRequest(TablePartitionId 
grpId, BinaryRow pk) { + return TABLE_MESSAGES_FACTORY.readOnlyDirectSingleRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .schemaVersion(pk.schemaVersion()) + .primaryKey(pk.tupleSlice()) + .requestType(RO_GET) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .build(); + } + + private static <K, V> Row marshalQuietly(K key, KvMarshaller<K, V> marshaller) { + return marshaller.marshal(key); + } + + private CompletableFuture<?> doSingleRowRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) { + return zonePartitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .requestType(requestType) + .schemaVersion(binaryRow.schemaVersion()) + .binaryTuple(binaryRow.tupleSlice()) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .full(full) + .timestamp(clock.now()) + .build(), + localNode.id() + ); + } + + private CompletableFuture<?> doSingleRowPkRequest(UUID txId, BinaryRow binaryRow, RequestType requestType, boolean full) { + return zonePartitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowPkReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .requestType(requestType) + .schemaVersion(binaryRow.schemaVersion()) + .primaryKey(binaryRow.tupleSlice()) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .full(full) + .timestamp(clock.now()) + .build(), + localNode.id() + ); + } + + private static TablePartitionIdMessage commitPartitionId() { + return REPLICA_MESSAGES_FACTORY.tablePartitionIdMessage() + .partitionId(PART_ID) + .tableId(TABLE_ID) + .build(); + } + + private static TablePartitionIdMessage 
tablePartitionIdMessage(TablePartitionId tablePartitionId) { + return toTablePartitionIdMessage(REPLICA_MESSAGES_FACTORY, tablePartitionId); + } + + private CompletableFuture<?> doMultiRowRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) { + return zonePartitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .requestType(requestType) + .schemaVersion(binaryRows.iterator().next().schemaVersion()) + .binaryTuples(binaryRowsToBuffers(binaryRows)) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .full(full) + .timestamp(clock.now()) + .build(), + localNode.id() + ); + } + + static List<ByteBuffer> binaryRowsToBuffers(Collection<BinaryRow> binaryRows) { + return binaryRows.stream().map(BinaryRow::tupleSlice).collect(toList()); + } + + private CompletableFuture<?> doMultiRowPkRequest(UUID txId, Collection<BinaryRow> binaryRows, RequestType requestType, boolean full) { + return zonePartitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowPkReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .requestType(requestType) + .schemaVersion(binaryRows.iterator().next().schemaVersion()) + .primaryKeys(binaryRowsToBuffers(binaryRows)) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .full(full) + .timestamp(clock.now()) + .build(), + localNode.id() + ); + } + + private static UUID transactionIdFor(HybridTimestamp beginTimestamp) { + return TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTimestamp); + } + + private BinaryRow marshalKeyOrKeyValue(RequestType requestType, TestKey key) { + return RequestTypes.isKeyOnly(requestType) ? 
marshalQuietly(key, kvMarshaller) : kvMarshaller.marshal(key, someValue); + } + + private CompletableFuture<?> doReplaceRequest(UUID targetTxId, BinaryRow oldRow, BinaryRow newRow, boolean full) { + return zonePartitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSwapRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(targetTxId) + .requestType(RW_REPLACE) + .schemaVersion(oldRow.schemaVersion()) + .oldBinaryTuple(oldRow.tupleSlice()) + .newBinaryTuple(newRow.tupleSlice()) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .full(full) + .timestamp(clock.now()) + .build(), + localNode.id() + ); + } + + private CompletableFuture<?> doRwScanRetrieveBatchRequest(UUID targetTxId) { + return zonePartitionReplicaListener.invoke( + TABLE_MESSAGES_FACTORY.readWriteScanRetrieveBatchReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(targetTxId) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .scanId(1) + .batchSize(100) + .full(false) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .timestamp(clock.now()) + .build(), + localNode.id() + ); + } + + private CompletableFuture<?> doRoScanRetrieveBatchRequest(UUID targetTxId, HybridTimestamp readTimestamp) { + return zonePartitionReplicaListener.invoke( + readOnlyScanRetrieveBatchReplicaRequest(grpId, targetTxId, readTimestamp, localNode.id()), + localNode.id() + ); + } + + private static ReadOnlyScanRetrieveBatchReplicaRequest readOnlyScanRetrieveBatchReplicaRequest( + TablePartitionId grpId, + UUID txId, + HybridTimestamp readTimestamp, + UUID coordinatorId + ) { + return TABLE_MESSAGES_FACTORY.readOnlyScanRetrieveBatchReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .scanId(1) + .batchSize(100) + .readTimestamp(readTimestamp) + 
.coordinatorId(coordinatorId) + .build(); + } + + private static void configureTxManager(TxManager txManager) { + ConcurrentHashMap<UUID, TxStateMeta> txStateMap = new ConcurrentHashMap<>(); + + doAnswer(invocation -> txStateMap.get(invocation.getArgument(0))) + .when(txManager).stateMeta(any()); + + doAnswer(invocation -> { + UUID txId = invocation.getArgument(0); + Function<TxStateMeta, TxStateMeta> updater = invocation.getArgument(1); + txStateMap.compute(txId, (k, oldMeta) -> { + TxStateMeta newMeta = updater.apply(oldMeta); + + if (newMeta == null) { + return null; + } + + TxState oldState = oldMeta == null ? null : oldMeta.txState(); + + return checkTransitionCorrectness(oldState, newMeta.txState()) ? newMeta : oldMeta; + }); + return null; + }).when(txManager).updateTxMeta(any(), any()); + + doAnswer(invocation -> nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class)); + + doAnswer(invocation -> nullCompletedFuture()) + .when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), any(), any()); + doAnswer(invocation -> nullCompletedFuture()) + .when(txManager).cleanup(any(), anyString(), any()); + } + + private void upsertInNewTxFor(TestKey key) { + UUID tx0 = newTxId(); + upsert(tx0, binaryRow(key, someValue)); + cleanup(tx0); + } + + @SuppressWarnings("unused") + private static ArgumentSets singleRowRwOperationTypesFactory() { + return ArgumentSets.argumentsForFirstParameter(singleRowRwOperationTypes()) + .argumentsForNextParameter(false, true) + .argumentsForNextParameter(false, true); + } + + @SuppressWarnings("unused") + private static ArgumentSets finishedTxTypesFactory() { + return ArgumentSets.argumentsForFirstParameter(FINISHING, ABORTED, COMMITTED) + .argumentsForNextParameter(singleRowRwOperationTypes()); + } + + private static Stream<RequestType> singleRowRwOperationTypes() { + return Arrays.stream(RequestType.values()) + .filter(RequestTypes::isSingleRowRw); + } + + @SuppressWarnings("unused") + private 
static ArgumentSets multiRowRwOperationTypesFactory() { + return ArgumentSets.argumentsForFirstParameter(multiRowRwOperationTypes()) + .argumentsForNextParameter(false, true) + .argumentsForNextParameter(false, true); + } + + private static Stream<RequestType> multiRowRwOperationTypes() { + return Arrays.stream(RequestType.values()) + .filter(RequestTypes::isMultipleRowsRw); + } + + @CartesianTest + @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory") + void singleRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) { + RwListenerInvocation invocation = null; + + if (RequestTypes.isSingleRowRwPkOnly(requestType)) { + invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); + } else if (RequestTypes.isSingleRowRwFullRow(requestType)) { + invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); + } else { + fail("Uncovered type: " + requestType); + } + + testRwOperationFailsIfTableWasDropped(onExistingRow, invocation); + } + + private void testRwOperationFailsIfTableWasDropped(boolean onExistingRow, RwListenerInvocation listenerInvocation) { + TestKey key = nextKey(); + + if (onExistingRow) { + upsertInNewTxFor(key); + } + + UUID txId = newTxId(); + HybridTimestamp txBeginTs = beginTimestamp(txId); + + makeTableBeDroppedAfter(txBeginTs); + + CompletableFuture<?> future = listenerInvocation.invoke(txId, key); + + IncompatibleSchemaVersionException ex = assertWillThrowFast(future, IncompatibleSchemaVersionException.class); + assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR)); + assertThat(ex.getMessage(), is("Table was dropped [tableId=1]")); + } + + private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs) { + makeTableBeDroppedAfter(txBeginTs, TABLE_ID); + } + + private void makeTableBeDroppedAfter(HybridTimestamp txBeginTs, int tableId) { + 
CatalogTableDescriptor tableVersion1 = mock(CatalogTableDescriptor.class); + when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION); + when(tableVersion1.name()).thenReturn(TABLE_NAME); + + when(catalog.table(tableId)).thenReturn(tableVersion1); + + when(catalogService.activeCatalog(txBeginTs.longValue())).thenReturn(catalog); + when(catalogService.activeCatalog(gt(txBeginTs.longValue()))).thenReturn(mock(Catalog.class)); + } + + @CartesianTest + @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory") + void multiRowRwOperationsFailIfTableWasDropped(RequestType requestType, boolean onExistingRow, boolean full) { + RwListenerInvocation invocation = null; + + if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) { + invocation = (targetTxId, key) + -> doMultiRowPkRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full); + } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) { + invocation = (targetTxId, key) + -> doMultiRowRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full); + } else { + fail("Uncovered type: " + requestType); + } + + testRwOperationFailsIfTableWasDropped(onExistingRow, invocation); + } + + @CartesianTest + void replaceRequestFailsIfTableWasDropped( + @Values(booleans = {false, true}) boolean onExistingRow, + @Values(booleans = {false, true}) boolean full + ) { + testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> { + return doReplaceRequest( + targetTxId, + marshalKeyOrKeyValue(RW_REPLACE, key), + marshalKeyOrKeyValue(RW_REPLACE, key), + full + ); + }); + } + + @CartesianTest + void rwScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) { + testRwOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, key) -> { + return doRwScanRetrieveBatchRequest(targetTxId); + }); + } + + @CartesianTest + void singleRowRoGetFailsIfTableWasDropped( + @Values(booleans = {false, true}) boolean direct, + 
@Values(booleans = {false, true}) boolean onExistingRow + ) { + testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> { + if (direct) { + return doReadOnlyDirectSingleGet(marshalQuietly(key, kvMarshaller)); + } else { + return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller), readTimestamp); + } + }); + } + + private void testRoOperationFailsIfTableWasDropped(boolean onExistingRow, RoListenerInvocation listenerInvocation) { + TestKey key = nextKey(); + + if (onExistingRow) { + upsertInNewTxFor(key); + } + + UUID txId = newTxId(); + HybridTimestamp readTs = clock.now(); + + when(catalog.table(eq(TABLE_ID))).thenReturn(null); + + CompletableFuture<?> future = listenerInvocation.invoke(txId, readTs, key); + + IncompatibleSchemaVersionException ex = assertWillThrowFast(future, IncompatibleSchemaVersionException.class); + assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR)); + assertThat(ex.getMessage(), is("Table was dropped [tableId=1]")); + } + + @CartesianTest + void multiRowRoGetFailsIfTableWasDropped( + @Values(booleans = {false, true}) boolean direct, + @Values(booleans = {false, true}) boolean onExistingRow + ) { + testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> { + if (direct) { + return doReadOnlyDirectMultiGet(List.of(marshalQuietly(key, kvMarshaller))); + } else { + return doReadOnlyMultiGet(List.of(marshalQuietly(key, kvMarshaller)), readTimestamp); + } + }); + } + + @CartesianTest + void roScanRequestFailsIfTableWasDropped(@Values(booleans = {false, true}) boolean onExistingRow) { + testRoOperationFailsIfTableWasDropped(onExistingRow, (targetTxId, readTimestamp, key) -> { + return doRoScanRetrieveBatchRequest(targetTxId, readTimestamp); + }); + } + + @CartesianTest + @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory") + void singleRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType requestType, boolean onExistingRow, boolean full) { + 
RwListenerInvocation invocation = null; + + if (RequestTypes.isSingleRowRwPkOnly(requestType)) { + invocation = (targetTxId, key) -> doSingleRowPkRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); + } else if (RequestTypes.isSingleRowRwFullRow(requestType)) { + invocation = (targetTxId, key) -> doSingleRowRequest(targetTxId, marshalKeyOrKeyValue(requestType, key), requestType, full); + } else { + fail("Uncovered type: " + requestType); + } + + testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, invocation); + } + + private void testRwOperationFailsIfSchemaVersionMismatchesTx(boolean onExistingRow, RwListenerInvocation listenerInvocation) { + TestKey key = nextKey(); + + if (onExistingRow) { + upsertInNewTxFor(key); + } + + UUID txId = newTxId(); + + makeSchemaBeNextVersion(); + + CompletableFuture<?> future = listenerInvocation.invoke(txId, key); + + assertThat(future, willThrow(InternalSchemaVersionMismatchException.class)); + } + + private void makeSchemaBeNextVersion() { + CatalogTableDescriptor tableVersion2 = mock(CatalogTableDescriptor.class); + when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION); + when(tableVersion2.name()).thenReturn(TABLE_NAME_2); + + when(catalog.table(eq(TABLE_ID))).thenReturn(tableVersion2); + } + + @CartesianTest + @CartesianTest.MethodFactory("multiRowRwOperationTypesFactory") + void multiRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType requestType, boolean onExistingRow, boolean full) { + RwListenerInvocation invocation = null; + + if (RequestTypes.isMultipleRowsRwPkOnly(requestType)) { + invocation = (targetTxId, key) + -> doMultiRowPkRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full); + } else if (RequestTypes.isMultipleRowsRwFullRows(requestType)) { + invocation = (targetTxId, key) + -> doMultiRowRequest(targetTxId, List.of(marshalKeyOrKeyValue(requestType, key)), requestType, full); + } else { + fail("Uncovered type: " + 
requestType); + } + + testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, invocation); + } + + @CartesianTest + void replaceRequestFailsIfSchemaVersionMismatchesTx( + @Values(booleans = {false, true}) boolean onExistingRow, + @Values(booleans = {false, true}) boolean full + ) { + testRwOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, (targetTxId, key) -> { + return doReplaceRequest( + targetTxId, + marshalKeyOrKeyValue(RW_REPLACE, key), + marshalKeyOrKeyValue(RW_REPLACE, key), + full + ); + }); + } + + @CartesianTest + void singleRowRoGetFailsIfSchemaVersionMismatchesTx( + @Values(booleans = {false, true}) boolean direct, + @Values(booleans = {false, true}) boolean onExistingRow + ) { + testRoOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, (targetTxId, readTimestamp, key) -> { + if (direct) { + return doReadOnlyDirectSingleGet(marshalQuietly(key, kvMarshaller)); + } else { + return doReadOnlySingleGet(marshalQuietly(key, kvMarshaller), readTimestamp); + } + }); + } + + private void testRoOperationFailsIfSchemaVersionMismatchesTx(boolean onExistingRow, RoListenerInvocation listenerInvocation) { + TestKey key = nextKey(); + + if (onExistingRow) { + upsertInNewTxFor(key); + } + + UUID txId = newTxId(); + HybridTimestamp readTs = clock.now(); + + makeSchemaBeNextVersion(); + + CompletableFuture<?> future = listenerInvocation.invoke(txId, readTs, key); + + assertThat(future, willThrow(InternalSchemaVersionMismatchException.class)); + } + + @CartesianTest + void multiRowRoGetFailsIfSchemaVersionMismatchesTx( + @Values(booleans = {false, true}) boolean direct, + @Values(booleans = {false, true}) boolean onExistingRow + ) { + testRoOperationFailsIfSchemaVersionMismatchesTx(onExistingRow, (targetTxId, readTimestamp, key) -> { + if (direct) { + return doReadOnlyDirectMultiGet(List.of(marshalQuietly(key, kvMarshaller))); + } else { + return doReadOnlyMultiGet(List.of(marshalQuietly(key, kvMarshaller)), readTimestamp); + } + }); + } + + private UUID 
newTxId() { + return transactionIdFor(clock.now()); + } + + private void upsert(UUID txId, BinaryRow row) { + assertThat(upsertAsync(txId, row), willCompleteSuccessfully()); + } + + private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow row) { + return upsertAsync(txId, row, false); + } + + private CompletableFuture<ReplicaResult> upsertAsync(UUID txId, BinaryRow row, boolean full) { + ReadWriteSingleRowReplicaRequest message = TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .requestType(RW_UPSERT) + .transactionId(txId) + .schemaVersion(row.schemaVersion()) + .binaryTuple(row.tupleSlice()) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .full(full) + .timestamp(clock.now()) + .build(); + + return partitionReplicaListener.invoke(message, localNode.id()); + } + + private CompletableFuture<ReplicaResult> doReadOnlyMultiGet(Collection<BinaryRow> rows, HybridTimestamp readTimestamp) { + ReadOnlyMultiRowPkReplicaRequest request = readOnlyMultiRowPkReplicaRequest(grpId, newTxId(), localNode.id(), rows, readTimestamp); + + return invokeListener(request); + } + + private static ReadOnlyMultiRowPkReplicaRequest readOnlyMultiRowPkReplicaRequest( + TablePartitionId grpId, + UUID txId, + UUID coordinatorId, + Collection<BinaryRow> rows, + HybridTimestamp readTimestamp + ) { + return TABLE_MESSAGES_FACTORY.readOnlyMultiRowPkReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .requestType(RO_GET_ALL) + .readTimestamp(readTimestamp) + .schemaVersion(rows.iterator().next().schemaVersion()) + .primaryKeys(binaryRowsToBuffers(rows)) + .transactionId(txId) + .coordinatorId(coordinatorId) + .build(); + } + + private CompletableFuture<ReplicaResult> doReadOnlyDirectMultiGet(Collection<BinaryRow> rows) { + ReadOnlyDirectMultiRowReplicaRequest request = 
readOnlyDirectMultiRowReplicaRequest(grpId, rows); + + return invokeListener(request); + } + + private static ReadOnlyDirectMultiRowReplicaRequest readOnlyDirectMultiRowReplicaRequest( + TablePartitionId grpId, + Collection<BinaryRow> rows + ) { + return TABLE_MESSAGES_FACTORY.readOnlyDirectMultiRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .requestType(RO_GET_ALL) + .schemaVersion(rows.iterator().next().schemaVersion()) + .primaryKeys(binaryRowsToBuffers(rows)) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .build(); + } + + private void cleanup(UUID txId) { + cleanup(txId, true); + } + + private void cleanup(UUID txId, boolean commit) { + TxState newTxState = commit ? COMMITTED : ABORTED; + + HybridTimestamp commitTs = clock.now(); + HybridTimestamp commitTsOrNull = commit ? commitTs : null; + + txManager.updateTxMeta(txId, old -> new TxStateMeta(newTxState, UUID.randomUUID(), commitPartitionId, commitTsOrNull, null, null)); + lockManager.releaseAll(txId); + partitionReplicaListener.cleanupLocally(txId, commit, commitTs); + } + + private static TestKey nextKey() { + return new TestKey(monotonicInt(), "key " + monotonicInt()); + } + + private static int monotonicInt() { + return nextMonotonicInt.getAndIncrement(); + } + + protected BinaryRow binaryRow(int i) { + return binaryRow(new TestKey(i, "k" + i), new TestValue(i, "v" + i)); + } + + private BinaryRow binaryRow(TestKey key, TestValue value) { + return binaryRow(key, value, kvMarshaller); + } + + private static BinaryRow binaryRow(TestKey key, TestValue value, KvMarshaller<TestKey, TestValue> marshaller) { + return marshaller.marshal(key, value); + } + + /** + * Test pojo key. 
+ */ + private static class TestKey { + @IgniteToStringInclude + public int intKey; + + @IgniteToStringInclude + public String strKey; + + public TestKey() { + } + + public TestKey(int intKey, String strKey) { + this.intKey = intKey; + this.strKey = strKey; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestKey testKey = (TestKey) o; + return intKey == testKey.intKey && Objects.equals(strKey, testKey.strKey); + } + + @Override + public int hashCode() { + return Objects.hash(intKey, strKey); + } + + @Override + public String toString() { + return S.toString(TestKey.class, this); + } + } + + /** + * Test pojo value. + */ + private static class TestValue implements Comparable<TestValue> { + @IgniteToStringInclude + public Integer intVal; + + @IgniteToStringInclude + public String strVal; + + public TestValue() { + } + + public TestValue(Integer intVal, String strVal) { + this.intVal = intVal; + this.strVal = strVal; + } + + @Override + public int compareTo(TestValue o) { + int cmp = Integer.compare(intVal, o.intVal); + + return cmp != 0 ? 
cmp : strVal.compareTo(o.strVal); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TestValue testValue = (TestValue) o; + return Objects.equals(intVal, testValue.intVal) && Objects.equals(strVal, testValue.strVal); + } + + @Override + public int hashCode() { + return Objects.hash(intVal, strVal); + } + + @Override + public String toString() { + return S.toString(TestValue.class, this); + } + } + + @FunctionalInterface + private interface RwListenerInvocation { + CompletableFuture<?> invoke(UUID targetTxId, TestKey key); + } + + @FunctionalInterface + private interface RoListenerInvocation { + CompletableFuture<?> invoke(UUID targetTxId, HybridTimestamp readTimestamp, TestKey key); + } + + private void completeBuiltIndexes(IndexStorage... indexStorages) { + TestStorageUtils.completeBuiltIndexes(testMvPartitionStorage, indexStorages); + } + + private static @Nullable TxStateResponse toTxStateResponse(@Nullable TransactionMeta transactionMeta) { + TransactionMetaMessage transactionMetaMessage = + transactionMeta == null ? 
null : transactionMeta.toTransactionMetaMessage(REPLICA_MESSAGES_FACTORY, TX_MESSAGES_FACTORY); + + return TX_MESSAGES_FACTORY.txStateResponse() + .txStateMeta(transactionMetaMessage) + .build(); + } +} diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java index 0c018d32751..9dfa6cdc053 100644 --- a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java +++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java @@ -22,6 +22,7 @@ import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl; import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.lang.IgniteSystemProperties.enabledColocation; import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate; import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode; import static org.apache.ignite.internal.testframework.IgniteTestUtils.bypassingThreadAssertions; @@ -71,6 +72,7 @@ import org.apache.ignite.internal.placementdriver.message.PlacementDriverMessage import org.apache.ignite.internal.placementdriver.message.StopLeaseProlongationMessage; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; +import org.apache.ignite.internal.replicator.ZonePartitionId; import org.apache.ignite.internal.replicator.message.ErrorTimestampAwareReplicaResponse; import org.apache.ignite.internal.replicator.message.TimestampAwareReplicaResponse; import 
org.apache.ignite.internal.schema.BinaryRow; @@ -151,7 +153,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testMultipleAbandonedTxsAreAborted() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -207,7 +211,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testMultipleRecoveryRequestsIssued() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -253,7 +259,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testAbandonedTxIsAborted() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -296,7 +304,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testAbandonedTxWithCoarseLockIsAborted() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? 
new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -331,7 +341,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testWriteIntentRecoverNoCoordinator() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -377,7 +389,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testWriteIntentNoRecovery() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -422,7 +436,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testWriteIntentRecoveryAndLockConflict() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? 
new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -486,7 +502,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testSendCommitAndDie() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -555,7 +573,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testCommitAndDieRecoveryFirst() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -630,7 +650,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testRecoveryIsTriggeredOnce() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? 
new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -707,7 +729,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testFinishAlreadyFinishedTx() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -746,7 +770,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testPrimaryFailureRightAfterCommitMsg() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -806,7 +832,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testPrimaryFailureWhileInflightInProgress() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? 
new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -848,7 +876,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -903,7 +933,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { preloadData(tbl, 10); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -956,7 +988,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testCursorCleanup(boolean readOnly) throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp); @@ -1038,7 +1072,12 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { Publisher<BinaryRow> publisher; if (tx.isReadOnly()) { - String primary = waitAndGetLeaseholder(node(0), new TablePartitionId(tbl.tableId(), PART_ID)); + String primary = waitAndGetLeaseholder( + node(0), + enabledColocation() + ? 
new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID) + ); ClusterNode primaryNode = node(0).clusterNodes().stream().filter(node -> node.name().equals(primary)).findAny().get(); @@ -1067,7 +1106,9 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { public void testCursorsClosedAfterTxClose() throws Exception { TableImpl tbl = unwrapTableImpl(node(0).tables().table(TABLE_NAME)); - var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID); + var tblReplicationGrp = enabledColocation() + ? new ZonePartitionId(tbl.zoneId(), PART_ID) + : new TablePartitionId(tbl.tableId(), PART_ID); String leaseholder = waitAndGetPrimaryReplica(unwrapIgniteImpl(node(0)), tblReplicationGrp).getLeaseholder(); @@ -1143,7 +1184,10 @@ public class ItTransactionRecoveryTest extends ClusterPerTestIntegrationTest { private static @Nullable TxMeta txStoredMeta(IgniteImpl node, UUID txId) { InternalTable internalTable = unwrapTableImpl(node.tables().table(TABLE_NAME)).internalTable(); - return bypassingThreadAssertions(() -> internalTable.txStateStorage().getPartitionStorage(0).get(txId)); + return enabledColocation() + ? bypassingThreadAssertions( + () -> node.partitionReplicaLifecycleManager().txStatePartitionStorage(internalTable.zoneId(), 0).get(txId)) + : bypassingThreadAssertions(() -> internalTable.txStateStorage().getPartitionStorage(0).get(txId)); } /**