This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6f4458ab8c IGNITE-19326 Close partition safe time trackers (#1960)
6f4458ab8c is described below
commit 6f4458ab8c99a10ef9669be48e88379bb656bfd4
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Apr 20 19:36:01 2023 +0300
IGNITE-19326 Close partition safe time trackers (#1960)
---
.../ignite/client/fakes/FakeInternalTable.java | 11 +++
.../util/PendingComparableValuesTracker.java | 95 ++++++++++++++++-----
.../internal/util/TrackerClosedException.java | 25 ++++++
.../util/PendingComparableValuesTrackerTest.java | 17 ++++
.../ignite/internal/table/InternalTable.java | 15 ++++
.../internal/table/distributed/TableManager.java | 64 ++++++++++----
.../table/distributed/raft/PartitionListener.java | 16 +++-
.../distributed/storage/InternalTableImpl.java | 98 +++++++++++++++++-----
.../distributed/storage/InternalTableImplTest.java | 85 +++++++++++++++++++
.../table/impl/DummyInternalTableImpl.java | 4 +-
10 files changed, 370 insertions(+), 60 deletions(-)
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index ba232b6580..393ce89cfb 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
@@ -466,4 +467,14 @@ public class FakeInternalTable implements InternalTable {
dataAccessListener.accept(operation, arg);
}
}
+
+ @Override
+ public @Nullable PendingComparableValuesTracker<HybridTimestamp>
getPartitionSafeTimeTracker(int partitionId) {
+ return null;
+ }
+
+ @Override
+ public @Nullable PendingComparableValuesTracker<Long>
getPartitionStorageIndexTracker(int partitionId) {
+ return null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
index 545e8d0d09..8e4fa4a55e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/PendingComparableValuesTracker.java
@@ -18,23 +18,28 @@
package org.apache.ignite.internal.util;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.ignite.internal.close.ManuallyCloseable;
/**
* Tracker that stores comparable value internally, this value can grow when
{@link #update(Comparable)} method is called. The tracker gives
* ability to wait for certain value, see {@link #waitFor(Comparable)}.
*/
-public class PendingComparableValuesTracker<T extends Comparable<T>> {
+public class PendingComparableValuesTracker<T extends Comparable<T>>
implements ManuallyCloseable {
private static final VarHandle CURRENT;
+ private static final VarHandle CLOSE_GUARD;
+
static {
try {
CURRENT =
MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class,
"current", Comparable.class);
+ CLOSE_GUARD =
MethodHandles.lookup().findVarHandle(PendingComparableValuesTracker.class,
"closeGuard", boolean.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
@@ -46,6 +51,13 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>> {
/** Current value. */
private volatile T current;
+ /** Prevents double closing. */
+ @SuppressWarnings("unused")
+ private volatile boolean closeGuard;
+
+ /** Busy lock to close synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
/**
* Constructor with initial value.
*
@@ -60,23 +72,32 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>> {
* that had been created for corresponding values that are lower than the
given one.
*
* @param newValue New value.
+ * @throws TrackerClosedException if the tracker is closed.
*/
public void update(T newValue) {
while (true) {
- T current = this.current;
-
- if (newValue.compareTo(current) <= 0) {
- break;
+ if (!busyLock.enterBusy()) {
+ throw new TrackerClosedException();
}
- if (CURRENT.compareAndSet(this, current, newValue)) {
- ConcurrentNavigableMap<T, CompletableFuture<Void>>
smallerFutures = valueFutures.headMap(newValue, true);
+ try {
+ T current = this.current;
+
+ if (newValue.compareTo(current) <= 0) {
+ break;
+ }
+
+ if (CURRENT.compareAndSet(this, current, newValue)) {
+ ConcurrentNavigableMap<T, CompletableFuture<Void>>
smallerFutures = valueFutures.headMap(newValue, true);
- smallerFutures.forEach((k, f) -> f.complete(null));
+ smallerFutures.forEach((k, f) -> f.complete(null));
- smallerFutures.clear();
+ smallerFutures.clear();
- break;
+ break;
+ }
+ } finally {
+ busyLock.leaveBusy();
}
}
}
@@ -85,31 +106,63 @@ public class PendingComparableValuesTracker<T extends
Comparable<T>> {
* Provides the future that is completed when this tracker's internal
value reaches given one. If the internal value is greater or equal
* then the given one, returns completed future.
*
+ * <p>When the tracker is closed, the future will complete with a {@link TrackerClosedException}.
+ *
* @param valueToWait Value to wait.
- * @return Future.
*/
public CompletableFuture<Void> waitFor(T valueToWait) {
- if (current.compareTo(valueToWait) >= 0) {
- return completedFuture(null);
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new TrackerClosedException());
}
- CompletableFuture<Void> future =
valueFutures.computeIfAbsent(valueToWait, k -> new CompletableFuture<>());
+ try {
+ if (current.compareTo(valueToWait) >= 0) {
+ return completedFuture(null);
+ }
+
+ CompletableFuture<Void> future =
valueFutures.computeIfAbsent(valueToWait, k -> new CompletableFuture<>());
- if (current.compareTo(valueToWait) >= 0) {
- future.complete(null);
+ if (current.compareTo(valueToWait) >= 0) {
+ future.complete(null);
- valueFutures.remove(valueToWait);
- }
+ valueFutures.remove(valueToWait);
+ }
- return future;
+ return future;
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
* Returns current internal value.
*
- * @return Current value.
+ * @throws TrackerClosedException if the tracker is closed.
*/
public T current() {
- return current;
+ if (!busyLock.enterBusy()) {
+ throw new TrackerClosedException();
+ }
+
+ try {
+ return current;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (!CLOSE_GUARD.compareAndSet(this, false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ TrackerClosedException trackerClosedException = new
TrackerClosedException();
+
+ valueFutures.values().forEach(future ->
future.completeExceptionally(trackerClosedException));
+
+ valueFutures.clear();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java b/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java
new file mode 100644
index 0000000000..2ab0b0e2d0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Exception that will be thrown when the {@link
PendingComparableValuesTracker} is closed.
+ */
+public class TrackerClosedException extends RuntimeException {
+ private static final long serialVersionUID = -3685913884384983930L;
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
index d30aeeb1ef..7dc8952746 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/PendingComparableValuesTrackerTest.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.util;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreaded;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.Collections;
@@ -151,4 +153,19 @@ public class PendingComparableValuesTrackerTest {
assertThat(writerFuture, willCompleteSuccessfully());
assertThat(readerFuture, willCompleteSuccessfully());
}
+
+ @Test
+ void testClose() {
+ var tracker = new PendingComparableValuesTracker<>(1);
+
+ CompletableFuture<Void> future0 = tracker.waitFor(2);
+
+ tracker.close();
+
+ assertThrows(TrackerClosedException.class, tracker::current);
+ assertThrows(TrackerClosedException.class, () -> tracker.update(2));
+
+ assertThat(future0, willThrowFast(TrackerClosedException.class));
+ assertThat(tracker.waitFor(2),
willThrowFast(TrackerClosedException.class));
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 8b97f5d05a..ffd324d4dd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -34,6 +34,7 @@ import
org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.TransactionException;
@@ -447,4 +448,18 @@ public interface InternalTable extends ManuallyCloseable {
*/
@Override
void close();
+
+ /**
+ * Returns the partition safe time tracker, or {@code null} if it has not been added.
+ *
+ * @param partitionId Partition ID.
+ */
+ @Nullable PendingComparableValuesTracker<HybridTimestamp>
getPartitionSafeTimeTracker(int partitionId);
+
+ /**
+ * Returns the partition storage index tracker, or {@code null} if it has not been added.
+ *
+ * @param partitionId Partition ID.
+ */
+ @Nullable PendingComparableValuesTracker<Long>
getPartitionStorageIndexTracker(int partitionId);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 8ea8ea8c3d..e5480730c3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -725,8 +725,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
placementDriver.updateAssignment(replicaGrpId,
newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
- PendingComparableValuesTracker<HybridTimestamp> safeTime = new
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
- PendingComparableValuesTracker<Long> storageIndexTracker = new
PendingComparableValuesTracker<>(0L);
+ var safeTimeTracker = new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0));
+ var storageIndexTracker = new
PendingComparableValuesTracker<>(0L);
+
+ ((InternalTableImpl)
internalTbl).updatePartitionTrackers(partId, safeTimeTracker,
storageIndexTracker);
CompletableFuture<PartitionStorages> partitionStoragesFut =
getOrCreatePartitionStorages(table, partId);
@@ -814,7 +816,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
partitionDataStorage,
storageUpdateHandler,
txStatePartitionStorage,
- safeTime,
+ safeTimeTracker,
storageIndexTracker
),
new
RebalanceRaftGroupEventsListener(
@@ -878,7 +880,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
new Lazy<>(() ->
table.indexStorageAdapters(partId).get().get(table.pkId())),
() ->
table.indexStorageAdapters(partId).get(),
clock,
- safeTime,
+ safeTimeTracker,
txStateStorage,
placementDriver,
storageUpdateHandler,
@@ -1033,7 +1035,11 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
AtomicBoolean nodeStoppingEx = new AtomicBoolean();
- for (int p = 0; p < table.internalTable().partitions(); p++) {
+ InternalTable internalTable = table.internalTable();
+
+ for (int p = 0; p < internalTable.partitions(); p++) {
+ int partitionId = p;
+
TablePartitionId replicationGroupId = new
TablePartitionId(table.tableId(), p);
stopping.add(() -> {
@@ -1054,6 +1060,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
CompletableFuture<Void> removeFromGcFuture =
mvGc.removeStorage(replicationGroupId);
+ stopping.add(() -> {
+ try {
+ closePartitionTrackers(internalTable, partitionId);
+ } catch (Throwable t) {
+ handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
+ }
+ });
+
stopping.add(() -> {
try {
// Should be done fairly quickly.
@@ -1068,9 +1082,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
try {
IgniteUtils.closeAllManually(
- table.internalTable().storage(),
- table.internalTable().txStateStorage(),
- table.internalTable()
+ internalTable.storage(),
+ internalTable.txStateStorage(),
+ internalTable
);
} catch (Throwable t) {
handleExceptionOnCleanUpTablesResources(t, throwable,
nodeStoppingEx);
@@ -1277,12 +1291,18 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
assert table != null : IgniteStringFormatter.format("There is
no table with the name specified [name={}, id={}]",
name, tblId);
+ InternalTable internalTable = table.internalTable();
+
+ for (int partitionId = 0; partitionId < partitions;
partitionId++) {
+ closePartitionTrackers(internalTable, partitionId);
+ }
+
// TODO: IGNITE-18703 Destroy raft log and meta
CompletableFuture<Void> destroyTableStoragesFuture =
allOf(removeStorageFromGcFutures)
.thenCompose(unused -> allOf(
- table.internalTable().storage().destroy(),
- runAsync(() ->
table.internalTable().txStateStorage().destroy(), ioExecutor))
+ internalTable.storage().destroy(),
+ runAsync(() ->
internalTable.txStateStorage().destroy(), ioExecutor))
);
CompletableFuture<?> dropSchemaRegistryFuture =
schemaManager.dropRegistry(causalityToken, table.tableId());
@@ -2019,11 +2039,13 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
.filter(assignment ->
localMember.name().equals(assignment.consistentId()))
.anyMatch(assignment ->
!stableAssignments.contains(assignment));
- var safeTime = new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0));
- PendingComparableValuesTracker<Long> storageIndexTracker = new
PendingComparableValuesTracker<>(0L);
+ var safeTimeTracker = new PendingComparableValuesTracker<>(new
HybridTimestamp(1, 0));
+ var storageIndexTracker = new PendingComparableValuesTracker<>(0L);
InternalTable internalTable = tbl.internalTable();
+ ((InternalTableImpl) internalTable).updatePartitionTrackers(partId,
safeTimeTracker, storageIndexTracker);
+
LOG.info("Received update on pending assignments. Check if new raft
group should be started"
+ " [key={}, partition={}, table={},
localMemberAddress={}]",
pendingAssignmentsEntry.key(), partId, tbl.name(),
localMember.address());
@@ -2055,7 +2077,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
partitionDataStorage,
storageUpdateHandler,
txStatePartitionStorage,
- safeTime,
+ safeTimeTracker,
storageIndexTracker
);
@@ -2104,7 +2126,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
new Lazy<>(() ->
tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
() ->
tbl.indexStorageAdapters(partId).get(),
clock,
- safeTime,
+ safeTimeTracker,
txStatePartitionStorage,
placementDriver,
storageUpdateHandler,
@@ -2345,6 +2367,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
.thenCombine(mvGc.removeStorage(tablePartitionId),
(tables, unused) -> {
InternalTable internalTable =
tables.get(tableId).internalTable();
+ closePartitionTrackers(internalTable,
partitionId);
+
return allOf(
internalTable.storage().destroyPartition(partitionId),
runAsync(() ->
internalTable.txStateStorage().destroyTxStateStorage(partitionId), ioExecutor)
@@ -2389,4 +2413,16 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
return indexIds;
}
+
+ private static void closePartitionTrackers(InternalTable internalTable,
int partitionId) {
+ closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));
+
+
closeTracker(internalTable.getPartitionStorageIndexTracker(partitionId));
+ }
+
+ private static void closeTracker(@Nullable
PendingComparableValuesTracker<?> tracker) {
+ if (tracker != null) {
+ tracker.close();
+ }
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 57762c063a..d9285fa634 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.jetbrains.annotations.TestOnly;
@@ -195,10 +196,10 @@ public class PartitionListener implements
RaftGroupListener {
assert safeTimePropagatingCommand.safeTime() != null;
-
safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp());
+ updateTrackerIgnoringTrackerClosedException(safeTime,
safeTimePropagatingCommand.safeTime().asHybridTimestamp());
}
- storageIndexTracker.update(commandIndex);
+ updateTrackerIgnoringTrackerClosedException(storageIndexTracker,
commandIndex);
});
}
@@ -465,4 +466,15 @@ public class PartitionListener implements
RaftGroupListener {
);
}
}
+
+ private static <T extends Comparable<T>> void
updateTrackerIgnoringTrackerClosedException(
+ PendingComparableValuesTracker<T> tracker,
+ T newValue
+ ) {
+ try {
+ tracker.update(newValue);
+ } catch (TrackerClosedException ignored) {
+ // No-op.
+ }
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index deb430c1e8..8e1532ac3b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.storage;
+import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.PrimaryReplica;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
@@ -101,8 +103,8 @@ public class InternalTableImpl implements InternalTable {
/** Number of attempts. */
private static final int ATTEMPTS_TO_ENLIST_PARTITION = 5;
- /** Partition map. */
- protected volatile Int2ObjectMap<RaftGroupService> partitionMap;
+ /** Map update guarded by {@link #updatePartitionMapsMux}. */
+ protected volatile Int2ObjectMap<RaftGroupService>
raftGroupServiceByPartitionId;
/** Partitions. */
private final int partitions;
@@ -128,8 +130,8 @@ public class InternalTableImpl implements InternalTable {
/** Replica service. */
private final ReplicaService replicaSvc;
- /** Mutex for the partition map update. */
- private final Object updatePartMapMux = new Object();
+ /** Mutex for the partition maps update. */
+ private final Object updatePartitionMapsMux = new Object();
/** Table messages factory. */
private final TableMessagesFactory tableMessagesFactory;
@@ -137,6 +139,12 @@ public class InternalTableImpl implements InternalTable {
/** A hybrid logical clock. */
private final HybridClock clock;
+ /** Map update guarded by {@link #updatePartitionMapsMux}. */
+ private volatile
Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>>
safeTimeTrackerByPartitionId = emptyMap();
+
+ /** Map update guarded by {@link #updatePartitionMapsMux}. */
+ private volatile Int2ObjectMap<PendingComparableValuesTracker<Long>>
storageIndexTrackerByPartitionId = emptyMap();
+
/**
* Constructor.
*
@@ -164,7 +172,7 @@ public class InternalTableImpl implements InternalTable {
) {
this.tableName = tableName;
this.tableId = tableId;
- this.partitionMap = partMap;
+ this.raftGroupServiceByPartitionId = partMap;
this.partitions = partitions;
this.clusterNodeResolver = clusterNodeResolver;
this.txManager = txManager;
@@ -524,7 +532,7 @@ public class InternalTableImpl implements InternalTable {
@NotNull ClusterNode recipientNode
) {
int partId = partitionId(keyRow);
- ReplicationGroupId partGroupId = partitionMap.get(partId).groupId();
+ ReplicationGroupId partGroupId =
raftGroupServiceByPartitionId.get(partId).groupId();
return replicaSvc.invoke(recipientNode,
tableMessagesFactory.readOnlySingleRowReplicaRequest()
.groupId(partGroupId)
@@ -577,7 +585,7 @@ public class InternalTableImpl implements InternalTable {
int batchNum = 0;
for (Int2ObjectOpenHashMap.Entry<List<BinaryRow>> partToRows :
keyRowsByPartition.int2ObjectEntrySet()) {
- ReplicationGroupId partGroupId =
partitionMap.get(partToRows.getIntKey()).groupId();
+ ReplicationGroupId partGroupId =
raftGroupServiceByPartitionId.get(partToRows.getIntKey()).groupId();
CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode,
tableMessagesFactory.readOnlyMultiRowReplicaRequest()
.groupId(partGroupId)
@@ -888,7 +896,7 @@ public class InternalTableImpl implements InternalTable {
return new PartitionScanPublisher(
(scanId, batchSize) -> {
- ReplicationGroupId partGroupId =
partitionMap.get(partId).groupId();
+ ReplicationGroupId partGroupId =
raftGroupServiceByPartitionId.get(partId).groupId();
ReadOnlyScanRetrieveBatchReplicaRequest request =
tableMessagesFactory.readOnlyScanRetrieveBatchReplicaRequest()
.groupId(partGroupId)
@@ -999,7 +1007,7 @@ public class InternalTableImpl implements InternalTable {
) {
return new PartitionScanPublisher(
(scanId, batchSize) -> {
- ReplicationGroupId partGroupId =
partitionMap.get(partId).groupId();
+ ReplicationGroupId partGroupId =
raftGroupServiceByPartitionId.get(partId).groupId();
ReadWriteScanRetrieveBatchReplicaRequest request =
tableMessagesFactory.readWriteScanRetrieveBatchReplicaRequest()
.groupId(partGroupId)
@@ -1062,7 +1070,7 @@ public class InternalTableImpl implements InternalTable {
public List<String> assignments() {
awaitLeaderInitialization();
- return partitionMap.int2ObjectEntrySet().stream()
+ return raftGroupServiceByPartitionId.int2ObjectEntrySet().stream()
.sorted(Comparator.comparingInt(Int2ObjectOpenHashMap.Entry::getIntKey))
.map(Map.Entry::getValue)
.map(service -> service.leader().consistentId())
@@ -1072,7 +1080,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
public List<PrimaryReplica> primaryReplicas() {
- List<Entry<RaftGroupService>> entries = new
ArrayList<>(partitionMap.int2ObjectEntrySet());
+ List<Entry<RaftGroupService>> entries = new
ArrayList<>(raftGroupServiceByPartitionId.int2ObjectEntrySet());
List<CompletableFuture<LeaderWithTerm>> futs = new ArrayList<>();
entries.sort(Comparator.comparingInt(Entry::getIntKey));
@@ -1097,7 +1105,7 @@ public class InternalTableImpl implements InternalTable {
public ClusterNode leaderAssignment(int partition) {
awaitLeaderInitialization();
- RaftGroupService raftGroupService = partitionMap.get(partition);
+ RaftGroupService raftGroupService =
raftGroupServiceByPartitionId.get(partition);
if (raftGroupService == null) {
throw new IgniteInternalException("No such partition " + partition
+ " in table " + tableName);
}
@@ -1108,7 +1116,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
public RaftGroupService partitionRaftGroupService(int partition) {
- RaftGroupService raftGroupService = partitionMap.get(partition);
+ RaftGroupService raftGroupService =
raftGroupServiceByPartitionId.get(partition);
if (raftGroupService == null) {
throw new IgniteInternalException("No such partition " + partition
+ " in table " + tableName);
}
@@ -1129,7 +1137,7 @@ public class InternalTableImpl implements InternalTable {
private void awaitLeaderInitialization() {
List<CompletableFuture<Void>> futs = new ArrayList<>();
- for (RaftGroupService raftSvc : partitionMap.values()) {
+ for (RaftGroupService raftSvc :
raftGroupServiceByPartitionId.values()) {
if (raftSvc.leader() == null) {
futs.add(raftSvc.refreshLeader());
}
@@ -1152,7 +1160,7 @@ public class InternalTableImpl implements InternalTable {
public Map<Integer, List<String>> peersAndLearners() {
awaitLeaderInitialization();
- return partitionMap.int2ObjectEntrySet().stream()
+ return raftGroupServiceByPartitionId.int2ObjectEntrySet().stream()
.collect(Collectors.toMap(Entry::getIntKey, e -> {
RaftGroupService service = e.getValue();
return Stream.of(service.peers(), service.learners())
@@ -1201,14 +1209,14 @@ public class InternalTableImpl implements InternalTable
{
public void updateInternalTableRaftGroupService(int p, RaftGroupService
raftGrpSvc) {
RaftGroupService oldSrvc;
- synchronized (updatePartMapMux) {
+ synchronized (updatePartitionMapsMux) {
Int2ObjectMap<RaftGroupService> newPartitionMap = new
Int2ObjectOpenHashMap<>(partitions);
- newPartitionMap.putAll(partitionMap);
+ newPartitionMap.putAll(raftGroupServiceByPartitionId);
oldSrvc = newPartitionMap.put(p, raftGrpSvc);
- partitionMap = newPartitionMap;
+ raftGroupServiceByPartitionId = newPartitionMap;
}
if (oldSrvc != null) {
@@ -1224,7 +1232,7 @@ public class InternalTableImpl implements InternalTable {
* @return The enlist future (then will a leader become known).
*/
protected CompletableFuture<IgniteBiTuple<ClusterNode, Long>> enlist(int
partId, InternalTransaction tx) {
- RaftGroupService svc = partitionMap.get(partId);
+ RaftGroupService svc = raftGroupServiceByPartitionId.get(partId);
tx.assignCommitPartition(new TablePartitionId(tableId, partId));
// TODO: IGNITE-17256 Use a placement driver for getting a primary
replica.
@@ -1415,7 +1423,7 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override
public void close() {
- for (RaftGroupService srv : partitionMap.values()) {
+ for (RaftGroupService srv : raftGroupServiceByPartitionId.values()) {
srv.shutdown();
}
}
@@ -1428,7 +1436,7 @@ public class InternalTableImpl implements InternalTable {
* @return Cluster node to evalute read-only request.
*/
protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
- RaftGroupService svc = partitionMap.get(partId);
+ RaftGroupService svc = raftGroupServiceByPartitionId.get(partId);
return svc.refreshAndGetLeaderWithTerm().handle((res, e) -> {
if (e != null) {
@@ -1470,4 +1478,52 @@ public class InternalTableImpl implements InternalTable {
return e0;
}
+
+ @Override
+ public @Nullable PendingComparableValuesTracker<HybridTimestamp>
getPartitionSafeTimeTracker(int partitionId) {
+ return safeTimeTrackerByPartitionId.get(partitionId);
+ }
+
+ @Override
+ public @Nullable PendingComparableValuesTracker<Long>
getPartitionStorageIndexTracker(int partitionId) {
+ return storageIndexTrackerByPartitionId.get(partitionId);
+ }
+
+ /**
+ * Updates the partition trackers, closing the previous ones if there were any.
+ *
+ * @param partitionId Partition ID.
+ * @param newSafeTimeTracker New partition safe time tracker.
+ * @param newStorageIndexTracker New partition storage index tracker.
+ */
+ public void updatePartitionTrackers(
+ int partitionId,
+ PendingComparableValuesTracker<HybridTimestamp> newSafeTimeTracker,
+ PendingComparableValuesTracker<Long> newStorageIndexTracker
+ ) {
+ PendingComparableValuesTracker<HybridTimestamp>
previousSafeTimeTracker;
+ PendingComparableValuesTracker<Long> previousStorageIndexTracker;
+
+ synchronized (updatePartitionMapsMux) {
+ Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>>
newSafeTimeTrackerMap = new Int2ObjectOpenHashMap<>(partitions);
+ Int2ObjectMap<PendingComparableValuesTracker<Long>>
newStorageIndexTrackerMap = new Int2ObjectOpenHashMap<>(partitions);
+
+ newSafeTimeTrackerMap.putAll(safeTimeTrackerByPartitionId);
+ newStorageIndexTrackerMap.putAll(storageIndexTrackerByPartitionId);
+
+ previousSafeTimeTracker = newSafeTimeTrackerMap.put(partitionId,
newSafeTimeTracker);
+ previousStorageIndexTracker =
newStorageIndexTrackerMap.put(partitionId, newStorageIndexTracker);
+
+ safeTimeTrackerByPartitionId = newSafeTimeTrackerMap;
+ storageIndexTrackerByPartitionId = newStorageIndexTrackerMap;
+ }
+
+ if (previousSafeTimeTracker != null) {
+ previousSafeTimeTracker.close();
+ }
+
+ if (previousStorageIndexTracker != null) {
+ previousStorageIndexTracker.close();
+ }
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
new file mode 100644
index 0000000000..e709ab3c04
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
+import java.util.UUID;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.network.ClusterNode;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link InternalTableImpl}.
+ */
+public class InternalTableImplTest {
+    /**
+     * Checks that {@link InternalTableImpl#updatePartitionTrackers} makes the given trackers visible, closes the trackers it
+     * replaces, and leaves the newly registered trackers open.
+     */
+    @Test
+    @SuppressWarnings("unchecked") // Mocking a generic class through its raw class literal requires an unchecked conversion.
+    void testUpdatePartitionTrackers() {
+        InternalTableImpl internalTable = new InternalTableImpl(
+                "test",
+                UUID.randomUUID(),
+                Int2ObjectMaps.emptyMap(),
+                1,
+                s -> mock(ClusterNode.class),
+                mock(TxManager.class),
+                mock(MvTableStorage.class),
+                mock(TxStateTableStorage.class),
+                mock(ReplicaService.class),
+                mock(HybridClock.class)
+        );
+
+        // A freshly created table has no trackers registered for the partition.
+        assertNull(internalTable.getPartitionSafeTimeTracker(0));
+        assertNull(internalTable.getPartitionStorageIndexTracker(0));
+
+        // First registration: the trackers become visible and nothing is closed.
+        PendingComparableValuesTracker<HybridTimestamp> safeTime0 = mock(PendingComparableValuesTracker.class);
+        PendingComparableValuesTracker<Long> storageIndex0 = mock(PendingComparableValuesTracker.class);
+
+        internalTable.updatePartitionTrackers(0, safeTime0, storageIndex0);
+
+        assertSame(safeTime0, internalTable.getPartitionSafeTimeTracker(0));
+        assertSame(storageIndex0, internalTable.getPartitionStorageIndexTracker(0));
+
+        verify(safeTime0, never()).close();
+        verify(storageIndex0, never()).close();
+
+        // Replacement: the new trackers become visible, the replaced ones are closed and the new ones stay open.
+        PendingComparableValuesTracker<HybridTimestamp> safeTime1 = mock(PendingComparableValuesTracker.class);
+        PendingComparableValuesTracker<Long> storageIndex1 = mock(PendingComparableValuesTracker.class);
+
+        internalTable.updatePartitionTrackers(0, safeTime1, storageIndex1);
+
+        assertSame(safeTime1, internalTable.getPartitionSafeTimeTracker(0));
+        assertSame(storageIndex1, internalTable.getPartitionStorageIndexTracker(0));
+
+        verify(safeTime0).close();
+        verify(storageIndex0).close();
+
+        verify(safeTime1, never()).close();
+        verify(storageIndex1, never()).close();
+    }
+}
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 0d607f4f65..6c482ade7b 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -188,7 +188,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
replicaSvc,
CLOCK
);
- RaftGroupService svc = partitionMap.get(0);
+ RaftGroupService svc = raftGroupServiceByPartitionId.get(0);
groupId = crossTableUsage ? new TablePartitionId(tableId(), PART_ID) :
crossTableGroupId;
@@ -277,7 +277,7 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
replicaListener = new PartitionReplicaListener(
mvPartStorage,
- partitionMap.get(PART_ID),
+ raftGroupServiceByPartitionId.get(PART_ID),
this.txManager,
this.txManager.lockManager(),
Runnable::run,