This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new eed4ba0edd8 IGNITE-27477 Raise LWM on recovery if it lags behind
Catalog history (#7330)
eed4ba0edd8 is described below
commit eed4ba0edd8cb2411a9646aba88b44a84a652824
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Dec 30 17:59:15 2025 +0400
IGNITE-27477 Raise LWM on recovery if it lags behind Catalog history (#7330)
---
modules/catalog-compaction/build.gradle | 2 +
.../compaction/CatalogCompactionRunner.java | 20 ++++++---
.../CatalogCompactionRunnerSelfTest.java | 2 +
.../apache/ignite/internal/index/IndexManager.java | 3 +-
.../ignite/internal/lowwatermark/LowWatermark.java | 11 +++++
.../internal/lowwatermark/LowWatermarkImpl.java | 18 ++++++--
.../internal/lowwatermark/TestLowWatermark.java | 5 +++
.../partition/replicator/fixtures/Node.java | 6 +--
.../PartitionReplicaLifecycleManager.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +---
.../internal/app/LowWatermarkRectifier.java} | 49 ++++++++++++----------
.../CatalogStorageIndexDescriptorSupplier.java | 4 +-
.../internal/table/distributed/TableManager.java | 11 ++---
.../table/distributed/index/IndexMetaStorage.java | 3 +-
14 files changed, 86 insertions(+), 59 deletions(-)
diff --git a/modules/catalog-compaction/build.gradle
b/modules/catalog-compaction/build.gradle
index d7ffe10d047..5dbee301560 100644
--- a/modules/catalog-compaction/build.gradle
+++ b/modules/catalog-compaction/build.gradle
@@ -38,6 +38,7 @@ dependencies {
implementation project(':ignite-table')
implementation project(':ignite-schema')
implementation project(':ignite-transactions')
+ implementation project(':ignite-low-watermark')
implementation libs.jetbrains.annotations
implementation libs.fastutil.core
@@ -47,6 +48,7 @@ dependencies {
testImplementation testFixtures(project(':ignite-metastorage'))
testImplementation testFixtures(project(':ignite-catalog'))
testImplementation testFixtures(project(':ignite-failure-handler'))
+ testImplementation testFixtures(project(':ignite-low-watermark'))
testImplementation libs.awaitility
integrationTestImplementation libs.fastutil.core
diff --git
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
index 5e981fe731e..c4521564770 100644
---
a/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
+++
b/modules/catalog-compaction/src/main/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunner.java
@@ -66,6 +66,9 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
+import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.network.InternalClusterNode;
@@ -150,6 +153,8 @@ public class CatalogCompactionRunner implements
IgniteComponent {
private final TopologyService topologyService;
+ private final LowWatermark lowWatermark;
+
private final RebalanceMinimumRequiredTimeProvider
rebalanceMinimumRequiredTimeProvider;
private CompletableFuture<Void> lastRunFuture =
CompletableFutures.nullCompletedFuture();
@@ -161,7 +166,7 @@ public class CatalogCompactionRunner implements
IgniteComponent {
*/
private volatile @Nullable String compactionCoordinatorNodeName;
- private volatile HybridTimestamp lowWatermark;
+ private volatile HybridTimestamp lowWatermarkValue;
private volatile UUID localNodeId;
@@ -178,6 +183,7 @@ public class CatalogCompactionRunner implements
IgniteComponent {
ClockService clockService,
SchemaSyncService schemaSyncService,
TopologyService topologyService,
+ LowWatermark lowWatermark,
ActiveLocalTxMinimumRequiredTimeProvider
activeLocalTxMinimumRequiredTimeProvider,
MinimumRequiredTimeCollectorService
minimumRequiredTimeCollectorService,
RebalanceMinimumRequiredTimeProvider
rebalanceMinimumRequiredTimeProvider
@@ -189,6 +195,7 @@ public class CatalogCompactionRunner implements
IgniteComponent {
this.clockService = clockService;
this.schemaSyncService = schemaSyncService;
this.topologyService = topologyService;
+ this.lowWatermark = lowWatermark;
this.placementDriver = placementDriver;
this.replicaService = replicaService;
this.activeLocalTxMinimumRequiredTimeProvider =
activeLocalTxMinimumRequiredTimeProvider;
@@ -199,6 +206,9 @@ public class CatalogCompactionRunner implements
IgniteComponent {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
+ lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
+ params ->
onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
params).newLowWatermark()));
+
messagingService.addMessageHandler(CatalogCompactionMessageGroup.class, new
CatalogCompactionMessageHandler());
localNodeId = topologyService.localMember().id();
@@ -223,7 +233,7 @@ public class CatalogCompactionRunner implements
IgniteComponent {
public void updateCoordinator(InternalClusterNode newCoordinator) {
compactionCoordinatorNodeName = newCoordinator.name();
- triggerCompaction(lowWatermark);
+ triggerCompaction(lowWatermarkValue);
}
/** Returns local view of the node on who is currently compaction
coordinator. For test purposes only. */
@@ -238,8 +248,8 @@ public class CatalogCompactionRunner implements
IgniteComponent {
}
/** Called when the low watermark has been changed. */
- public CompletableFuture<Boolean> onLowWatermarkChanged(HybridTimestamp
newLowWatermark) {
- lowWatermark = newLowWatermark;
+ CompletableFuture<Boolean> onLowWatermarkChanged(HybridTimestamp
newLowWatermark) {
+ lowWatermarkValue = newLowWatermark;
triggerCompaction(newLowWatermark);
@@ -709,7 +719,7 @@ public class CatalogCompactionRunner implements
IgniteComponent {
}
private void handleMinimumTimesRequest(InternalClusterNode sender,
Long correlationId) {
- HybridTimestamp lwm = lowWatermark;
+ HybridTimestamp lwm = lowWatermarkValue;
LocalMinTime minLocalTime;
if (lwm != null) {
diff --git
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
index 19e360e8ec9..24afdffde7e 100644
---
a/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
+++
b/modules/catalog-compaction/src/test/java/org/apache/ignite/internal/catalog/compaction/CatalogCompactionRunnerSelfTest.java
@@ -99,6 +99,7 @@ import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.index.IndexNodeFinishedRwTransactionsChecker;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.InternalClusterNode;
@@ -1248,6 +1249,7 @@ public class CatalogCompactionRunnerSelfTest extends
AbstractCatalogCompactionTe
clockService,
schemaSyncService,
topologyService,
+ new TestLowWatermark(),
clockService::nowLong,
minTimeCollector,
rebalanceMinimumRequiredTimeProvider
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index cef5ae464b6..c784f5ef218 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -23,7 +23,6 @@ import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREAT
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_REMOVED;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
import static
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent.LOW_WATERMARK_CHANGED;
-import static
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
import static
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexToTable;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -330,7 +329,7 @@ public class IndexManager implements IgniteComponent {
/** Recover deferred destroy events. */
private void recoverDestructionQueue() {
// LWM starts updating only after the node is restored.
- HybridTimestamp lwm = catalogSafeLowWatermark(lowWatermark,
catalogService);
+ HybridTimestamp lwm = lowWatermark.getLowWatermark();
int earliestCatalogVersion = lwm == null
? catalogService.earliestCatalogVersion()
diff --git
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
index 3164619628b..3184458ab53 100644
---
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
+++
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermark.java
@@ -51,6 +51,17 @@ public interface LowWatermark extends
EventProducer<LowWatermarkEvent, LowWaterm
*/
void updateLowWatermark(HybridTimestamp newLowWatermark);
+ /**
+ * Sets the low watermark during node recovery.
+ *
+ * <p>This method sets the low watermark immediately and unconditionally,
without checking if the new value is higher than
+ * the current one. Listeners are not notified about the change; even
more, they must not be registered at this point.
+ * It is intended to be used only during node recovery.
+ *
+ * @param newLowWatermark New low watermark.
+ */
+ void setLowWatermarkOnRecovery(HybridTimestamp newLowWatermark);
+
/**
* Locks the low watermark at the provided timestamp (prevents it from
being updated to a value higher than the provided one).
*
diff --git
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
index b9ea21be866..a82849b7d88 100644
---
a/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
+++
b/modules/low-watermark/src/main/java/org/apache/ignite/internal/lowwatermark/LowWatermarkImpl.java
@@ -164,7 +164,10 @@ public class LowWatermarkImpl extends
AbstractEventProducer<LowWatermarkEvent, L
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
return inBusyLockAsync(busyLock, () -> {
- setLowWatermarkOnRecovery(readLowWatermarkFromVault());
+ HybridTimestamp lwmFromVault = readLowWatermarkFromVault();
+ if (lwmFromVault != null) {
+ setLowWatermarkOnRecovery(lwmFromVault);
+ }
messagingService.addMessageHandler(LowWatermarkMessageGroup.class,
this::onReceiveNetworkMessage);
@@ -291,11 +294,14 @@ public class LowWatermarkImpl extends
AbstractEventProducer<LowWatermarkEvent, L
lowWatermark = newLowWatermark;
}
- private void setLowWatermarkOnRecovery(@Nullable HybridTimestamp
newLowWatermark) {
+ @Override
+ public void setLowWatermarkOnRecovery(HybridTimestamp newLowWatermark) {
updateLowWatermarkLock.writeLock().lock();
try {
lowWatermark = newLowWatermark;
+
+ saveWatermarkToVault(newLowWatermark);
} finally {
updateLowWatermarkLock.writeLock().unlock();
}
@@ -383,9 +389,9 @@ public class LowWatermarkImpl extends
AbstractEventProducer<LowWatermarkEvent, L
CompletableFuture<Void> updateAndNotify(HybridTimestamp newLowWatermark) {
return inBusyLockAsync(busyLock, () -> {
- vaultManager.put(LOW_WATERMARK_VAULT_KEY,
newLowWatermark.toBytes());
+ saveWatermarkToVault(newLowWatermark);
- return waitForLocksAndSetLowWatermark(newLowWatermark)
+ return waitForLocksAndSetLowWatermark(newLowWatermark)
.thenComposeAsync(unused2 -> fireEvent(
LOW_WATERMARK_CHANGED,
new
ChangeLowWatermarkEventParameters(newLowWatermark)), scheduledThreadPool)
@@ -404,6 +410,10 @@ public class LowWatermarkImpl extends
AbstractEventProducer<LowWatermarkEvent, L
);
}
+ private void saveWatermarkToVault(HybridTimestamp newLowWatermark) {
+ vaultManager.put(LOW_WATERMARK_VAULT_KEY, newLowWatermark.toBytes());
+ }
+
private CompletableFuture<Void>
waitForLocksAndSetLowWatermark(HybridTimestamp newLowWatermark) {
return inBusyLockAsync(busyLock, () -> {
// Write lock so no new LWM locks can be added.
diff --git
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
index 66c6798fbcc..8af39abc56a 100644
---
a/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
+++
b/modules/low-watermark/src/testFixtures/java/org/apache/ignite/internal/lowwatermark/TestLowWatermark.java
@@ -85,6 +85,11 @@ public class TestLowWatermark extends
AbstractEventProducer<LowWatermarkEvent, L
}
}
+ @Override
+ public void setLowWatermarkOnRecovery(HybridTimestamp newLowWatermark) {
+ setLowWatermark(newLowWatermark);
+ }
+
@Override
public boolean tryLock(UUID lockId, HybridTimestamp lockTs) {
updateLowWatermarkLock.readLock().lock();
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index b4e6cb0506c..00099da8df1 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -100,8 +100,6 @@ import
org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
-import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
-import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -688,15 +686,13 @@ public class Node {
clockService,
schemaSyncService,
clusterService.topologyService(),
+ lowWatermark,
clockService::nowLong,
minTimeCollectorService,
new
RebalanceMinimumRequiredTimeProviderImpl(metaStorageManager, catalogManager));
metaStorageManager.addElectionListener(catalogCompactionRunner::updateCoordinator);
- lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
- params ->
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
params).newLowWatermark()));
-
SystemDistributedConfiguration systemDistributedConfiguration =
clusterConfigRegistry.getConfiguration(SystemDistributedExtensionConfiguration.KEY).system();
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index c4b66184794..742e7886382 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -59,7 +59,6 @@ import static
org.apache.ignite.internal.partition.replicator.LocalPartitionRepl
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_DESTROYED;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STOPPED;
-import static
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
import static
org.apache.ignite.internal.partitiondistribution.Assignments.assignmentListToString;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignments;
@@ -477,8 +476,7 @@ public class PartitionReplicaLifecycleManager extends
handleResourcesForDroppedZonesOnRecovery();
- HybridTimestamp safeLwm = catalogSafeLowWatermark(lowWatermark,
catalogService);
- CompletableFuture<Void> processZonesAndAssignmentsOnStart =
processZonesOnStart(recoveryRevision, safeLwm)
+ CompletableFuture<Void> processZonesAndAssignmentsOnStart =
processZonesOnStart(recoveryRevision, lowWatermark.getLowWatermark())
.thenCompose(ignored ->
processAssignmentsOnRecovery(recoveryRevision));
metaStorageMgr.registerPrefixWatch(new
ByteArray(PENDING_ASSIGNMENTS_QUEUE_PREFIX_BYTES),
pendingAssignmentsRebalanceListener);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 91460355457..a4c140fbabe 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -157,8 +157,6 @@ import
org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
-import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
-import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.cache.IdempotentCacheVacuumizer;
@@ -1032,6 +1030,7 @@ public class IgniteImpl implements Ignite {
clockService,
schemaSyncService,
clusterSvc.topologyService(),
+ lowWatermark,
indexNodeFinishedRwTransactionsChecker,
minTimeCollectorService,
new RebalanceMinimumRequiredTimeProviderImpl(metaStorageMgr,
catalogManager)
@@ -1042,9 +1041,6 @@ public class IgniteImpl implements Ignite {
killCommandHandler = new KillCommandHandler(name,
logicalTopologyService, clusterSvc.messagingService());
- lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
- params ->
catalogCompactionRunner.onLowWatermarkChanged(((ChangeLowWatermarkEventParameters)
params).newLowWatermark()));
-
resourcesRegistry = new RemotelyTriggeredResourceRegistry();
var transactionInflights = new
TransactionInflights(placementDriverMgr.placementDriver(), clockService);
@@ -1604,6 +1600,7 @@ public class IgniteImpl implements Ignite {
lifecycleManager.startComponentsAsync(
componentContext,
catalogManager,
+ new LowWatermarkRectifier(lowWatermark,
catalogManager),
catalogCompactionRunner,
indexMetaStorage,
clusterCfgMgr,
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/SafeLowWatermarkUtils.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/LowWatermarkRectifier.java
similarity index 50%
rename from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/SafeLowWatermarkUtils.java
rename to
modules/runner/src/main/java/org/apache/ignite/internal/app/LowWatermarkRectifier.java
index a7de6435fbb..205580ee98e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/SafeLowWatermarkUtils.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/LowWatermarkRectifier.java
@@ -15,42 +15,45 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.partition.replicator;
+package org.apache.ignite.internal.app;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
import org.jetbrains.annotations.Nullable;
/**
- * Utility class for safe low watermark operations.
+ * Component that rectifies the low watermark on node startup if it is lower
than the earliest catalog timestamp.
*/
-public class SafeLowWatermarkUtils {
- /**
- * Returns a catalog-safe low watermark (that is, a timestamp that is not
lower than current LWM, but can be safely used
- * to access the catalog).
- * If the low watermark is {@code null}, {@code null} is returned. If the
low watermark is less than the earliest catalog timestamp,
- * the earliest catalog timestamp is returned. Otherwise, the original low
watermark is returned.
- *
- * @param watermark The low watermark.
- * @param catalogService The catalog service.
- * @return A catalog-safe low watermark or {@code null}.
- */
- public static @Nullable HybridTimestamp
catalogSafeLowWatermark(LowWatermark watermark, CatalogService catalogService) {
- HybridTimestamp lwmTimestamp = watermark.getLowWatermark();
-
- if (lwmTimestamp == null) {
- return null;
- }
+class LowWatermarkRectifier implements IgniteComponent {
+ private final LowWatermark lowWatermark;
+ private final CatalogService catalogService;
+
+ LowWatermarkRectifier(LowWatermark lowWatermark, CatalogService
catalogService) {
+ this.lowWatermark = lowWatermark;
+ this.catalogService = catalogService;
+ }
+ @Override
+ public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
HybridTimestamp earliestCatalogTimestamp =
hybridTimestamp(catalogService.earliestCatalog().time());
+ @Nullable HybridTimestamp lwm = lowWatermark.getLowWatermark();
- if (lwmTimestamp.compareTo(earliestCatalogTimestamp) < 0) {
- return earliestCatalogTimestamp;
- } else {
- return lwmTimestamp;
+ if (lwm != null && lwm.compareTo(earliestCatalogTimestamp) < 0) {
+ lowWatermark.setLowWatermarkOnRecovery(earliestCatalogTimestamp);
}
+
+ return nullCompletedFuture();
+ }
+
+ @Override
+ public CompletableFuture<Void> stopAsync(ComponentContext
componentContext) {
+ return nullCompletedFuture();
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
index 600c23ebd91..c7d97a8d647 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/CatalogStorageIndexDescriptorSupplier.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.table.distributed;
-import static
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
-
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
@@ -49,7 +47,7 @@ class CatalogStorageIndexDescriptorSupplier implements
StorageIndexDescriptorSup
// Get the current Low Watermark value. Since this class is used only
on recovery, we expect that this value will not change
// concurrently.
- HybridTimestamp lowWatermarkTimestamp =
catalogSafeLowWatermark(lowWatermark, catalogService);
+ HybridTimestamp lowWatermarkTimestamp = lowWatermark.getLowWatermark();
int earliestCatalogVersion = lowWatermarkTimestamp == null
? catalogService.earliestCatalogVersion()
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 2743476c67e..bb649037bfe 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -33,7 +33,6 @@ import static
org.apache.ignite.internal.event.EventListener.fromConsumer;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED;
-import static
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
import static
org.apache.ignite.internal.table.distributed.TableUtils.aliveTables;
import static
org.apache.ignite.internal.table.distributed.index.IndexUtils.registerIndexesToTable;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
@@ -532,9 +531,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
rebalanceRetryDelayConfiguration.init();
- @Nullable HybridTimestamp lwm =
catalogSafeLowWatermark(lowWatermark, catalogService);
-
- cleanUpResourcesForDroppedTablesOnRecoveryBusy(lwm);
+ cleanUpResourcesForDroppedTablesOnRecoveryBusy();
catalogService.listen(CatalogEvent.TABLE_CREATE,
onTableCreateListener);
catalogService.listen(CatalogEvent.TABLE_DROP,
onTableDropListener);
@@ -550,7 +547,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
long recoveryRevision = recoveryFinishFuture.join().revision();
- return recoverTables(recoveryRevision, lwm);
+ return recoverTables(recoveryRevision,
lowWatermark.getLowWatermark());
});
}
@@ -1860,10 +1857,10 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
}
- private void cleanUpResourcesForDroppedTablesOnRecoveryBusy(@Nullable
HybridTimestamp lwm) {
+ private void cleanUpResourcesForDroppedTablesOnRecoveryBusy() {
// TODO: IGNITE-20384 Clean up abandoned resources for dropped tables
from vault and metastore
- Set<Integer> aliveTableIds = aliveTables(catalogService, lwm);
+ Set<Integer> aliveTableIds = aliveTables(catalogService,
lowWatermark.getLowWatermark());
destroyMvStoragesForTablesNotIn(aliveTableIds);
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
index 11d049d9e73..9a6afb2df4c 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/index/IndexMetaStorage.java
@@ -37,7 +37,6 @@ import static
org.apache.ignite.internal.metastorage.dsl.Conditions.value;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
-import static
org.apache.ignite.internal.partition.replicator.SafeLowWatermarkUtils.catalogSafeLowWatermark;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.READ_ONLY;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REMOVED;
import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.statusOnRemoveIndex;
@@ -323,7 +322,7 @@ public class IndexMetaStorage implements IgniteComponent {
}
private CompletableFuture<Void> recoverIndexMetas() {
- int lwmCatalogVersion =
lwmCatalogVersion(catalogSafeLowWatermark(lowWatermark, catalogService));
+ int lwmCatalogVersion =
lwmCatalogVersion(lowWatermark.getLowWatermark());
int latestCatalogVersion = catalogService.latestCatalogVersion();
int startCatalogVersion = Math.max(lwmCatalogVersion,
catalogService.earliestCatalogVersion());