This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 39a3ac4ba6 IGNITE-18856 Switch primary replica calls from Raft leader
to primary replica (#2488)
39a3ac4ba6 is described below
commit 39a3ac4ba6027d2048b9deeea4ef1ca2e7a7ee91
Author: Alexander Lapin <[email protected]>
AuthorDate: Tue Sep 12 16:03:16 2023 +0300
IGNITE-18856 Switch primary replica calls from Raft leader to primary
replica (#2488)
---
.../ignite/client/fakes/FakeInternalTable.java | 6 +
modules/placement-driver-api/build.gradle | 4 +
.../internal/placementdriver/PlacementDriver.java | 4 +-
.../{LeaseMeta.java => ReplicaMeta.java} | 3 +-
.../placementdriver/TestPlacementDriver.java | 47 ++++++++
.../placementdriver/TestReplicaMetaImpl.java | 82 +++++++++++++
.../internal/placementdriver/ActiveActorTest.java | 4 +-
.../MultiActorPlacementDriverTest.java | 1 -
.../PlacementDriverManagerTest.java | 1 -
.../placementdriver/AssignmentsTracker.java | 43 ++++---
.../internal/placementdriver/LeaseUpdater.java | 5 +-
.../placementdriver/PlacementDriverManager.java | 15 ++-
.../internal/placementdriver/leases/Lease.java | 14 ++-
.../placementdriver/leases/LeaseTracker.java | 78 ++++++-------
.../placementdriver/PlacementDriverTest.java | 33 +++---
modules/runner/build.gradle | 1 +
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 1 +
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 17 ++-
.../ItRaftCommandLeftInLogUntilRestartTest.java | 2 +
.../runner/app/ItSchemaChangeKvViewTest.java | 3 +
.../runner/app/ItSchemaChangeTableViewTest.java | 2 +
.../schemasync/ItSchemaSyncAndReplicationTest.java | 2 +
.../ignite/internal/table/ItTableScanTest.java | 34 ++++--
.../org/apache/ignite/internal/app/IgniteImpl.java | 15 ++-
modules/sql-engine/build.gradle | 1 +
.../sql/engine/exec/MockedStructuresTest.java | 6 +-
.../exec/rel/TableScanNodeExecutionTest.java | 4 +-
modules/table/build.gradle | 5 +
.../ItAbstractInternalTableScanTest.java | 7 +-
.../ItInternalTableReadOnlyScanTest.java | 2 +-
.../ignite/distributed/ItTablePersistenceTest.java | 8 +-
.../ignite/internal/table/ItColocationTest.java | 15 +--
.../ignite/internal/table/InternalTable.java | 3 +
.../internal/table/distributed/TableManager.java | 32 +++--
.../table/distributed/raft/PartitionListener.java | 33 ++++--
.../request/ReadWriteReplicaRequest.java | 2 +-
.../replicator/PartitionReplicaListener.java | 129 ++++++++++++++-------
...ntDriver.java => TransactionStateResolver.java} | 6 +-
.../distributed/storage/InternalTableImpl.java | 84 ++++++++------
.../apache/ignite/internal/table/TxLocalTest.java | 46 +++++++-
.../table/distributed/TableManagerTest.java | 4 +-
.../PartitionReplicaListenerIndexLockingTest.java | 12 +-
.../replication/PartitionReplicaListenerTest.java | 13 ++-
.../distributed/storage/InternalTableImplTest.java | 7 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 20 +++-
.../ignite/internal/table/TxAbstractTest.java | 2 +
.../table/impl/DummyInternalTableImpl.java | 40 +++----
48 files changed, 619 insertions(+), 283 deletions(-)
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index b754b441dc..cb036447d2 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow.Publisher;
import java.util.function.BiConsumer;
+import java.util.function.Function;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -498,4 +499,9 @@ public class FakeInternalTable implements InternalTable {
public @Nullable PendingComparableValuesTracker<Long, Void>
getPartitionStorageIndexTracker(int partitionId) {
return null;
}
+
+ @Override
+ public Function<String, ClusterNode> getClusterNodeResolver() {
+ return null;
+ }
}
diff --git a/modules/placement-driver-api/build.gradle
b/modules/placement-driver-api/build.gradle
index 2d68f81821..a05a01e61e 100644
--- a/modules/placement-driver-api/build.gradle
+++ b/modules/placement-driver-api/build.gradle
@@ -18,6 +18,7 @@
apply from: "$rootDir/buildscripts/java-core.gradle"
apply from: "$rootDir/buildscripts/publishing.gradle"
apply from: "$rootDir/buildscripts/java-junit5.gradle"
+apply from: "$rootDir/buildscripts/java-test-fixtures.gradle"
dependencies {
annotationProcessor project(":ignite-network-annotation-processor")
@@ -25,6 +26,9 @@ dependencies {
implementation project(':ignite-core')
implementation project(':ignite-network-api')
implementation libs.jetbrains.annotations
+
+ testFixturesImplementation project(':ignite-core')
+ testFixturesImplementation libs.jetbrains.annotations
}
description = 'ignite-placement-driver-api'
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index b18e7d952b..7e262ed8f8 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -36,7 +36,7 @@ public interface PlacementDriver {
* @param timestamp Timestamp reference value.
* @return Primary replica future.
*/
- CompletableFuture<LeaseMeta> awaitPrimaryReplica(ReplicationGroupId
groupId, HybridTimestamp timestamp);
+ CompletableFuture<ReplicaMeta> awaitPrimaryReplica(ReplicationGroupId
groupId, HybridTimestamp timestamp);
/**
* Same as {@link #awaitPrimaryReplica(ReplicationGroupId,
HybridTimestamp)}, except that this method's await logic is bounded.
@@ -47,5 +47,5 @@ public interface PlacementDriver {
* @param timestamp Timestamp reference value.
* @return Primary replica future.
*/
- CompletableFuture<LeaseMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp);
+ CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp);
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeaseMeta.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
similarity index 94%
rename from
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeaseMeta.java
rename to
modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
index 6e68132d65..b03a5a2781 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/LeaseMeta.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/ReplicaMeta.java
@@ -17,12 +17,13 @@
package org.apache.ignite.internal.placementdriver;
+import java.io.Serializable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
/**
* Replica lease meta.
*/
-public interface LeaseMeta {
+public interface ReplicaMeta extends Serializable {
/**
* Get a leaseholder node.
*
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
new file mode 100644
index 0000000000..3eaec15b03
--- /dev/null
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Test placement driver service whose await and get methods ignore their
arguments and immediately return a completed future holding an unbounded
+ * primary replica for the leaseholder given at construction.
+ */
+@TestOnly
+public class TestPlacementDriver implements PlacementDriver {
+
+ private final TestReplicaMetaImpl primaryReplica;
+
+ public TestPlacementDriver(String leaseholder) {
+ this.primaryReplica = new TestReplicaMetaImpl(leaseholder);
+ }
+
+ @Override
+ public CompletableFuture<ReplicaMeta>
awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp) {
+ return CompletableFuture.completedFuture(primaryReplica);
+ }
+
+ @Override
+ public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
+ return CompletableFuture.completedFuture(primaryReplica);
+ }
+}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestReplicaMetaImpl.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestReplicaMetaImpl.java
new file mode 100644
index 0000000000..35d01de9ed
--- /dev/null
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestReplicaMetaImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MAX_VALUE;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.jetbrains.annotations.TestOnly;
+
+/** Test implementation of {@link ReplicaMeta} that simply stores the values passed to its constructors. */
+@TestOnly
+public class TestReplicaMetaImpl implements ReplicaMeta {
+ private static final long serialVersionUID = -382174507405586033L;
+
+ /** The node that holds the lease. */
+ private final String leaseholder;
+
+ /** Lease start timestamp. The timestamp is assigned when the lease
is created and is not changed when the lease is prolonged. */
+ private final HybridTimestamp startTime;
+
+ /** Lease expiration timestamp. */
+ private final HybridTimestamp expirationTime;
+
+ /**
+ * Creates a new primary replica meta with an unbounded lease period: from {@code MIN_VALUE} to {@code MAX_VALUE}.
+ *
+ * @param leaseholder Lease holder.
+ */
+ public TestReplicaMetaImpl(String leaseholder) {
+ this.leaseholder = leaseholder;
+ this.startTime = MIN_VALUE;
+ this.expirationTime = MAX_VALUE;
+ }
+
+ /**
+ * Creates a new primary replica meta with the given lease period.
+ *
+ * @param leaseholder Lease holder.
+ * @param startTime Lease start timestamp.
+ * @param leaseExpirationTime Lease expiration timestamp.
+ */
+ public TestReplicaMetaImpl(
+ String leaseholder,
+ HybridTimestamp startTime,
+ HybridTimestamp leaseExpirationTime
+ ) {
+ this.leaseholder = leaseholder;
+ this.startTime = startTime;
+ this.expirationTime = leaseExpirationTime;
+ }
+
+ @Override
+ public String getLeaseholder() {
+ return leaseholder;
+ }
+
+ @Override
+ public HybridTimestamp getStartTime() {
+ return startTime;
+ }
+
+ @Override
+ public HybridTimestamp getExpirationTime() {
+ return expirationTime;
+ }
+}
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
index 7d32e08d03..83ae5118d7 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ActiveActorTest.java
@@ -72,8 +72,6 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
@@ -156,7 +154,6 @@ public class ActiveActorTest extends IgniteAbstractTest {
PlacementDriverManager placementDriverManager = new
PlacementDriverManager(
nodeName,
msm,
- new VaultManager(new InMemoryVaultService()),
GROUP_ID,
clusterService,
() -> completedFuture(placementDriverNodesNames),
@@ -432,6 +429,7 @@ public class ActiveActorTest extends IgniteAbstractTest {
int nodes,
int clientPort
) {
+ when(msm.recoveryFinishedFuture()).thenReturn(completedFuture(0L));
when(msm.invoke(any(), any(Operation.class),
any(Operation.class))).thenReturn(completedFuture(true));
List<NetworkAddress> addresses = getNetworkAddresses(nodes);
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
index 3f9f9e53fa..8ae953ff9f 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java
@@ -275,7 +275,6 @@ public class MultiActorPlacementDriverTest extends
BasePlacementDriverTest {
var placementDriverManager = new PlacementDriverManager(
nodeName,
metaStorageManager,
- vaultManager,
MetastorageGroupId.INSTANCE,
clusterService,
cmgManager::metaStorageNodes,
diff --git
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
index ee4db86945..54fadc01e4 100644
---
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
+++
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java
@@ -193,7 +193,6 @@ public class PlacementDriverManagerTest extends
BasePlacementDriverTest {
placementDriverManager = new PlacementDriverManager(
nodeName,
metaStorageManager,
- vaultManager,
MetastorageGroupId.INSTANCE,
clusterService,
cmgManager::metaStorageNodes,
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 9bf4778909..20daacc536 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -30,6 +30,7 @@ import java.util.stream.Collectors;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -39,8 +40,6 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
/**
@@ -50,9 +49,6 @@ public class AssignmentsTracker {
/** Ignite logger. */
private static final IgniteLogger LOG =
Loggers.forClass(AssignmentsTracker.class);
- /** Vault manager. */
- private final VaultManager vaultManager;
-
/** Meta storage manager. */
private final MetaStorageManager msManager;
@@ -65,11 +61,9 @@ public class AssignmentsTracker {
/**
* The constructor.
*
- * @param vaultManager Vault manager.
* @param msManager Metastorage manager.
*/
- public AssignmentsTracker(VaultManager vaultManager, MetaStorageManager
msManager) {
- this.vaultManager = vaultManager;
+ public AssignmentsTracker(MetaStorageManager msManager) {
this.msManager = msManager;
this.groupAssignments = new ConcurrentHashMap<>();
@@ -82,22 +76,33 @@ public class AssignmentsTracker {
public void startTrack() {
msManager.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
assignmentsListener);
- try (Cursor<VaultEntry> cursor = vaultManager.range(
- ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
-
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX))
- )) {
- for (VaultEntry entry : cursor) {
- String key = entry.key().toString();
+ msManager.recoveryFinishedFuture().thenAccept(recoveryRevision -> {
+ try (Cursor<Entry> cursor =
msManager.getLocally(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
+
ByteArray.fromString(incrementLastChar(STABLE_ASSIGNMENTS_PREFIX)),
recoveryRevision);
+ ) {
+ for (Entry entry : cursor) {
+ if (entry.tombstone()) {
+ continue;
+ }
+
+ byte[] key = entry.key();
+ byte[] value = entry.value();
+
+ // MetaStorage iterator should not return nulls as values.
+ assert value != null;
- key = key.replace(STABLE_ASSIGNMENTS_PREFIX, "");
+ String strKey = new String(key, StandardCharsets.UTF_8);
- TablePartitionId grpId = TablePartitionId.fromString(key);
+ strKey = strKey.replace(STABLE_ASSIGNMENTS_PREFIX, "");
- Set<Assignment> assignments =
ByteUtils.fromBytes(entry.value());
+ TablePartitionId grpId =
TablePartitionId.fromString(strKey);
- groupAssignments.put(grpId, assignments);
+ Set<Assignment> assignments =
ByteUtils.fromBytes(entry.value());
+
+ groupAssignments.put(grpId, assignments);
+ }
}
- }
+ });
LOG.info("Assignment cache initialized for placement driver
[groupAssignments={}]", groupAssignments);
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index f99669bc89..687b7c98d0 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -52,7 +52,6 @@ import
org.apache.ignite.internal.placementdriver.negotiation.LeaseNegotiator;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteSystemProperties;
import org.apache.ignite.network.ClusterNode;
@@ -116,7 +115,6 @@ public class LeaseUpdater {
* Constructor.
*
* @param clusterService Cluster service.
- * @param vaultManager Vault manager.
* @param msManager Meta storage manager.
* @param topologyService Topology service.
* @param leaseTracker Lease tracker.
@@ -125,7 +123,6 @@ public class LeaseUpdater {
LeaseUpdater(
String nodeName,
ClusterService clusterService,
- VaultManager vaultManager,
MetaStorageManager msManager,
LogicalTopologyService topologyService,
LeaseTracker leaseTracker,
@@ -138,7 +135,7 @@ public class LeaseUpdater {
this.clock = clock;
this.longLeaseInterval =
IgniteSystemProperties.getLong("IGNITE_LONG_LEASE", 120_000);
- this.assignmentsTracker = new AssignmentsTracker(vaultManager,
msManager);
+ this.assignmentsTracker = new AssignmentsTracker(msManager);
this.topologyTracker = new TopologyTracker(topologyService);
this.updater = new Updater();
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 9c0b01e69d..de67f489fe 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -39,7 +39,6 @@ import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
@@ -93,7 +92,6 @@ public class PlacementDriverManager implements
IgniteComponent {
*
* @param nodeName Node name.
* @param metaStorageMgr Meta Storage manager.
- * @param vaultManager Vault manager.
* @param replicationGroupId Id of placement driver group.
* @param clusterService Cluster service.
* @param placementDriverNodesNamesProvider Provider of the set of
placement driver nodes' names.
@@ -105,7 +103,6 @@ public class PlacementDriverManager implements
IgniteComponent {
public PlacementDriverManager(
String nodeName,
MetaStorageManager metaStorageMgr,
- VaultManager vaultManager,
ReplicationGroupId replicationGroupId,
ClusterService clusterService,
Supplier<CompletableFuture<Set<String>>>
placementDriverNodesNamesProvider,
@@ -122,11 +119,10 @@ public class PlacementDriverManager implements
IgniteComponent {
this.raftClientFuture = new CompletableFuture<>();
- this.leaseTracker = new LeaseTracker(vaultManager, metaStorageMgr);
+ this.leaseTracker = new LeaseTracker(metaStorageMgr);
this.leaseUpdater = new LeaseUpdater(
nodeName,
clusterService,
- vaultManager,
metaStorageMgr,
logicalTopologyService,
leaseTracker,
@@ -231,4 +227,13 @@ public class PlacementDriverManager implements
IgniteComponent {
boolean isActiveActor() {
return leaseUpdater.active();
}
+
+ /**
+ * Returns placement driver service.
+ *
+ * @return Placement driver service.
+ */
+ public PlacementDriver placementDriver() {
+ return leaseTracker;
+ }
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
index 87d36dd768..b8bef14056 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/Lease.java
@@ -26,7 +26,7 @@ import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.placementdriver.LeaseMeta;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.ByteUtils;
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.util.ByteUtils;
* A lease representation in memory.
* The real lease is stored in Meta storage.
*/
-public class Lease implements LeaseMeta {
+public class Lease implements ReplicaMeta {
/** The object is used when nothing holds the lease. Empty lease is always
expired. */
public static Lease EMPTY_LEASE = new Lease(null, MIN_VALUE, MIN_VALUE,
null);
@@ -65,8 +65,12 @@ public class Lease implements LeaseMeta {
* @param leaseExpirationTime Lease expiration timestamp.
* @param replicationGroupId Id of replication group.
*/
- public Lease(String leaseholder, HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime,
- ReplicationGroupId replicationGroupId) {
+ public Lease(
+ String leaseholder,
+ HybridTimestamp startTime,
+ HybridTimestamp leaseExpirationTime,
+ ReplicationGroupId replicationGroupId
+ ) {
this(leaseholder, startTime, leaseExpirationTime, false, false,
replicationGroupId);
}
@@ -79,7 +83,7 @@ public class Lease implements LeaseMeta {
* @param prolong Lease is available to prolong.
* @param accepted The flag is true when the holder accepted the lease,
the false otherwise.
*/
- Lease(
+ public Lease(
String leaseholder,
HybridTimestamp startTime,
HybridTimestamp leaseExpirationTime,
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index 22b1e0e8c2..1d1350c469 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -21,6 +21,7 @@ import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.hlc.HybridTimestamp.CLOCK_SKEW;
import static org.apache.ignite.internal.hlc.HybridTimestamp.MIN_VALUE;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
import static
org.apache.ignite.internal.placementdriver.leases.Lease.EMPTY_LEASE;
@@ -45,13 +46,11 @@ import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
-import org.apache.ignite.internal.placementdriver.LeaseMeta;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import
org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
/**
* Class tracks cluster leases in memory.
@@ -61,9 +60,6 @@ public class LeaseTracker implements PlacementDriver {
/** Ignite logger. */
private static final IgniteLogger LOG =
Loggers.forClass(LeaseTracker.class);
- /** Vault manager. */
- private final VaultManager vaultManager;
-
/** Meta storage manager. */
private final MetaStorageManager msManager;
@@ -77,7 +73,7 @@ public class LeaseTracker implements PlacementDriver {
private volatile Leases leases = new Leases(emptyMap(), BYTE_EMPTY_ARRAY);
/** Map of primary replica waiters. */
- private final Map<ReplicationGroupId,
PendingIndependentComparableValuesTracker<HybridTimestamp, LeaseMeta>>
primaryReplicaWaiters
+ private final Map<ReplicationGroupId,
PendingIndependentComparableValuesTracker<HybridTimestamp, ReplicaMeta>>
primaryReplicaWaiters
= new ConcurrentHashMap<>();
/** Listener to update a leases cache. */
@@ -86,11 +82,9 @@ public class LeaseTracker implements PlacementDriver {
/**
* Constructor.
*
- * @param vaultManager Vault manager.
* @param msManager Meta storage manager.
*/
- public LeaseTracker(VaultManager vaultManager, MetaStorageManager
msManager) {
- this.vaultManager = vaultManager;
+ public LeaseTracker(MetaStorageManager msManager) {
this.msManager = msManager;
}
@@ -99,31 +93,33 @@ public class LeaseTracker implements PlacementDriver {
inBusyLock(busyLock, () -> {
msManager.registerPrefixWatch(PLACEMENTDRIVER_LEASES_KEY,
updateListener);
- CompletableFuture<VaultEntry> entryFut =
vaultManager.get(PLACEMENTDRIVER_LEASES_KEY);
-
- VaultEntry entry = entryFut.join();
+ msManager.recoveryFinishedFuture().thenAccept(recoveryRevision -> {
+ Entry entry = msManager.getLocally(PLACEMENTDRIVER_LEASES_KEY,
recoveryRevision);
- Map<ReplicationGroupId, Lease> leasesMap = new HashMap<>();
+ Map<ReplicationGroupId, Lease> leasesMap = new HashMap<>();
- byte[] leasesBytes;
+ byte[] leasesBytes;
- if (entry != null) {
- leasesBytes = entry.value();
+ if (entry.empty() || entry.tombstone()) {
+ leasesBytes = BYTE_EMPTY_ARRAY;
+ } else {
+ leasesBytes = entry.value();
- LeaseBatch leaseBatch =
LeaseBatch.fromBytes(ByteBuffer.wrap(leasesBytes).order(LITTLE_ENDIAN));
+ LeaseBatch leaseBatch =
LeaseBatch.fromBytes(ByteBuffer.wrap(leasesBytes).order(LITTLE_ENDIAN));
- leaseBatch.leases().forEach(lease -> {
- leasesMap.put(lease.replicationGroupId(), lease);
+ leaseBatch.leases().forEach(lease -> {
+ leasesMap.put(lease.replicationGroupId(), lease);
-
getOrCreatePrimaryReplicaWaiter(lease.replicationGroupId()).update(lease.getExpirationTime(),
lease);
- });
- } else {
- leasesBytes = BYTE_EMPTY_ARRAY;
- }
+ if (lease.isAccepted()) {
+
getOrCreatePrimaryReplicaWaiter(lease.replicationGroupId()).update(lease.getExpirationTime(),
lease);
+ }
+ });
+ }
- leases = new Leases(unmodifiableMap(leasesMap), leasesBytes);
+ leases = new Leases(unmodifiableMap(leasesMap), leasesBytes);
- LOG.info("Leases cache recovered [leases={}]", leases);
+ LOG.info("Leases cache recovered [leases={}]", leases);
+ });
});
}
@@ -181,9 +177,11 @@ public class LeaseTracker implements PlacementDriver {
leasesMap.put(grpId, lease);
- primaryReplicaWaiters
- .computeIfAbsent(grpId, groupId -> new
PendingIndependentComparableValuesTracker<>(MIN_VALUE))
- .update(lease.getExpirationTime(), lease);
+ if (lease.isAccepted()) {
+ primaryReplicaWaiters
+ .computeIfAbsent(grpId, groupId -> new
PendingIndependentComparableValuesTracker<>(MIN_VALUE))
+ .update(lease.getExpirationTime(), lease);
+ }
}
for (Iterator<Map.Entry<ReplicationGroupId, Lease>>
iterator = leasesMap.entrySet().iterator(); iterator.hasNext();) {
@@ -208,28 +206,28 @@ public class LeaseTracker implements PlacementDriver {
}
@Override
- public CompletableFuture<LeaseMeta> awaitPrimaryReplica(ReplicationGroupId
groupId, HybridTimestamp timestamp) {
+ public CompletableFuture<ReplicaMeta>
awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp) {
return inBusyLockAsync(busyLock, () ->
getOrCreatePrimaryReplicaWaiter(groupId).waitFor(timestamp));
}
@Override
- public CompletableFuture<LeaseMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
- return inBusyLockAsync(busyLock, () -> {
- Map<ReplicationGroupId, Lease> leasesMap = leases.leaseByGroupId();
+ public CompletableFuture<ReplicaMeta> getPrimaryReplica(ReplicationGroupId
replicationGroupId, HybridTimestamp timestamp) {
+ HybridTimestamp timestampWithClockSkew =
timestamp.addPhysicalTime(CLOCK_SKEW);
- Lease lease = leasesMap.getOrDefault(replicationGroupId,
EMPTY_LEASE);
+ return inBusyLockAsync(busyLock, () -> {
+ Lease lease =
leases.leaseByGroupId().getOrDefault(replicationGroupId, EMPTY_LEASE);
- if (lease.getExpirationTime().after(timestamp)) {
+ if (lease.isAccepted() &&
lease.getExpirationTime().after(timestampWithClockSkew)) {
return completedFuture(lease);
}
return msManager
.clusterTime()
- .waitFor(timestamp)
+ .waitFor(timestampWithClockSkew)
.thenApply(ignored -> inBusyLock(busyLock, () -> {
- Lease lease0 =
leasesMap.getOrDefault(replicationGroupId, EMPTY_LEASE);
+ Lease lease0 =
leases.leaseByGroupId().getOrDefault(replicationGroupId, EMPTY_LEASE);
- if (lease0.getExpirationTime().after(timestamp)) {
+ if (lease0.isAccepted() &&
lease0.getExpirationTime().after(timestampWithClockSkew)) {
return lease0;
} else {
return null;
@@ -254,7 +252,7 @@ public class LeaseTracker implements PlacementDriver {
});
}
- private PendingIndependentComparableValuesTracker<HybridTimestamp,
LeaseMeta> getOrCreatePrimaryReplicaWaiter(
+ private PendingIndependentComparableValuesTracker<HybridTimestamp,
ReplicaMeta> getOrCreatePrimaryReplicaWaiter(
ReplicationGroupId groupId
) {
return primaryReplicaWaiters.computeIfAbsent(groupId, key -> new
PendingIndependentComparableValuesTracker<>(MIN_VALUE));
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index 4f2af2c7f6..a03a3669a0 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -65,6 +65,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
LEASEHOLDER_1,
new HybridTimestamp(1, 0),
new HybridTimestamp(5_000, 0),
+ false,
+ true,
GROUP_1
);
@@ -72,6 +74,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
LEASEHOLDER_1,
new HybridTimestamp(1, 0),
new HybridTimestamp(15_000, 0),
+ false,
+ true,
GROUP_1
);
@@ -79,6 +83,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
LEASEHOLDER_1,
new HybridTimestamp(15_000, 0),
new HybridTimestamp(30_000, 0),
+ false,
+ true,
GROUP_1
);
@@ -100,10 +106,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
revisionTracker = new PendingComparableValuesTracker<>(-1L);
- placementDriver = new LeaseTracker(
- vault,
- metastore
- );
+ placementDriver = new LeaseTracker(metastore);
metastore.registerRevisionUpdateListener(rev -> {
revisionTracker.update(rev, null);
@@ -139,7 +142,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testAwaitPrimaryReplicaInInterval() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<LeaseMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
assertFalse(primaryReplicaFuture.isDone());
// Publish primary replica for an interval [1, 5].
@@ -176,7 +179,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testAwaitPrimaryReplicaBeforeInterval() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<LeaseMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
assertFalse(primaryReplicaFuture.isDone());
// Publish primary replica for an interval [1, 5].
@@ -217,7 +220,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES));
// Await primary replica for time 10.
- CompletableFuture<LeaseMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
// Assert that primary waiter is completed.
assertTrue(primaryReplicaFuture.isDone());
@@ -238,8 +241,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testTwoWaitersSameTime() throws Exception {
// Await primary replica for time 10 twice.
- CompletableFuture<LeaseMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
- CompletableFuture<LeaseMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
assertFalse(primaryReplicaFuture1.isDone());
assertFalse(primaryReplicaFuture2.isDone());
@@ -272,8 +275,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testTwoWaitersSameTimeFirstTimedOutSecondSucceed() throws
Exception {
// Await primary replica for time 10 twice.
- CompletableFuture<LeaseMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
- CompletableFuture<LeaseMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
assertFalse(primaryReplicaFuture1.isDone());
assertFalse(primaryReplicaFuture2.isDone());
@@ -310,7 +313,7 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testGetPrimaryReplica() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<LeaseMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
assertFalse(primaryReplicaFuture.isDone());
// Publish primary replica for an interval [1, 15].
@@ -320,16 +323,16 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
assertThat(primaryReplicaFuture,
CompletableFutureMatcher.willSucceedFast());
// Assert that retrieved primary replica for same awaiting timestamp
as within await ones will be completed immediately.
- CompletableFuture<LeaseMeta> retrievedPrimaryReplicaSameTime =
placementDriver.getPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> retrievedPrimaryReplicaSameTime =
placementDriver.getPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
assertTrue(retrievedPrimaryReplicaSameTime.isDone());
// Assert that retrieved primary replica for awaiting timestamp lt
lease expiration time will be completed immediately.
- CompletableFuture<LeaseMeta>
retrievedPrimaryReplicaTimeLtLeaseExpiration =
+ CompletableFuture<ReplicaMeta>
retrievedPrimaryReplicaTimeLtLeaseExpiration =
placementDriver.getPrimaryReplica(GROUP_1, new
HybridTimestamp(14_000, 0));
assertTrue(retrievedPrimaryReplicaTimeLtLeaseExpiration.isDone());
// Assert that retrieved primary replica for awaiting timestamp gt
lease expiration time will be completed soon with null.
- CompletableFuture<LeaseMeta>
retrievedPrimaryReplicaTimeGtLeaseExpiration =
+ CompletableFuture<ReplicaMeta>
retrievedPrimaryReplicaTimeGtLeaseExpiration =
placementDriver.getPrimaryReplica(GROUP_1, new
HybridTimestamp(16_000, 0));
assertThat(retrievedPrimaryReplicaTimeGtLeaseExpiration,
CompletableFutureMatcher.willSucceedFast());
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 40fea9ac27..02fe02c42b 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -160,6 +160,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-sql-engine'))
integrationTestImplementation testFixtures(project(':ignite-transactions'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
+ integrationTestImplementation
testFixtures(project(':ignite-placement-driver-api'))
integrationTestImplementation libs.jetbrains.annotations
integrationTestImplementation libs.awaitility
integrationTestImplementation libs.rocksdb.jni
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 37299f5675..63985779b9 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -116,6 +116,7 @@ import org.junit.jupiter.params.provider.ValueSource;
*/
@SuppressWarnings("resource")
@Timeout(90)
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-20367")
class ItTableRaftSnapshotsTest extends IgniteIntegrationTest {
private static final IgniteLogger LOG =
Loggers.forClass(ItTableRaftSnapshotsTest.class);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 1be39e9a0d..8d28a2ef16 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -122,6 +122,7 @@ import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStora
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import
org.apache.ignite.internal.pagememory.configuration.schema.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
@@ -978,7 +979,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
distributionZoneManager,
schemaSyncService,
catalogManager,
- new HybridTimestampTracker()
+ new HybridTimestampTracker(),
+ new TestPlacementDriver(name)
) {
@Override
protected TxStateTableStorage createTxStateTableStorage(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 0b93eb93d3..628ac9b46d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -93,6 +93,7 @@ import
org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
@@ -398,7 +399,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
null,
schemaSyncService,
catalogManager,
- new HybridTimestampTracker()
+ new HybridTimestampTracker(),
+ new TestPlacementDriver(name)
);
var indexManager = new IndexManager(tablesConfig, schemaManager,
tableManager);
@@ -769,6 +771,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* Restarts the node which stores some data.
*/
@ParameterizedTest
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733")
@ValueSource(booleans = {true, false})
public void metastorageRecoveryTest(boolean useSnapshot) throws
InterruptedException {
List<IgniteImpl> nodes = startNodes(2);
@@ -888,6 +891,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* Starts two nodes and checks that the data are storing through restarts.
Nodes restart in the same order when they started at first.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733")
public void testTwoNodesRestartDirect() throws InterruptedException {
twoNodesRestart(true);
}
@@ -896,6 +900,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* Starts two nodes and checks that the data are storing through restarts.
Nodes restart in reverse order when they started at first.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733")
public void testTwoNodesRestartReverse() throws InterruptedException {
twoNodesRestart(false);
}
@@ -1125,6 +1130,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* The test for node restart when there is a gap between the node local
configuration and distributed configuration.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733")
public void testCfgGap() throws InterruptedException {
List<IgniteImpl> nodes = startNodes(4);
@@ -1193,9 +1199,12 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
try {
Tuple row = table.keyValueView().get(null,
Tuple.create().set("id", fi));
- assertEquals(VALUE_PRODUCER.apply(fi),
row.stringValue("name"));
-
- return true;
+ if (row == null) {
+ return false;
+ } else {
+ assertEquals(VALUE_PRODUCER.apply(fi),
row.stringValue("name"));
+ return true;
+ }
} catch (TransactionException te) {
// There may be an exception if the primary
replica node was stopped. We should wait for new primary to appear.
return false;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index ae2d5f9f7e..b931ad44b3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -54,11 +54,13 @@ import org.apache.ignite.raft.jraft.entity.NodeId;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* The class has tests of cluster recovery when no all committed RAFT commands
applied to the state machine.
*/
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-20393")
public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassIntegrationTest {
private final Object[][] dataSet = {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeKvViewTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeKvViewTest.java
index deb86eaadf..1bdde2d6b9 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeKvViewTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeKvViewTest.java
@@ -28,6 +28,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.internal.schema.SchemaMismatchException;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -124,6 +125,7 @@ class ItSchemaChangeKvViewTest extends
AbstractSchemaChangeTest {
* Check rename column from table schema.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733") // get
hangs on registry.waitLatestSchema() after column update.
public void testRenameColumn() throws Exception {
List<Ignite> grid = startGrid();
@@ -256,6 +258,7 @@ class ItSchemaChangeKvViewTest extends
AbstractSchemaChangeTest {
* Check merge table schema changes.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733") // get
hangs on registry.waitLatestSchema() after column update.
public void testMergeChangesColumnDefault() throws Exception {
List<Ignite> grid = startGrid();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeTableViewTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeTableViewTest.java
index 9906221940..075dc7a109 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeTableViewTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItSchemaChangeTableViewTest.java
@@ -113,6 +113,7 @@ class ItSchemaChangeTableViewTest extends
AbstractSchemaChangeTest {
* Check column renaming.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733") // get
hangs on registry.waitLatestSchema() after column update.
void testRenameColumn() throws Exception {
List<Ignite> grid = startGrid();
@@ -266,6 +267,7 @@ class ItSchemaChangeTableViewTest extends
AbstractSchemaChangeTest {
* Check merge column default value changes.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-18733") // get
hangs on registry.waitLatestSchema() after column update.
public void testMergeChangesColumnDefault() throws Exception {
List<Ignite> grid = startGrid();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
index 34a68ef9e1..3e9d62b10d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaSyncAndReplicationTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.test.WatchListenerInhibitor;
import org.apache.ignite.internal.testframework.log4j2.LogInspector;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -70,6 +71,7 @@ class ItSchemaSyncAndReplicationTest extends
ClusterPerTestIntegrationTest {
* cannot execute without waiting for schemas). This method tests this
scenario.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20394")
void laggingSchemasPreventPartitionDataReplication() throws Exception {
createTestTableWith3Replicas();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 1cca1475d6..4baf77cf00 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -46,7 +46,8 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
-import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
@@ -134,7 +135,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
List<BinaryRow> scannedRows = new ArrayList<>();
- PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx1);
+ PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx1);
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx1,
@@ -410,7 +411,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false);
- PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
+ PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx,
@@ -466,7 +467,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
int soredIndexId = getSortedIndexId();
InternalTransaction tx = startTxWithEnlistedPartition(PART_ID, false);
- PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
+ PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
Publisher<BinaryRow> publisher = new RollbackTxOnErrorPublisher<>(
tx,
@@ -549,7 +550,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
InternalTransaction tx = startTxWithEnlistedPartition(PART_ID,
false);
try {
- PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
+ PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
Publisher<BinaryRow> publisher = new
RollbackTxOnErrorPublisher<>(
tx,
@@ -652,7 +653,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
//noinspection DataFlowIssue
publisher = internalTable.scan(PART_ID, tx.readTimestamp(),
node0, sortedIndexId, null, null, 0, null);
} else {
- PrimaryReplica recipient = getLeaderRecipient(PART_ID, tx);
+ PrimaryReplica recipient = getPrimaryReplica(PART_ID, tx);
publisher = new RollbackTxOnErrorPublisher<>(
tx,
@@ -669,10 +670,10 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
}
}
- private PrimaryReplica getLeaderRecipient(int partId, InternalTransaction
tx) {
- IgniteBiTuple<ClusterNode, Long> leaderWithTerm =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
+ private PrimaryReplica getPrimaryReplica(int partId, InternalTransaction
tx) {
+ IgniteBiTuple<ClusterNode, Long> primaryReplica =
tx.enlistedNodeAndTerm(new TablePartitionId(table.tableId(), partId));
- return new PrimaryReplica(leaderWithTerm.get1(),
leaderWithTerm.get2());
+ return new PrimaryReplica(primaryReplica.get1(),
primaryReplica.get2());
}
/**
@@ -876,11 +877,20 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
InternalTable table = ((TableImpl)
ignite.tables().table(TABLE_NAME)).internalTable();
TablePartitionId tblPartId = new TablePartitionId(table.tableId(),
partId);
- RaftGroupService raftSvc = table.partitionRaftGroupService(partId);
- long term =
IgniteTestUtils.await(raftSvc.refreshAndGetLeaderWithTerm()).term();
+
+ PlacementDriver placementDriver = ((IgniteImpl)
ignite).placementDriver();
+ ReplicaMeta primaryReplica = IgniteTestUtils.await(
+ placementDriver.awaitPrimaryReplica(tblPartId, ((IgniteImpl)
ignite).clock().now()));
+
+ tx.enlist(
+ tblPartId,
+ new IgniteBiTuple<>(
+
table.getClusterNodeResolver().apply(primaryReplica.getLeaseholder()),
+ primaryReplica.getStartTime().longValue()
+ )
+ );
tx.assignCommitPartition(tblPartId);
- tx.enlist(tblPartId, new
IgniteBiTuple<>(table.leaderAssignment(partId), term));
return tx;
}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index b4e8ed6a60..37c9062aca 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -98,6 +98,7 @@ import
org.apache.ignite.internal.metrics.sources.JvmMetricSource;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
import
org.apache.ignite.internal.network.configuration.NetworkConfigurationSchema;
import org.apache.ignite.internal.network.recovery.VaultStateIds;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
import org.apache.ignite.internal.raft.Loza;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
@@ -468,7 +469,6 @@ public class IgniteImpl implements Ignite {
placementDriverMgr = new PlacementDriverManager(
name,
metaStorageMgr,
- vaultMgr,
MetastorageGroupId.INSTANCE,
clusterSvc,
cmgMgr::metaStorageNodes,
@@ -568,7 +568,8 @@ public class IgniteImpl implements Ignite {
distributionZoneManager,
schemaSyncService,
catalogManager,
- observableTimestampTracker
+ observableTimestampTracker,
+ placementDriverMgr.placementDriver()
);
indexManager = new IndexManager(tablesConfig, schemaManager,
distributedTblMgr);
@@ -1121,4 +1122,14 @@ public class IgniteImpl implements Ignite {
public TxManager txManager() {
return txManager;
}
+
+ /**
+ * Returns the node's placement driver service.
+ *
+ * @return Placement driver service
+ */
+ @TestOnly
+ public PlacementDriver placementDriver() {
+ return placementDriverMgr.placementDriver();
+ }
}
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index bf73afb346..55f708a199 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -84,6 +84,7 @@ dependencies {
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-storage-api')))
testImplementation(testFixtures(project(':ignite-distribution-zones')))
+ testImplementation(testFixtures(project(':ignite-placement-driver-api')))
testImplementation libs.mockito.junit
testImplementation libs.mockito.core
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 961bacd999..8a3d94be4c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -67,6 +67,7 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftManager;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
@@ -575,7 +576,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
private TableManager createTableManager() {
TableManager tableManager = new TableManager(
- "",
+ NODE_NAME,
revisionUpdater,
tblsCfg,
dstZnsCfg,
@@ -601,7 +602,8 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
distributionZoneManager,
schemaSyncService,
catalogManager,
- new HybridTimestampTracker()
+ new HybridTimestampTracker(),
+ new TestPlacementDriver(NODE_NAME)
);
tableManager.start();
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 36195e7208..11a214ff6a 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -34,6 +34,7 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -162,7 +163,8 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest {
mock(TxStateTableStorage.class),
replicaSvc,
mock(HybridClock.class),
- timestampTracker
+ timestampTracker,
+ mock(PlacementDriver.class)
);
this.dataAmount = dataAmount;
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 0ca8cfdddf..d93dbdea6c 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -44,6 +44,7 @@ dependencies {
implementation project(':ignite-vault')
implementation project(':ignite-cluster-management')
implementation project(':ignite-catalog')
+ implementation project(':ignite-placement-driver-api')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
implementation libs.auto.service.annotations
@@ -58,12 +59,14 @@ dependencies {
testImplementation project(':ignite-schema')
testImplementation project(':ignite-page-memory')
testImplementation project(':ignite-storage-rocksdb')
+ testImplementation project(':ignite-placement-driver-api')
testImplementation(testFixtures(project(':ignite-core')))
testImplementation(testFixtures(project(':ignite-schema')))
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-transactions')))
testImplementation(testFixtures(project(':ignite-storage-api')))
testImplementation(testFixtures(project(':ignite-metastorage')))
+ testImplementation(testFixtures(project(':ignite-placement-driver-api')))
testImplementation libs.mockito.core
testImplementation libs.mockito.junit
testImplementation libs.hamcrest.core
@@ -89,6 +92,7 @@ dependencies {
testFixturesImplementation(testFixtures(project(':ignite-transactions')))
testFixturesImplementation(testFixtures(project(':ignite-cluster-management')))
testFixturesImplementation(testFixtures(project(':ignite-network')))
+
testFixturesImplementation(testFixtures(project(':ignite-placement-driver-api')))
testFixturesImplementation libs.jetbrains.annotations
testFixturesImplementation libs.fastutil.core
testFixturesImplementation libs.mockito.core
@@ -111,6 +115,7 @@ dependencies {
integrationTestImplementation(testFixtures(project(':ignite-storage-api')))
integrationTestImplementation(testFixtures(project(':ignite-transactions')))
integrationTestImplementation(testFixtures(project(':ignite-cluster-management')))
+
integrationTestImplementation(testFixtures(project(':ignite-placement-driver-api')))
integrationTestImplementation libs.fastutil.core
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 69f4accb17..6376e3bbbc 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -42,8 +42,6 @@ import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -86,8 +84,6 @@ public abstract class ItAbstractInternalTableScanTest extends
IgniteAbstractTest
/** Internal table to test. */
DummyInternalTableImpl internalTbl;
- final HybridClock clock = new HybridClockImpl();
-
/**
* Prepare test environment using DummyInternalTableImpl and Mocked
storage.
*/
@@ -400,7 +396,8 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
when(cursor.hasNext()).thenAnswer(hnInvocation ->
cursorTouchCnt.get() < submittedItems.size());
lenient().when(cursor.next()).thenAnswer(ninvocation ->
- ReadResult.createFromCommitted(new RowId(0),
submittedItems.get(cursorTouchCnt.getAndIncrement()), clock.now()));
+ ReadResult.createFromCommitted(new RowId(0),
submittedItems.get(cursorTouchCnt.getAndIncrement()),
+ internalTbl.CLOCK.now()));
return cursor;
});
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 0e2a5bf2c3..62ee173cbe 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -35,7 +35,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
public class ItInternalTableReadOnlyScanTest extends
ItAbstractInternalTableScanTest {
@Override
protected Publisher<BinaryRow> scan(int part, InternalTransaction tx) {
- return internalTbl.scan(part, clock.now(), mock(ClusterNode.class));
+ return internalTbl.scan(part, internalTbl.CLOCK.now(),
mock(ClusterNode.class));
}
// TODO: IGNITE-17666 Use super test as is.
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index d73e364c76..72757f1ed7 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.distributionzones.configuration.DistributionZo
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest;
@@ -115,6 +116,8 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<PartitionListener> {
+ private static final String NODE_NAME = "node1";
+
/** Factory to create RAFT command messages. */
private final TableMessagesFactory msgFactory = new TableMessagesFactory();
@@ -164,7 +167,7 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
private final ReplicaService replicaService = mock(ReplicaService.class);
private final Function<String, ClusterNode> consistentIdToNode = addr
- -> new ClusterNodeImpl("node1", "node1", new NetworkAddress(addr,
3333));
+ -> new ClusterNodeImpl(NODE_NAME, NODE_NAME, new
NetworkAddress(addr, 3333));
private final HybridClock hybridClock = new HybridClockImpl();
@@ -225,7 +228,8 @@ public class ItTablePersistenceTest extends
ItAbstractListenerSnapshotTest<Parti
new TestTxStateTableStorage(),
replicaService,
hybridClock,
- new HybridTimestampTracker()
+ new HybridTimestampTracker(),
+ new TestPlacementDriver(NODE_NAME)
);
closeables.add(() -> table.close());
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 61b2b0ba71..020b1cf550 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -54,9 +54,8 @@ import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Command;
-import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -93,9 +92,7 @@ import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.ClusterNodeImpl;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
@@ -134,7 +131,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
ClusterService clusterService = Mockito.mock(ClusterService.class,
RETURNS_DEEP_STUBS);
when(clusterService.topologyService().localMember().address()).thenReturn(DummyInternalTableImpl.ADDR);
- ClusterNode clusterNode = new
ClusterNodeImpl(UUID.randomUUID().toString(), "node", new NetworkAddress("",
0));
+ ClusterNode clusterNode = DummyInternalTableImpl.LOCAL_NODE;
ReplicaService replicaService = Mockito.mock(ReplicaService.class,
RETURNS_DEEP_STUBS);
@@ -165,12 +162,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
int tblId = 1;
for (int i = 0; i < PARTS; ++i) {
- TablePartitionId groupId = new TablePartitionId(tblId, i);
-
RaftGroupService r = Mockito.mock(RaftGroupService.class);
- when(r.leader()).thenReturn(Mockito.mock(Peer.class));
- when(r.groupId()).thenReturn(groupId);
-
when(r.refreshAndGetLeaderWithTerm()).thenReturn(completedFuture(new
LeaderWithTerm(new Peer(clusterNode.name()), 0L)));
final int part = i;
doAnswer(invocation -> {
@@ -244,7 +236,8 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
new TestTxStateTableStorage(),
replicaService,
Mockito.mock(HybridClock.class),
- new HybridTimestampTracker()
+ new HybridTimestampTracker(),
+ new TestPlacementDriver(clusterNode.name())
);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 0a48a5cd14..4980843145 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -474,4 +475,6 @@ public interface InternalTable extends ManuallyCloseable {
* @param partitionId Partition ID.
*/
@Nullable PendingComparableValuesTracker<Long, Void>
getPartitionStorageIndexTracker(int partitionId);
+
+ Function<String, ClusterNode> getClusterNodeResolver();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index e1d03f8ca9..b63129b35f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -113,6 +113,7 @@ import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Conditions;
import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -165,7 +166,7 @@ import
org.apache.ignite.internal.table.distributed.raft.snapshot.PartitionSnaps
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.SnapshotAwarePartitionDataStorage;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
-import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.NonHistoricSchemas;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
@@ -272,8 +273,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
/** Data storage manager. */
private final DataStorageManager dataStorageMgr;
- /** Placement driver. */
- private final PlacementDriver placementDriver;
+ /** Transaction state resolver. */
+ private final TransactionStateResolver transactionStateResolver;
/** Here a table future stores during creation (until the table can be
provided to client). */
private final Map<Integer, CompletableFuture<Table>> tableCreateFuts = new
ConcurrentHashMap<>();
@@ -391,6 +392,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
private final HybridTimestampTracker observableTimestampTracker;
+ /** Placement driver. */
+ private final PlacementDriver placementDriver;
+
/**
* Creates a new table manager.
*
@@ -411,6 +415,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* volatile tables.
* @param raftGroupServiceFactory Factory that is used for creation of
raft group services for replication groups.
* @param vaultManager Vault manager.
+ * @param placementDriver Placement driver.
*/
public TableManager(
String nodeName,
@@ -439,7 +444,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
DistributionZoneManager distributionZoneManager,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
- HybridTimestampTracker observableTimestampTracker
+ HybridTimestampTracker observableTimestampTracker,
+ PlacementDriver placementDriver
) {
this.tablesCfg = tablesCfg;
this.zonesConfig = zonesConfig;
@@ -465,10 +471,11 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
this.schemaSyncService = schemaSyncService;
this.catalogService = catalogService;
this.observableTimestampTracker = observableTimestampTracker;
+ this.placementDriver = placementDriver;
clusterNodeResolver = topologyService::getByConsistentId;
- placementDriver = new PlacementDriver(replicaSvc, clusterNodeResolver);
+ transactionStateResolver = new TransactionStateResolver(replicaSvc,
clusterNodeResolver);
tablesByIdVv = new IncrementalVersionedValue<>(registry, HashMap::new);
@@ -793,7 +800,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TablePartitionId replicaGrpId = new TablePartitionId(tableId,
partId);
- placementDriver.updateAssignment(replicaGrpId,
newConfiguration.peers().stream().map(Peer::consistentId)
+ transactionStateResolver.updateAssignment(replicaGrpId,
newConfiguration.peers().stream().map(Peer::consistentId)
.collect(toList()));
var safeTimeTracker = new
PendingComparableValuesTracker<HybridTimestamp, Void>(
@@ -1041,7 +1048,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
clock,
safeTimeTracker,
txStatePartitionStorage,
- placementDriver,
+ transactionStateResolver,
partitionUpdateHandlers.storageUpdateHandler,
new NonHistoricSchemas(schemaManager),
localNode(),
@@ -1049,7 +1056,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
indexBuilder,
schemaSyncService,
catalogService,
- tablesCfg
+ tablesCfg,
+ placementDriver
);
}
@@ -1296,10 +1304,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
int partitions = zoneDescriptor.partitions();
- InternalTableImpl internalTable = new InternalTableImpl(tableName,
tableId,
+ InternalTableImpl internalTable = new InternalTableImpl(
+ tableName,
+ tableId,
new Int2ObjectOpenHashMap<>(partitions),
partitions, clusterNodeResolver, txManager, tableStorage,
- txStateStorage, replicaSvc, clock, observableTimestampTracker);
+ txStateStorage, replicaSvc, clock, observableTimestampTracker,
placementDriver);
var table = new TableImpl(internalTable, lockMgr);
@@ -2261,7 +2271,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
Set<Assignment> stableAssignments =
ByteUtils.fromBytes(stableAssignmentsEntry.value());
- placementDriver.updateAssignment(
+ transactionStateResolver.updateAssignment(
replicaGrpId,
stableAssignments.stream().filter(Assignment::isPeer).map(Assignment::consistentId).collect(toList())
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index edf36d9d9c..61022405c2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -230,12 +230,18 @@ public class PartitionListener implements
RaftGroupListener {
return;
}
- storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
- !cmd.full(),
- () -> storage.lastApplied(commandIndex, commandTerm),
- cmd.full() ? cmd.safeTime() : null
- );
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper
storage/raft index handling is required.
+ synchronized (safeTime) {
+ if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
+ storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
+ !cmd.full(),
+ () -> storage.lastApplied(commandIndex, commandTerm),
+ cmd.full() ? cmd.safeTime() : null
+ );
+ }
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ }
replicaTouch(cmd.txId(), cmd.txCoordinatorId(), cmd.full() ?
cmd.safeTime() : null, cmd.full());
}
@@ -252,11 +258,18 @@ public class PartitionListener implements
RaftGroupListener {
return;
}
- storageUpdateHandler.handleUpdateAll(cmd.txId(), cmd.rowsToUpdate(),
cmd.tablePartitionId().asTablePartitionId(),
- !cmd.full(),
- () -> storage.lastApplied(commandIndex, commandTerm),
- cmd.full() ? cmd.safeTime() : null
- );
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Proper
storage/raft index handling is required.
+ synchronized (safeTime) {
+ if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
+ storageUpdateHandler.handleUpdateAll(cmd.txId(),
cmd.rowsToUpdate(), cmd.tablePartitionId().asTablePartitionId(),
+ !cmd.full(),
+ () -> storage.lastApplied(commandIndex, commandTerm),
+ cmd.full() ? cmd.safeTime() : null
+ );
+
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ }
+ }
replicaTouch(cmd.txId(), cmd.txCoordinatorId(), cmd.full() ?
cmd.safeTime() : null, cmd.full());
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteReplicaRequest.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteReplicaRequest.java
index abc0e3250a..404869b3c3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteReplicaRequest.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ReadWriteReplicaRequest.java
@@ -30,7 +30,7 @@ public interface ReadWriteReplicaRequest extends
PrimaryReplicaRequest, Timestam
/**
* Gets a raft term.
- * TODO: A temp solution until lease-based engine will be implemented
(IGNITE-17256, IGNITE-15083)
+ * TODO: A temp solution until lease-based engine will be implemented
(IGNITE-17256, IGNITE-15083, IGNITE-20377)
*
* @return Raft term.
*/
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ade96e0d68..2ac0c90e26 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -71,8 +71,8 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.Command;
-import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
@@ -151,6 +151,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
@@ -212,8 +213,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
/** Safe time. */
private final PendingComparableValuesTracker<HybridTimestamp, Void>
safeTime;
- /** Placement Driver. */
- private final PlacementDriver placementDriver;
+ /** Transaction state resolver. */
+ private final TransactionStateResolver transactionStateResolver;
/** Runs async scan tasks for effective tail recursion execution (avoid
deep recursive calls). */
private final Executor scanRequestExecutor;
@@ -259,6 +260,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final int pkLength;
+ /** Placement driver. */
+ private final PlacementDriver placementDriver;
+
/**
* The constructor.
*
@@ -274,12 +278,13 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @param hybridClock Hybrid clock.
* @param safeTime Safe time clock.
* @param txStateStorage Transaction state storage.
- * @param placementDriver Placement driver.
+ * @param transactionStateResolver Transaction state resolver.
* @param storageUpdateHandler Handler that processes updates writing them
to storage.
* @param localNode Instance of the local node.
* @param mvTableStorage Table storage.
* @param indexBuilder Index builder.
* @param tablesConfig Tables configuration.
+ * @param placementDriver Placement driver.
*/
public PartitionReplicaListener(
MvPartitionStorage mvDataStorage,
@@ -295,7 +300,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
HybridClock hybridClock,
PendingComparableValuesTracker<HybridTimestamp, Void> safeTime,
TxStateStorage txStateStorage,
- PlacementDriver placementDriver,
+ TransactionStateResolver transactionStateResolver,
StorageUpdateHandler storageUpdateHandler,
Schemas schemas,
ClusterNode localNode,
@@ -303,7 +308,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
IndexBuilder indexBuilder,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
- TablesConfiguration tablesConfig
+ TablesConfiguration tablesConfig,
+ PlacementDriver placementDriver
) {
this.mvDataStorage = mvDataStorage;
this.raftClient = raftClient;
@@ -316,7 +322,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.hybridClock = hybridClock;
this.safeTime = safeTime;
this.txStateStorage = txStateStorage;
- this.placementDriver = placementDriver;
+ this.transactionStateResolver = transactionStateResolver;
this.storageUpdateHandler = storageUpdateHandler;
this.localNode = localNode;
this.mvTableStorage = mvTableStorage;
@@ -324,6 +330,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.schemaSyncService = schemaSyncService;
this.catalogService = catalogService;
this.tablesConfig = tablesConfig;
+ this.placementDriver = placementDriver;
this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -421,16 +428,14 @@ public class PartitionReplicaListener implements
ReplicaListener {
* @return Result future.
*/
private CompletableFuture<LeaderOrTxState>
processTxStateReplicaRequest(TxStateReplicaRequest request) {
- return raftClient.refreshAndGetLeaderWithTerm()
- .thenCompose(replicaAndTerm -> {
- Peer leader = replicaAndTerm.leader();
-
- if (isLocalPeer(leader)) {
+ return placementDriver.getPrimaryReplica(replicationGroupId,
hybridClock.now())
+ .thenCompose(primaryReplica -> {
+ if (isLocalPeer(primaryReplica.getLeaseholder())) {
CompletableFuture<TxMeta> txStateFut =
getTxStateConcurrently(request);
return txStateFut.thenApply(txMeta -> new
LeaderOrTxState(null, txMeta));
} else {
- return completedFuture(new
LeaderOrTxState(leader.consistentId(), null));
+ return completedFuture(new
LeaderOrTxState(primaryReplica.getLeaseholder(), null));
}
});
}
@@ -1899,21 +1904,30 @@ public class PartitionReplicaListener implements
ReplicaListener {
true,
null,
null);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
+ synchronized (safeTime) {
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ }
}
return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
// Try to avoid double write if an entry is already replicated.
- if (cmd.full() && cmd.safeTime().compareTo(safeTime.current()) >
0) {
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
- cmd.tablePartitionId().asTablePartitionId(),
- cmd.row(),
- false,
- null,
- cmd.safeTime());
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
+ synchronized (safeTime) {
+ if (cmd.full() && cmd.safeTime().compareTo(safeTime.current())
> 0) {
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ cmd.row(),
+ false,
+ null,
+ cmd.safeTime());
+
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ }
}
-
return res;
});
}
@@ -1933,17 +1947,27 @@ public class PartitionReplicaListener implements
ReplicaListener {
true,
null,
null);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
+ synchronized (safeTime) {
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ }
}
return applyCmdWithExceptionHandling(cmd).thenApply(res -> {
- if (cmd.full() && cmd.safeTime().compareTo(safeTime.current()) >
0) {
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
- cmd.tablePartitionId().asTablePartitionId(),
- false,
- null,
- cmd.safeTime());
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
+ synchronized (safeTime) {
+ if (cmd.full() && cmd.safeTime().compareTo(safeTime.current())
> 0) {
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.tablePartitionId().asTablePartitionId(),
+ false,
+ null,
+ cmd.safeTime());
+
+ updateTrackerIgnoringTrackerClosedException(safeTime,
cmd.safeTime());
+ }
}
return res;
@@ -2372,20 +2396,29 @@ public class PartitionReplicaListener implements
ReplicaListener {
expectedTerm = null;
}
- if (expectedTerm != null) {
- return raftClient.refreshAndGetLeaderWithTerm()
- .thenCompose(replicaAndTerm -> {
- long currentTerm = replicaAndTerm.term();
+ HybridTimestamp now = hybridClock.now();
- if (expectedTerm == currentTerm) {
- return completedFuture(null);
+ if (expectedTerm != null) {
+ return placementDriver.getPrimaryReplica(replicationGroupId, now)
+ .thenCompose(primaryReplica -> {
+ long currentEnlistmentConsistencyToken =
primaryReplica.getStartTime().longValue();
+
+ if
(expectedTerm.equals(currentEnlistmentConsistencyToken)) {
+ if
(primaryReplica.getExpirationTime().before(now)) {
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-20377
+ return failedFuture(
+ new
PrimaryReplicaMissException(expectedTerm, currentEnlistmentConsistencyToken));
+ } else {
+ return completedFuture(null);
+ }
} else {
- return failedFuture(new
PrimaryReplicaMissException(expectedTerm, currentTerm));
+ return failedFuture(new
PrimaryReplicaMissException(expectedTerm, currentEnlistmentConsistencyToken));
}
}
);
} else if (request instanceof ReadOnlyReplicaRequest || request
instanceof ReplicaSafeTimeSyncRequest) {
- return
raftClient.refreshAndGetLeaderWithTerm().thenApply(replicaAndTerm ->
isLocalPeer(replicaAndTerm.leader()));
+ return placementDriver.getPrimaryReplica(replicationGroupId, now)
+ .thenApply(primaryReplica -> (primaryReplica != null &&
isLocalPeer(primaryReplica.getLeaseholder())));
} else {
return completedFuture(null);
}
@@ -2473,7 +2506,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
) {
boolean readLatest = timestamp == null;
- return placementDriver.sendMetaRequest(commitGrpId,
FACTORY.txStateReplicaRequest()
+ return transactionStateResolver.sendMetaRequest(commitGrpId,
FACTORY.txStateReplicaRequest()
.groupId(commitGrpId)
.readTimestampLong((readLatest ?
HybridTimestamp.MIN_VALUE : timestamp).longValue())
.txId(txId)
@@ -2744,8 +2777,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
return replicationGroupId.tableId();
}
- private boolean isLocalPeer(Peer peer) {
- return peer.consistentId().equals(localNode.name());
+ private boolean isLocalPeer(String nodeName) {
+ return localNode.name().equals(nodeName);
}
private void inBusyLock(Runnable runnable) {
@@ -2806,4 +2839,16 @@ public class PartitionReplicaListener implements
ReplicaListener {
? null
: new TxStateMeta(txState, old.txCoordinatorId(), txState ==
COMMITED ? commitTimestamp : null));
}
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 tmp
+ private static <T extends Comparable<T>> void
updateTrackerIgnoringTrackerClosedException(
+ PendingComparableValuesTracker<T, Void> tracker,
+ T newValue
+ ) {
+ try {
+ tracker.update(newValue, null);
+ } catch (TrackerClosedException ignored) {
+ // No-op.
+ }
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
similarity index 95%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
rename to
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
index ea43c02085..288be7b281 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PlacementDriver.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TransactionStateResolver.java
@@ -32,9 +32,9 @@ import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
/**
- * Placement driver.
+ * Helper class that allows to resolve transaction state.
*/
-public class PlacementDriver {
+public class TransactionStateResolver {
/** Assignment node names per replication group. */
private final Map<ReplicationGroupId, LinkedHashSet<String>>
primaryReplicaMapping = new ConcurrentHashMap<>();
@@ -49,7 +49,7 @@ public class PlacementDriver {
*
* @param replicaService Replication service.
*/
- public PlacementDriver(ReplicaService replicaService, Function<String,
ClusterNode> clusterNodeResolver) {
+ public TransactionStateResolver(ReplicaService replicaService,
Function<String, ClusterNode> clusterNodeResolver) {
this.replicaService = replicaService;
this.clusterNodeResolver = clusterNodeResolver;
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 8651f4ccbd..72a00a0e9d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -48,6 +48,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -58,8 +59,9 @@ import java.util.stream.Stream;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.Peer;
-import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -112,6 +114,8 @@ public class InternalTableImpl implements InternalTable {
/** Number of attempts. */
private static final int ATTEMPTS_TO_ENLIST_PARTITION = 5;
+ private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 30;
+
/** Map update guarded by {@link #updatePartitionMapsMux}. */
protected volatile Int2ObjectMap<RaftGroupService>
raftGroupServiceByPartitionId;
@@ -151,6 +155,9 @@ public class InternalTableImpl implements InternalTable {
/** Observable timestamp tracker. */
private final HybridTimestampTracker observableTimestampTracker;
+ /** Placement driver. */
+ private final PlacementDriver placementDriver;
+
/** Map update guarded by {@link #updatePartitionMapsMux}. */
private volatile
Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp, Void>>
safeTimeTrackerByPartitionId = emptyMap();
@@ -169,6 +176,7 @@ public class InternalTableImpl implements InternalTable {
* @param txStateStorage Transaction state storage.
* @param replicaSvc Replica service.
* @param clock A hybrid logical clock.
+ * @param placementDriver Placement driver.
*/
public InternalTableImpl(
String tableName,
@@ -181,7 +189,8 @@ public class InternalTableImpl implements InternalTable {
TxStateTableStorage txStateStorage,
ReplicaService replicaSvc,
HybridClock clock,
- HybridTimestampTracker observableTimestampTracker
+ HybridTimestampTracker observableTimestampTracker,
+ PlacementDriver placementDriver
) {
this.tableName = tableName;
this.tableId = tableId;
@@ -195,6 +204,7 @@ public class InternalTableImpl implements InternalTable {
this.tableMessagesFactory = new TableMessagesFactory();
this.clock = clock;
this.observableTimestampTracker = observableTimestampTracker;
+ this.placementDriver = placementDriver;
}
/** {@inheritDoc} */
@@ -1171,7 +1181,7 @@ public class InternalTableImpl implements InternalTable {
}
/** {@inheritDoc} */
- // TODO: IGNITE-17256 Use a placement driver for getting a primary replica.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19619 The method
should be removed, SQL engine should use placementDriver directly
@Override
public List<String> assignments() {
awaitLeaderInitialization();
@@ -1185,6 +1195,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19619 The method
should be removed, SQL engine should use placementDriver directly
public CompletableFuture<List<PrimaryReplica>> primaryReplicas() {
List<Entry<RaftGroupService>> entries = new
ArrayList<>(raftGroupServiceByPartitionId.int2ObjectEntrySet());
List<CompletableFuture<PrimaryReplica>> result = new ArrayList<>();
@@ -1192,11 +1203,12 @@ public class InternalTableImpl implements InternalTable
{
entries.sort(Comparator.comparingInt(Entry::getIntKey));
for (Entry<RaftGroupService> e : entries) {
- CompletableFuture<LeaderWithTerm> f =
e.getValue().refreshAndGetLeaderWithTerm();
+ CompletableFuture<ReplicaMeta> f =
placementDriver.awaitPrimaryReplica(e.getValue().groupId(), clock.now())
+ .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT,
TimeUnit.SECONDS);
- result.add(f.thenApply(lt -> {
- ClusterNode node =
clusterNodeResolver.apply(lt.leader().consistentId());
- return new PrimaryReplica(node, lt.term());
+ result.add(f.thenApply(primaryReplica -> {
+ ClusterNode node =
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
+ return new PrimaryReplica(node,
primaryReplica.getStartTime().longValue());
}));
}
@@ -1373,36 +1385,30 @@ public class InternalTableImpl implements InternalTable
{
* @return The enlist future (then will a leader become known).
*/
protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int
partId, InternalTransaction tx) {
- RaftGroupService svc = raftGroupServiceByPartitionId.get(partId);
- assert svc != null : "No raft group service for partition " + partId;
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
+ tx.assignCommitPartition(tablePartitionId);
- tx.assignCommitPartition(new TablePartitionId(tableId, partId));
+ HybridTimestamp now = clock.now();
- // TODO: IGNITE-17256 Use a placement driver for getting a primary
replica.
- CompletableFuture<LeaderWithTerm> fut0 =
svc.refreshAndGetLeaderWithTerm();
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(tablePartitionId, now)
+ .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, TimeUnit.SECONDS);
- // TODO asch IGNITE-15091 fixme need to map to the same leaseholder.
- // TODO asch a leader race is possible when enlisting different keys
from the same partition.
- return fut0.handle((primaryPeerAndTerm, e) -> {
+ return primaryReplicaFuture.handle((primaryReplica, e) -> {
if (e != null) {
- throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica.", e);
- }
- if (primaryPeerAndTerm.leader() == null) {
- throw new TransactionException(REPLICA_UNAVAILABLE_ERR,
"Failed to get the primary replica.");
+ throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, "Failed to get the primary replica"
+ + " [tablePartitionId=" + tablePartitionId + ",
awaitTimestamp=" + now + ']', e);
}
- String consistentId = primaryPeerAndTerm.leader().consistentId();
-
- ClusterNode node = clusterNodeResolver.apply(consistentId);
+ ClusterNode node =
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
if (node == null) {
throw new TransactionException(REPLICA_UNAVAILABLE_ERR,
"Failed to resolve the primary replica node [consistentId="
- + consistentId + ']');
+ + primaryReplica.getLeaseholder() + ']');
}
TablePartitionId partGroupId = new TablePartitionId(tableId,
partId);
- return tx.enlist(partGroupId, new IgniteBiTuple<>(node,
primaryPeerAndTerm.term()));
+ return tx.enlist(partGroupId, new IgniteBiTuple<>(node,
primaryReplica.getStartTime().longValue()));
});
}
@@ -1589,19 +1595,20 @@ public class InternalTableImpl implements InternalTable
{
* @return Cluster node to evaluate read-only request.
*/
protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
- RaftGroupService svc = raftGroupServiceByPartitionId.get(partId);
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- return svc.refreshAndGetLeaderWithTerm().handle((res, e) -> {
- if (e != null) {
- throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
- } else {
- if (res == null || res.leader() == null) {
- throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
- } else {
- return
clusterNodeResolver.apply(res.leader().consistentId());
- }
- }
- });
+ return placementDriver.awaitPrimaryReplica(tablePartitionId,
clock.now())
+ .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT,
TimeUnit.SECONDS).handle((res, e) -> {
+ if (e != null) {
+ throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
+ } else {
+ if (res == null) {
+ throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
+ } else {
+ return
clusterNodeResolver.apply(res.getLeaseholder());
+ }
+ }
+ });
}
/**
@@ -1701,4 +1708,9 @@ public class InternalTableImpl implements InternalTable {
.full(full)
.build();
}
+
+ @Override
+ public Function<String, ClusterNode> getClusterNodeResolver() {
+ return clusterNodeResolver;
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index 780e70064c..dc67894f76 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -36,7 +36,7 @@ import
org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.replicator.message.TimestampAware;
-import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.tx.LockManager;
@@ -107,14 +107,14 @@ public class TxLocalTest extends TxAbstractTest {
}).when(msgSvc).invoke(anyString(), any(), anyLong());
- PlacementDriver placementDriver = mock(PlacementDriver.class,
RETURNS_DEEP_STUBS);
+ TransactionStateResolver transactionStateResolver =
mock(TransactionStateResolver.class, RETURNS_DEEP_STUBS);
doAnswer(invocationOnMock -> {
TxStateReplicaRequest request = invocationOnMock.getArgument(1);
return CompletableFuture.completedFuture(
tables.get(request.groupId()).txStateStorage().getTxStateStorage(0).get(request.txId()));
- }).when(placementDriver).sendMetaRequest(any(), any());
+ }).when(transactionStateResolver).sendMetaRequest(any(), any());
txManager = new TxManagerImpl(replicaSvc, lockManager, localClock, new
TransactionIdGenerator(0xdeadbeef), () -> "local");
@@ -124,7 +124,7 @@ public class TxLocalTest extends TxAbstractTest {
replicaSvc,
txManager,
true,
- placementDriver,
+ transactionStateResolver,
ACCOUNTS_SCHEMA,
timestampTracker
);
@@ -135,7 +135,7 @@ public class TxLocalTest extends TxAbstractTest {
replicaSvc,
txManager,
true,
- placementDriver,
+ transactionStateResolver,
CUSTOMERS_SCHEMA,
timestampTracker
);
@@ -171,4 +171,40 @@ public class TxLocalTest extends TxAbstractTest {
protected boolean assertPartitionsSame(TableImpl table, int partId) {
return true;
}
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
+ @Override
+ public void testReadOnlyGet() {
+ // No-op
+ }
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
+ @Override
+ public void testReadOnlyScan() throws Exception {
+ // No-op
+ }
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
+ @Override
+ public void testReadOnlyGetWriteIntentResolutionUpdate() {
+ // No-op
+ }
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
+ @Override
+ public void testReadOnlyGetWriteIntentResolutionRemove() {
+ // No-op
+ }
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
+ @Override
+ public void testReadOnlyGetAll() {
+ // No-op
+ }
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-20355
+ @Override
+ public void testReadOnlyPendingWriteIntentSkippedCombined() {
+ super.testReadOnlyPendingWriteIntentSkippedCombined();
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 091ac009ca..68725ffd44 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -82,6 +82,7 @@ import
org.apache.ignite.internal.distributionzones.configuration.DistributionZo
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.dsl.Operation;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
@@ -837,7 +838,8 @@ public class TableManagerTest extends IgniteAbstractTest {
distributionZoneManager,
mock(SchemaSyncService.class),
mock(CatalogService.class),
- new HybridTimestampTracker()
+ new HybridTimestampTracker(),
+ new TestPlacementDriver(NODE_NAME)
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index caeaa23a4b..d63a88a6fd 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -45,6 +45,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -81,7 +82,7 @@ import
org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import
org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
-import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -193,6 +194,8 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
TestPartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(TEST_MV_PARTITION_STORAGE);
+ ClusterNode localNode = mock(ClusterNode.class);
+
partitionReplicaListener = new PartitionReplicaListener(
TEST_MV_PARTITION_STORAGE,
mockRaftClient,
@@ -214,7 +217,7 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
CLOCK,
safeTime,
new TestTxStateStorage(),
- mock(PlacementDriver.class),
+ mock(TransactionStateResolver.class),
new StorageUpdateHandler(
PART_ID,
partitionDataStorage,
@@ -224,12 +227,13 @@ public class PartitionReplicaListenerIndexLockingTest
extends IgniteAbstractTest
new GcUpdateHandler(partitionDataStorage, safeTime,
indexUpdateHandler)
),
new DummySchemas(schemaManager),
- mock(ClusterNode.class),
+ localNode,
new TestMvTableStorage(TABLE_ID, DEFAULT_PARTITION_COUNT),
mock(IndexBuilder.class),
mock(SchemaSyncService.class, invocation ->
completedFuture(null)),
mock(CatalogService.class),
- tablesConfig
+ tablesConfig,
+ new TestPlacementDriver(localNode.name())
);
kvMarshaller = new
ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class,
Integer.class);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index c9c358027d..dd69c9e333 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -82,6 +82,7 @@ import
org.apache.ignite.internal.configuration.testframework.InjectConfiguratio
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
@@ -136,7 +137,7 @@ import
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchem
import
org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaException;
import org.apache.ignite.internal.table.distributed.replicator.LeaderOrTxState;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
-import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.table.distributed.schema.FullTableSchema;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
@@ -262,7 +263,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
/** Another (not local) cluster node. */
private final ClusterNode anotherNode = new ClusterNodeImpl("node2",
"node2", NetworkAddress.from("127.0.0.2:127"));
- private final PlacementDriver placementDriver =
mock(PlacementDriver.class);
+ private final TransactionStateResolver transactionStateResolver =
mock(TransactionStateResolver.class);
private final PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(testMvPartitionStorage);
@@ -361,7 +362,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
HybridTimestamp txFixedTimestamp = clock.now();
- when(placementDriver.sendMetaRequest(any(),
any())).thenAnswer(invocationOnMock -> {
+ when(transactionStateResolver.sendMetaRequest(any(),
any())).thenAnswer(invocationOnMock -> {
TxMeta txMeta;
if (txState == null) {
@@ -443,7 +444,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
clock,
safeTimeClock,
txStateStorage,
- placementDriver,
+ transactionStateResolver,
new StorageUpdateHandler(
partId,
partitionDataStorage,
@@ -458,7 +459,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
mock(IndexBuilder.class),
schemaSyncService,
catalogService,
- tablesConfig
+ tablesConfig,
+ new TestPlacementDriver(localNode.name())
);
kvMarshaller = marshallerFor(schemaDescriptor);
@@ -537,6 +539,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20365")
public void testTxStateReplicaRequestMissLeaderMiss() throws Exception {
localLeader = false;
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 92c73d09af..8e3fb9740f 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -38,6 +38,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.util.List;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
@@ -66,7 +67,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
mock(TxStateTableStorage.class),
mock(ReplicaService.class),
mock(HybridClock.class),
- new HybridTimestampTracker()
+ new HybridTimestampTracker(),
+ mock(PlacementDriver.class)
);
// Let's check the empty table.
@@ -111,7 +113,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
mock(TxStateTableStorage.class),
mock(ReplicaService.class),
mock(HybridClock.class),
- new HybridTimestampTracker()
+ new HybridTimestampTracker(),
+ mock(PlacementDriver.class)
);
List<BinaryRowEx> originalRows = List.of(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 3fed2fe27d..4beff4a5d4 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -57,6 +57,8 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -92,7 +94,7 @@ import
org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
-import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
@@ -174,6 +176,8 @@ public class ItTxTestCluster {
protected final List<ClusterService> cluster = new
CopyOnWriteArrayList<>();
+ protected PlacementDriver placementDriver;
+
private ScheduledThreadPoolExecutor executor;
protected IgniteTransactions igniteTransactions;
@@ -246,6 +250,8 @@ public class ItTxTestCluster {
assertTrue(waitForTopology(node, nodes, 1000));
}
+ placementDriver = new TestPlacementDriver(cluster.get(0).nodeName());
+
LOG.info("The cluster has been started");
if (startClient) {
@@ -392,10 +398,10 @@ public class ItTxTestCluster {
var mvTableStorage = new TestMvTableStorage(tableId,
DEFAULT_PARTITION_COUNT);
var mvPartStorage = new TestMvPartitionStorage(partId);
var txStateStorage = txStateStorages.get(assignment);
- var placementDriver = new
PlacementDriver(replicaServices.get(assignment), consistentIdToNode);
+ var transactionStateResolver = new
TransactionStateResolver(replicaServices.get(assignment), consistentIdToNode);
for (int part = 0; part < assignments.size(); part++) {
- placementDriver.updateAssignment(grpIds.get(part),
assignments.get(part));
+
transactionStateResolver.updateAssignment(grpIds.get(part),
assignments.get(part));
}
int indexId = globalIndexId++;
@@ -474,7 +480,7 @@ public class ItTxTestCluster {
clocks.get(assignment),
safeTime,
txStateStorage,
- placementDriver,
+ transactionStateResolver,
storageUpdateHandler,
new
DummySchemas(schemaManager),
consistentIdToNode.apply(assignment),
@@ -482,7 +488,8 @@ public class ItTxTestCluster {
mock(IndexBuilder.class),
mock(SchemaSyncService.class,
invocation -> completedFuture(null)),
mock(CatalogService.class),
- tablesConfig
+ tablesConfig,
+ placementDriver
),
raftSvc,
storageIndexTracker
@@ -544,7 +551,8 @@ public class ItTxTestCluster {
mock(TxStateTableStorage.class),
startClient ? clientReplicaSvc :
replicaServices.get(localNodeName),
startClient ? clientClock : clocks.get(localNodeName),
- timestampTracker
+ timestampTracker,
+ placementDriver
), new DummySchemaManagerImpl(schemaDescriptor),
clientTxManager.lockManager());
}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index f4320eb6ad..fdabc84947 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -405,6 +405,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20366")
public void testBatchPutConcurrently() {
Transaction tx1 = igniteTransactions.begin();
Transaction tx2 = igniteTransactions.begin();
@@ -433,6 +434,7 @@ public abstract class TxAbstractTest extends
IgniteAbstractTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-20366")
public void testBatchReadPutConcurrently() throws InterruptedException {
InternalTransaction tx1 = (InternalTransaction)
igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction)
igniteTransactions.begin();
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3d142e669b..3a4fe1692b 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Command;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.WriteCommand;
@@ -85,7 +86,7 @@ import
org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import
org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
-import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import
org.apache.ignite.internal.table.distributed.replicator.TransactionStateResolver;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import
org.apache.ignite.internal.tx.storage.state.test.TestTxStateTableStorage;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import
org.apache.ignite.internal.util.PendingIndependentComparableValuesTracker;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterNodeImpl;
@@ -112,6 +114,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
public static final NetworkAddress ADDR = new NetworkAddress("127.0.0.1",
2004);
+ public static final ClusterNode LOCAL_NODE = new ClusterNodeImpl("id",
"node", ADDR);
+
public static final HybridClock CLOCK = new TestHybridClock(new
LongSupplier() {
@Override
public long getAsLong() {
@@ -168,7 +172,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
* @param txManager Transaction manager.
* @param crossTableUsage If this dummy table is going to be used in
cross-table tests, it won't mock the calls of
* ReplicaService by itself.
- * @param placementDriver Placement driver.
+ * @param transactionStateResolver Transaction state resolver.
* @param schema Schema descriptor.
* @param tracker Observable timestamp tracker.
*/
@@ -176,11 +180,11 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
ReplicaService replicaSvc,
TxManager txManager,
boolean crossTableUsage,
- PlacementDriver placementDriver,
+ TransactionStateResolver transactionStateResolver,
SchemaDescriptor schema,
HybridTimestampTracker tracker
) {
- this(replicaSvc, new TestMvPartitionStorage(0), txManager,
crossTableUsage, placementDriver, schema, tracker);
+ this(replicaSvc, new TestMvPartitionStorage(0), txManager,
crossTableUsage, transactionStateResolver, schema, tracker);
}
/**
@@ -206,7 +210,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
* @param txManager Transaction manager, if {@code null}, then default one
will be created.
* @param crossTableUsage If this dummy table is going to be used in
cross-table tests, it won't mock the calls of
* ReplicaService by itself.
- * @param placementDriver Placement driver.
+ * @param transactionStateResolver Transaction state resolver.
* @param schema Schema descriptor.
*/
public DummyInternalTableImpl(
@@ -214,7 +218,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
MvPartitionStorage mvPartStorage,
@Nullable TxManager txManager,
boolean crossTableUsage,
- PlacementDriver placementDriver,
+ TransactionStateResolver transactionStateResolver,
SchemaDescriptor schema,
HybridTimestampTracker tracker
) {
@@ -223,7 +227,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
nextTableId.getAndIncrement(),
Int2ObjectMaps.singleton(PART_ID,
mock(RaftGroupService.class)),
1,
- name -> mockClusterNode(),
+ name -> LOCAL_NODE,
txManager == null
? new TxManagerImpl(replicaSvc, new HeapLockManager(),
CLOCK, new TransactionIdGenerator(0xdeadbeef), () -> "local")
: txManager,
@@ -231,7 +235,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
new TestTxStateTableStorage(),
replicaSvc,
CLOCK,
- tracker
+ tracker,
+ new TestPlacementDriver(LOCAL_NODE.name())
);
RaftGroupService svc = raftGroupServiceByPartitionId.get(0);
@@ -314,7 +319,8 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
IndexLocker pkLocker = new HashIndexLocker(indexId, true,
this.txManager.lockManager(), row2Tuple);
- safeTime = mock(PendingComparableValuesTracker.class);
+ safeTime = new
PendingIndependentComparableValuesTracker<>(HybridTimestamp.MIN_VALUE);
+
PartitionDataStorage partitionDataStorage = new
TestPartitionDataStorage(mvPartStorage);
TableIndexStoragesSupplier indexes =
createTableIndexStoragesSupplier(Map.of(pkStorage.get().id(), pkStorage.get()));
@@ -366,20 +372,18 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
CLOCK,
safeTime,
txStateStorage().getOrCreateTxStateStorage(PART_ID),
- placementDriver,
+ transactionStateResolver,
storageUpdateHandler,
new DummySchemas(schemaManager),
- mockClusterNode(),
+ LOCAL_NODE,
mock(MvTableStorage.class),
mock(IndexBuilder.class),
mock(SchemaSyncService.class, invocation ->
completedFuture(null)),
mock(CatalogService.class),
- tablesConfig
+ tablesConfig,
+ new TestPlacementDriver(LOCAL_NODE.name())
);
-
lenient().when(safeTime.waitFor(any())).thenReturn(completedFuture(null));
- lenient().when(safeTime.current()).thenReturn(new HybridTimestamp(1,
0));
-
partitionListener = new PartitionListener(
this.txManager,
new TestPartitionDataStorage(mvPartStorage),
@@ -390,10 +394,6 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
);
}
- private static ClusterNode mockClusterNode() {
- return new ClusterNodeImpl("id", "node", new
NetworkAddress("127.0.0.1", 20000));
- }
-
/**
* Replica listener.
*
@@ -442,7 +442,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
/** {@inheritDoc} */
@Override
public CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
- return completedFuture(mockClusterNode());
+ return completedFuture(LOCAL_NODE);
}
/**