This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 5db7920c6c IGNITE-21658 Make LowWatermark a top level component (#3340)
5db7920c6c is described below
commit 5db7920c6cbfabc0f482fdd0942549b33b2747e2
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Mon Mar 4 14:59:14 2024 +0300
IGNITE-21658 Make LowWatermark a top level component (#3340)
---
.../runner/app/ItIgniteNodeRestartTest.java | 11 ++-
.../org/apache/ignite/internal/app/IgniteImpl.java | 16 +++--
.../rebalance/ItRebalanceDistributedTest.java | 16 +++--
.../internal/table/distributed/LowWatermark.java | 81 ++++++++++++++--------
.../distributed/LowWatermarkChangedListener.java | 37 ++++++++++
.../internal/table/distributed/TableManager.java | 19 ++---
.../ignite/internal/table/distributed/gc/MvGc.java | 8 ++-
.../table/distributed/LowWatermarkTest.java | 25 ++++---
.../table/distributed/TableManagerTest.java | 5 +-
.../internal/table/distributed/gc/MvGcTest.java | 28 ++++----
10 files changed, 168 insertions(+), 78 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 92fd0662f2..4dde7bce06 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -161,6 +161,7 @@ import
org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -524,6 +525,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
var sqlRef = new AtomicReference<IgniteSqlImpl>();
+ LowWatermark lowWatermark = new LowWatermark(name,
gcConfig.lowWatermark(), hybridClock, txManager, vault, failureProcessor);
+
TableManager tableManager = new TableManager(
name,
registry,
@@ -547,16 +550,15 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
hybridClock,
new OutgoingSnapshotsManager(clusterSvc.messagingService()),
topologyAwareRaftGroupServiceFactory,
- vault,
distributionZoneManager,
schemaSyncService,
catalogManager,
new HybridTimestampTracker(),
placementDriverManager.placementDriver(),
sqlRef::get,
- failureProcessor,
resourcesRegistry,
- rebalanceScheduler
+ rebalanceScheduler,
+ lowWatermark
);
var indexManager = new IndexManager(
@@ -616,6 +618,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
cmgManager,
replicaMgr,
txManager,
+ lowWatermark,
metaStorageMgr,
clusterCfgMgr,
dataStorageManager,
@@ -635,6 +638,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
components.add(component);
}
+ lowWatermark.scheduleUpdates();
+
PartialNode partialNode = partialNode(
name,
nodeCfgMgr,
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f550ecdb5e..d1103775b4 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -177,6 +177,7 @@ import
org.apache.ignite.internal.storage.engine.StorageEngine;
import org.apache.ignite.internal.storage.engine.ThreadAssertingStorageEngine;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.systemview.api.SystemViewManager;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -344,6 +345,8 @@ public class IgniteImpl implements Ignite {
private final ClockWaiter clockWaiter;
+ private final LowWatermark lowWatermark;
+
private final OutgoingSnapshotsManager outgoingSnapshotsManager;
private final RestAddressReporter restAddressReporter;
@@ -683,6 +686,8 @@ public class IgniteImpl implements Ignite {
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
+ lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(), clock,
txManager, vaultMgr, failureProcessor);
+
distributedTblMgr = new TableManager(
name,
registry,
@@ -706,16 +711,15 @@ public class IgniteImpl implements Ignite {
clock,
outgoingSnapshotsManager,
topologyAwareRaftGroupServiceFactory,
- vaultMgr,
distributionZoneManager,
schemaSyncService,
catalogManager,
observableTimestampTracker,
placementDriverMgr.placementDriver(),
this::sql,
- failureProcessor,
resourcesRegistry,
- rebalanceScheduler
+ rebalanceScheduler,
+ lowWatermark
);
indexManager = new IndexManager(
@@ -959,7 +963,8 @@ public class IgniteImpl implements Ignite {
restComponent,
raftMgr,
clusterStateStorage,
- cmgMgr
+ cmgMgr,
+ lowWatermark
);
clusterSvc.updateMetadata(new
NodeMetadata(restComponent.hostName(), restComponent.httpPort(),
restComponent.httpsPort()));
@@ -1035,6 +1040,9 @@ public class IgniteImpl implements Ignite {
.thenComposeAsync(ignored ->
awaitSelfInLocalLogicalTopology(), startupExecutor)
.thenRunAsync(() -> {
try {
+ // Enable watermark events.
+ lowWatermark.scheduleUpdates();
+
// Enable REST component on start complete.
restComponent.enable();
// Transfer the node to the STARTED state.
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 5ddbc3db69..6be1f738c4 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -168,6 +168,7 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableRaftService;
import org.apache.ignite.internal.table.TableTestUtils;
import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.distributed.LowWatermark;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
@@ -954,6 +955,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private final NetworkAddress networkAddress;
+ private final LowWatermark lowWatermark;
+
/** The future have to be complete after the node start and all Meta
storage watches are deployd. */
private CompletableFuture<Void> deployWatchesFut;
@@ -1168,6 +1171,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
StorageUpdateConfiguration storageUpdateConfiguration =
clusterConfigRegistry.getConfiguration(StorageUpdateConfiguration.KEY);
+ HybridClockImpl clock = new HybridClockImpl();
+ lowWatermark = new LowWatermark(name, gcConfig.lowWatermark(),
clock, txManager, vaultManager, failureProcessor);
+
tableManager = new TableManager(
name,
registry,
@@ -1188,19 +1194,18 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
view -> new LocalLogStorageFactory(),
threadPoolsManager.tableIoExecutor(),
threadPoolsManager.partitionOperationsExecutor(),
- new HybridClockImpl(),
+ clock,
new
OutgoingSnapshotsManager(clusterService.messagingService()),
topologyAwareRaftGroupServiceFactory,
- vaultManager,
distributionZoneManager,
schemaSyncService,
catalogManager,
new HybridTimestampTracker(),
placementDriver,
() -> mock(IgniteSql.class),
- failureProcessor,
resourcesRegistry,
- rebalanceScheduler
+ rebalanceScheduler,
+ lowWatermark
) {
@Override
protected TxStateTableStorage createTxStateTableStorage(
@@ -1271,6 +1276,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
deployWatchesFut = CompletableFuture.supplyAsync(() -> {
List<IgniteComponent> secondComponents = List.of(
+ lowWatermark,
metaStorageManager,
clusterCfgMgr,
clockWaiter,
@@ -1298,6 +1304,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
assertThat(configurationNotificationFut, willSucceedIn(1,
TimeUnit.MINUTES));
+ lowWatermark.scheduleUpdates();
+
return metaStorageManager.deployWatches();
}).thenCompose(identity());
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
index c0cf426414..8e5cc3cf20 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
@@ -18,15 +18,19 @@
package org.apache.ignite.internal.table.distributed;
import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -35,8 +39,8 @@ import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ByteUtils;
@@ -55,7 +59,7 @@ import org.jetbrains.annotations.Nullable;
*
* @see <a
href="https://cwiki.apache.org/confluence/display/IGNITE/IEP-91%3A+Transaction+protocol">IEP-91</a>
*/
-public class LowWatermark implements ManuallyCloseable {
+public class LowWatermark implements IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(LowWatermark.class);
static final ByteArray LOW_WATERMARK_VAULT_KEY = new
ByteArray("low-watermark");
@@ -68,7 +72,7 @@ public class LowWatermark implements ManuallyCloseable {
private final VaultManager vaultManager;
- private final MvGc mvGc;
+ private final List<LowWatermarkChangedListener> updateListeners = new
CopyOnWriteArrayList<>();
private final ScheduledExecutorService scheduledThreadPool;
@@ -76,7 +80,7 @@ public class LowWatermark implements ManuallyCloseable {
private final AtomicBoolean closeGuard = new AtomicBoolean();
- private volatile HybridTimestamp lowWatermark;
+ private volatile @Nullable HybridTimestamp lowWatermark;
private final AtomicReference<ScheduledFuture<?>> lastScheduledTaskFuture
= new AtomicReference<>();
@@ -90,7 +94,6 @@ public class LowWatermark implements ManuallyCloseable {
* @param clock A hybrid logical clock.
* @param txManager Transaction manager.
* @param vaultManager Vault manager.
- * @param mvGc MVCC garbage collector.
* @param failureProcessor Failure processor tha is used to handle
critical errors.
*/
public LowWatermark(
@@ -99,14 +102,12 @@ public class LowWatermark implements ManuallyCloseable {
HybridClock clock,
TxManager txManager,
VaultManager vaultManager,
- MvGc mvGc,
FailureProcessor failureProcessor
) {
this.lowWatermarkConfig = lowWatermarkConfig;
this.clock = clock;
this.txManager = txManager;
this.vaultManager = vaultManager;
- this.mvGc = mvGc;
this.failureProcessor = failureProcessor;
scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
@@ -117,11 +118,23 @@ public class LowWatermark implements ManuallyCloseable {
/**
* Starts the watermark manager.
*/
- public void start() {
+ @Override
+ public CompletableFuture<Void> start() {
+ inBusyLock(busyLock, () -> {
+ lowWatermark = readLowWatermarkFromVault();
+ });
+
+ return nullCompletedFuture();
+ }
+
+ /**
+ * Schedule watermark updates.
+ */
+ public void scheduleUpdates() {
inBusyLock(busyLock, () -> {
- HybridTimestamp lowWatermark = readLowWatermarkFromVault();
+ HybridTimestamp lowWatermarkCandidate = lowWatermark;
- if (lowWatermark == null) {
+ if (lowWatermarkCandidate == null) {
LOG.info("Previous value of the low watermark was not found,
will schedule to update it");
scheduleUpdateLowWatermarkBusy();
@@ -129,19 +142,14 @@ public class LowWatermark implements ManuallyCloseable {
return;
}
- LOG.info(
- "Low watermark has been successfully retrieved from the
vault and is scheduled to be updated: {}",
- lowWatermark
- );
+ LOG.info("Low watermark has been scheduled to be updated: {}",
lowWatermarkCandidate);
- txManager.updateLowWatermark(lowWatermark)
- .thenRun(() -> inBusyLock(busyLock, () -> {
- this.lowWatermark = lowWatermark;
-
- runGcAndScheduleUpdateLowWatermarkBusy(lowWatermark);
- }))
+ txManager.updateLowWatermark(lowWatermarkCandidate)
+ .thenComposeAsync(unused -> inBusyLock(busyLock, () ->
notifyListeners(lowWatermarkCandidate)), scheduledThreadPool)
.whenComplete((unused, throwable) -> {
- if (throwable != null && !(throwable instanceof
NodeStoppingException)) {
+ if (throwable == null) {
+ inBusyLock(busyLock,
this::scheduleUpdateLowWatermarkBusy);
+ } else if (!(throwable instanceof
NodeStoppingException)) {
LOG.error("Error during the Watermark manager
start", throwable);
failureProcessor.process(new
FailureContext(CRITICAL_ERROR, throwable));
@@ -159,7 +167,7 @@ public class LowWatermark implements ManuallyCloseable {
}
@Override
- public void close() {
+ public void stop() {
if (!closeGuard.compareAndSet(false, true)) {
return;
}
@@ -190,12 +198,12 @@ public class LowWatermark implements ManuallyCloseable {
// created, then we can safely promote the candidate as a new low
watermark, store it in vault, and we can safely start cleaning
// up the stale/junk data in the tables.
txManager.updateLowWatermark(lowWatermarkCandidate)
- .thenRunAsync(() -> inBusyLock(busyLock, () -> {
+ .thenComposeAsync(unused -> inBusyLock(busyLock, () -> {
vaultManager.put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(lowWatermarkCandidate));
lowWatermark = lowWatermarkCandidate;
-
runGcAndScheduleUpdateLowWatermarkBusy(lowWatermarkCandidate);
+ return notifyListeners(lowWatermarkCandidate);
}), scheduledThreadPool)
.whenComplete((unused, throwable) -> {
if (throwable != null) {
@@ -206,15 +214,32 @@ public class LowWatermark implements ManuallyCloseable {
}
} else {
LOG.info("Successful low watermark update: {}",
lowWatermarkCandidate);
+
+ scheduleUpdateLowWatermarkBusy();
}
});
});
}
- private void runGcAndScheduleUpdateLowWatermarkBusy(HybridTimestamp
lowWatermark) {
- mvGc.updateLowWatermark(lowWatermark);
+ public void addUpdateListener(LowWatermarkChangedListener listener) {
+ updateListeners.add(listener);
+ }
+
+ public void removeUpdateListener(LowWatermarkChangedListener listener) {
+ updateListeners.remove(listener);
+ }
+
+ private CompletableFuture<Void> notifyListeners(HybridTimestamp
lowWatermark) {
+ if (updateListeners.isEmpty()) {
+ return nullCompletedFuture();
+ }
+
+ ArrayList<CompletableFuture<?>> res = new ArrayList<>();
+ for (LowWatermarkChangedListener updateListener : updateListeners) {
+ res.add(updateListener.onLwmChanged(lowWatermark));
+ }
- scheduleUpdateLowWatermarkBusy();
+ return CompletableFuture.allOf(res.toArray(CompletableFuture[]::new));
}
private void scheduleUpdateLowWatermarkBusy() {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
new file mode 100644
index 0000000000..2a738a7c46
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermarkChangedListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+
+/**
+ * LWM event listener interface.
+ *
+ * @see LowWatermark
+ */
+@FunctionalInterface
+public interface LowWatermarkChangedListener {
+ /**
+ * Low watermark changed callback.
+ *
+ * @param ts New low watermark.
+ * @return A future, which completes after the event has been processed.
+ */
+ CompletableFuture<Void> onLwmChanged(HybridTimestamp ts);
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ed3bf6eec2..3df5f576f1 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -114,7 +114,6 @@ import
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.rebalance.PartitionMover;
import
org.apache.ignite.internal.distributionzones.rebalance.RebalanceRaftGroupEventsListener;
import org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil;
-import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
@@ -209,7 +208,6 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.utils.RebalanceUtilEx;
-import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.worker.ThreadAssertions;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.util.IgniteNameUtils;
@@ -412,11 +410,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
* @param partitionOperationsExecutor Striped executor on which partition
operations (potentially requiring I/O with storages)
* will be executed.
* @param raftGroupServiceFactory Factory that is used for creation of
raft group services for replication groups.
- * @param vaultManager Vault manager.
* @param placementDriver Placement driver.
* @param sql A supplier function that returns {@link IgniteSql}.
- * @param failureProcessor Failure processor that is used to process
critical errors.
* @param rebalanceScheduler Executor for scheduling rebalance routine.
+ * @param lowWatermark Low watermark.
*/
public TableManager(
String nodeName,
@@ -441,16 +438,15 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
HybridClock clock,
OutgoingSnapshotsManager outgoingSnapshotsManager,
TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
- VaultManager vaultManager,
DistributionZoneManager distributionZoneManager,
SchemaSyncService schemaSyncService,
CatalogService catalogService,
HybridTimestampTracker observableTimestampTracker,
PlacementDriver placementDriver,
Supplier<IgniteSql> sql,
- FailureProcessor failureProcessor,
RemotelyTriggeredResourceRegistry
remotelyTriggeredResourceRegistry,
- ScheduledExecutorService rebalanceScheduler
+ ScheduledExecutorService rebalanceScheduler,
+ LowWatermark lowWatermark
) {
this.topologyService = topologyService;
this.raftMgr = raftMgr;
@@ -474,6 +470,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.storageUpdateConfig = storageUpdateConfig;
this.remotelyTriggeredResourceRegistry =
remotelyTriggeredResourceRegistry;
this.rebalanceScheduler = rebalanceScheduler;
+ this.lowWatermark = lowWatermark;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -525,8 +522,6 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
mvGc = new MvGc(nodeName, gcConfig);
- lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(),
clock, txManager, vaultManager, mvGc, failureProcessor);
-
raftCommandsMarshaller = new
ThreadLocalPartitionCommandsMarshaller(messageSerializationRegistry);
partitionReplicatorNodeRecovery = new PartitionReplicatorNodeRecovery(
@@ -552,8 +547,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
public CompletableFuture<Void> start() {
return inBusyLockAsync(busyLock, () -> {
mvGc.start();
-
- lowWatermark.start();
+ lowWatermark.addUpdateListener(mvGc);
transactionStateResolver.start();
@@ -1078,6 +1072,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
metaStorageMgr.unregisterWatch(assignmentsSwitchRebalanceListener);
+ lowWatermark.removeUpdateListener(mvGc);
+
var tablesToStop = new HashMap<Integer, TableImpl>();
tablesToStop.putAll(latestTablesById());
@@ -1095,7 +1091,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
IgniteUtils.closeAllManually(
- lowWatermark,
mvGc,
fullStateTransferIndexChooser,
sharedTxStateStorage,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
index 4bf46fd6ac..38e26e05d4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
+import
org.apache.ignite.internal.table.distributed.LowWatermarkChangedListener;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.TrackerClosedException;
@@ -50,7 +51,7 @@ import org.jetbrains.annotations.TestOnly;
*
* @see GcUpdateHandler#vacuumBatch(HybridTimestamp, int, boolean)
*/
-public class MvGc implements ManuallyCloseable {
+public class MvGc implements LowWatermarkChangedListener, ManuallyCloseable {
private static final IgniteLogger LOG = Loggers.forClass(MvGc.class);
/** Node name. */
@@ -153,7 +154,8 @@ public class MvGc implements ManuallyCloseable {
* @param newLwm New low watermark.
* @throws IgniteInternalException with {@link
GarbageCollector#CLOSED_ERR} If the garbage collector is closed.
*/
- public void updateLowWatermark(HybridTimestamp newLwm) {
+ @Override
+ public CompletableFuture<Void> onLwmChanged(HybridTimestamp newLwm) {
inBusyLock(() -> {
HybridTimestamp updatedLwm =
lowWatermarkReference.updateAndGet(currentLwm -> {
if (currentLwm == null) {
@@ -171,6 +173,8 @@ public class MvGc implements ManuallyCloseable {
executor.submit(() -> inBusyLock(this::initNewGcBusy));
});
+
+ return nullCompletedFuture();
}
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
index 42997ac27c..c35e203fdb 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.table.distributed.gc.MvGc;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ByteUtils;
@@ -72,26 +71,31 @@ public class LowWatermarkTest extends
BaseIgniteAbstractTest {
private final VaultManager vaultManager = mock(VaultManager.class);
- private final MvGc mvGc = mock(MvGc.class);
+ private LowWatermarkChangedListener listener;
private LowWatermark lowWatermark;
@BeforeEach
void setUp() {
- lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock,
txManager, vaultManager, mvGc, mock(FailureProcessor.class));
+ listener = mock(LowWatermarkChangedListener.class);
+
when(listener.onLwmChanged(any(HybridTimestamp.class))).thenReturn(nullCompletedFuture());
+
+ lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock,
txManager, vaultManager, mock(FailureProcessor.class));
+ lowWatermark.addUpdateListener(listener);
}
@AfterEach
void tearDown() {
- lowWatermark.close();
+ lowWatermark.stop();
}
@Test
void testStartWithEmptyVault() {
// Let's check the start with no low watermark in vault.
lowWatermark.start();
+ lowWatermark.scheduleUpdates();
- verify(mvGc, never()).updateLowWatermark(any(HybridTimestamp.class));
+ verify(listener, never()).onLwmChanged(any(HybridTimestamp.class));
assertNull(lowWatermark.getLowWatermark());
}
@@ -106,7 +110,11 @@ public class LowWatermarkTest extends
BaseIgniteAbstractTest {
this.lowWatermark.start();
- verify(mvGc).updateLowWatermark(lowWatermark);
+ assertEquals(lowWatermark, this.lowWatermark.getLowWatermark());
+
+ this.lowWatermark.scheduleUpdates();
+
+ verify(listener, timeout(1_000)).onLwmChanged(lowWatermark);
assertEquals(lowWatermark, this.lowWatermark.getLowWatermark());
}
@@ -135,13 +143,13 @@ public class LowWatermarkTest extends
BaseIgniteAbstractTest {
lowWatermark.updateLowWatermark();
- InOrder inOrder = inOrder(txManager, vaultManager, mvGc);
+ InOrder inOrder = inOrder(txManager, vaultManager, listener);
inOrder.verify(txManager).updateLowWatermark(newLowWatermarkCandidate);
inOrder.verify(vaultManager,
timeout(1000)).put(LOW_WATERMARK_VAULT_KEY,
ByteUtils.toBytes(newLowWatermarkCandidate));
- inOrder.verify(mvGc).updateLowWatermark(newLowWatermarkCandidate);
+ inOrder.verify(listener,
timeout(1_000)).onLwmChanged(newLowWatermarkCandidate);
assertEquals(newLowWatermarkCandidate, lowWatermark.getLowWatermark());
}
@@ -167,6 +175,7 @@ public class LowWatermarkTest extends
BaseIgniteAbstractTest {
});
lowWatermark.start();
+ this.lowWatermark.scheduleUpdates();
// Let's check that it hasn't been called more than once.
assertFalse(startGetAllReadOnlyTransactions.await(1,
TimeUnit.SECONDS));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 264b0d69c9..f03fa9043c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -769,16 +769,15 @@ public class TableManagerTest extends IgniteAbstractTest {
clock,
new
OutgoingSnapshotsManager(clusterService.messagingService()),
mock(TopologyAwareRaftGroupServiceFactory.class),
- vaultManager,
distributionZoneManager,
new AlwaysSyncedSchemaSyncService(),
catalogManager,
new HybridTimestampTracker(),
new TestPlacementDriver(node),
() -> mock(IgniteSql.class),
- mock(FailureProcessor.class),
new RemotelyTriggeredResourceRegistry(),
- mock(ScheduledExecutorService.class)
+ mock(ScheduledExecutorService.class),
+ new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock,
tm, vaultManager, mock(FailureProcessor.class))
) {
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
index abe97336be..04f6035ff5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -96,7 +96,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
void testAddStorageWithLowWatermark() {
HybridTimestamp lowWatermark = new HybridTimestamp(1, 1);
- gc.updateLowWatermark(lowWatermark);
+ gc.onLwmChanged(lowWatermark);
CompletableFuture<Void> invokeVacuumMethodFuture = new
CompletableFuture<>();
@@ -119,7 +119,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.addStorage(createTablePartitionId(), gcUpdateHandler0);
gc.addStorage(createTablePartitionId(), gcUpdateHandler1);
- gc.updateLowWatermark(lowWatermark0);
+ gc.onLwmChanged(lowWatermark0);
// We expect GcUpdateHandler#vacuum to be called with the set
lowWatermark0.
assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
@@ -134,7 +134,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
completeFutureOnVacuum(gcUpdateHandler0, invokeVacuumMethodFuture2,
lowWatermark1);
completeFutureOnVacuum(gcUpdateHandler1, invokeVacuumMethodFuture3,
lowWatermark1);
- gc.updateLowWatermark(lowWatermark1);
+ gc.onLwmChanged(lowWatermark1);
// We expect GcUpdateHandler#vacuum to be called with the set
lowWatermark0.
assertThat(invokeVacuumMethodFuture2, willCompleteSuccessfully());
@@ -154,7 +154,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.addStorage(createTablePartitionId(), gcUpdateHandler0);
gc.addStorage(createTablePartitionId(), gcUpdateHandler1);
- gc.updateLowWatermark(firstLowWatermark);
+ gc.onLwmChanged(firstLowWatermark);
// We expect GcUpdateHandler#vacuum to be called with the set
lowWatermark0.
assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
@@ -169,7 +169,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
completeFutureOnVacuum(gcUpdateHandler0,
invokeVacuumMethodFutureForSame0, sameLowWatermark);
completeFutureOnVacuum(gcUpdateHandler1,
invokeVacuumMethodFutureForSame1, sameLowWatermark);
- gc.updateLowWatermark(sameLowWatermark);
+ gc.onLwmChanged(sameLowWatermark);
// We expect that GcUpdateHandler#vacuum will not be called.
assertThat(invokeVacuumMethodFutureForSame0, willTimeoutFast());
@@ -184,7 +184,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
completeFutureOnVacuum(gcUpdateHandler0,
invokeVacuumMethodFutureForLower0, lowerLowWatermark);
completeFutureOnVacuum(gcUpdateHandler1,
invokeVacuumMethodFutureForLower1, lowerLowWatermark);
- gc.updateLowWatermark(lowerLowWatermark);
+ gc.onLwmChanged(lowerLowWatermark);
// We expect that GcUpdateHandler#vacuum will not be called.
assertThat(invokeVacuumMethodFutureForSame0, willTimeoutFast());
@@ -201,7 +201,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.addStorage(createTablePartitionId(), gcUpdateHandler);
- gc.updateLowWatermark(new HybridTimestamp(2, 2));
+ gc.onLwmChanged(new HybridTimestamp(2, 2));
assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
}
@@ -219,7 +219,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.addStorage(tablePartitionId,
createWithCompleteFutureOnVacuum(invokeVacuumMethodFuture, null));
- gc.updateLowWatermark(new HybridTimestamp(1, 1));
+ gc.onLwmChanged(new HybridTimestamp(1, 1));
assertThat(invokeVacuumMethodFuture, willCompleteSuccessfully());
assertThat(gc.removeStorage(tablePartitionId),
willCompleteSuccessfully());
@@ -237,7 +237,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.addStorage(tablePartitionId,
createWithWaitFinishVacuum(startInvokeVacuumMethodFuture,
finishInvokeVacuumMethodFuture));
- gc.updateLowWatermark(new HybridTimestamp(1, 1));
+ gc.onLwmChanged(new HybridTimestamp(1, 1));
assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
@@ -259,7 +259,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.addStorage(tablePartitionId,
createWithWaitFinishVacuum(startInvokeVacuumMethodFuture,
finishInvokeVacuumMethodFuture));
- gc.updateLowWatermark(new HybridTimestamp(1, 1));
+ gc.onLwmChanged(new HybridTimestamp(1, 1));
assertThat(startInvokeVacuumMethodFuture, willCompleteSuccessfully());
@@ -282,7 +282,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.addStorage(tablePartitionId, gcUpdateHandler);
- gc.updateLowWatermark(new HybridTimestamp(1, 1));
+ gc.onLwmChanged(new HybridTimestamp(1, 1));
assertThat(invokeVacuumMethodFuture0, willCompleteSuccessfully());
assertThat(gc.removeStorage(tablePartitionId),
willCompleteSuccessfully());
@@ -301,7 +301,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
assertThrowsClosed(() -> gc.addStorage(createTablePartitionId(),
createGcUpdateHandler()));
assertThrowsClosed(() -> gc.removeStorage(createTablePartitionId()));
- assertThrowsClosed(() -> gc.updateLowWatermark(new HybridTimestamp(1,
1)));
+ assertThrowsClosed(() -> gc.onLwmChanged(new HybridTimestamp(1, 1)));
assertDoesNotThrow(gc::close);
}
@@ -317,7 +317,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.start();
- gc.updateLowWatermark(new HybridTimestamp(1, 1));
+ gc.onLwmChanged(new HybridTimestamp(1, 1));
for (int i = 0; i < 100; i++) {
CountDownLatch latch = new CountDownLatch(5);
@@ -357,7 +357,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
gc.addStorage(createTablePartitionId(), gcUpdateHandler);
// Let's update the low watermark and see that we didn't start the
garbage collection because we didn't reach the safe time.
- gc.updateLowWatermark(lvm);
+ gc.onLwmChanged(lvm);
assertThat(invokeVacuumMethodFuture, willTimeoutFast());
verify(safeTimeTracker).waitFor(lvm);