This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d6d7b428781 IGNITE-28076 Use watch executor in SchemaSafeTimeTrackerImpl (#7729)
d6d7b428781 is described below
commit d6d7b4287811d28c1a4d81829efda7446cb6e99e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Mar 9 16:37:22 2026 +0400
IGNITE-28076 Use watch executor in SchemaSafeTimeTrackerImpl (#7729)
---
.../rebalance/ItRebalanceDistributedTest.java | 2 +-
.../metastorage/impl/MetaStorageManagerImpl.java | 7 ++++-
.../metastorage/impl/WatchProcessorAccess.java | 30 ++++++++++++++++++++++
.../server/AbstractKeyValueStorage.java | 6 +++++
.../metastorage/server/KeyValueStorage.java | 6 +++++
.../metastorage/server/WatchProcessor.java | 5 ++++
.../partition/replicator/fixtures/Node.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../internal/schema/SchemaSafeTimeTrackerImpl.java | 12 ++++++---
.../schema/SchemaSafeTimeTrackerImplTest.java | 9 ++++++-
11 files changed, 73 insertions(+), 10 deletions(-)
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a2f4a1d4537..fc0e4728fa8 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1505,7 +1505,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
schemaManager = new SchemaManager(registry, catalogManager);
- schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
+ schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(),
metaStorageManager.watchExecutor());
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
schemaSyncService = new
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 20cc515ff44..eee80e1e468 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -128,7 +128,7 @@ import org.jetbrains.annotations.TestOnly;
* <li>Providing corresponding Meta storage service proxy interface</li>
* </ul>
*/
-public class MetaStorageManagerImpl implements MetaStorageManager,
MetastorageGroupMaintenance {
+public class MetaStorageManagerImpl implements MetaStorageManager,
MetastorageGroupMaintenance, WatchProcessorAccess {
private static final IgniteLogger LOG =
Loggers.forClass(MetaStorageManagerImpl.class);
private final ClusterService clusterService;
@@ -1409,4 +1409,9 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
public void markAsStopping() {
metaStorageSvcFut.thenAccept(MetaStorageServiceImpl::markAsStopping);
}
+
+ @Override
+ public Executor watchExecutor() {
+ return storage.watchExecutor();
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/WatchProcessorAccess.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/WatchProcessorAccess.java
new file mode 100644
index 00000000000..8ad66e34dc6
--- /dev/null
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/WatchProcessorAccess.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.impl;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Provides access to watch processing required by internal components.
+ */
+public interface WatchProcessorAccess {
+ /**
+ * Returns executor for watch processing.
+ */
+ Executor watchExecutor();
+}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
index 292de6fe285..a41a8a092fb 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.FailureProcessor;
@@ -396,4 +397,9 @@ public abstract class AbstractKeyValueStorage implements
KeyValueStorage {
notifyWatchProcessorEventsBeforeStartingWatches = null;
}
}
+
+ @Override
+ public Executor watchExecutor() {
+ return watchProcessor.watchExecutor();
+ }
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index ef9b9887d89..2afb4861cf4 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -21,6 +21,7 @@ import java.nio.file.Path;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.CommandId;
@@ -558,4 +559,9 @@ public interface KeyValueStorage extends ManuallyCloseable {
* @return Future that's completed when flushing of the data is completed.
*/
CompletableFuture<Void> flush();
+
+ /**
+ * Returns executor used to execute watches.
+ */
+ Executor watchExecutor();
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 4e2691ed0d4..a11b5131b24 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -39,6 +39,7 @@ import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -513,4 +514,8 @@ public class WatchProcessor implements ManuallyCloseable {
IDEMPOTENT_COMMAND_PREFIX_BYTES, 0, prefixLength
);
}
+
+ Executor watchExecutor() {
+ return watchExecutor;
+ }
}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 9339fd161a3..65b9de944a1 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -639,7 +639,7 @@ public class Node {
volatileLogStorageManagerCreator = new
VolatileLogStorageManagerCreator(name, workDir.resolve("volatile-log-spillout-"
+ name));
- schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime());
+ schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageManager.clusterTime(),
metaStorageManager.watchExecutor());
metaStorageManager.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
LongSupplier delayDurationMsSupplier = () -> DELAY_DURATION_MS;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index d73fc0a2448..15bf03e44b7 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -602,7 +602,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
zoneId -> completedFuture(Set.of())
);
- var schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
+ var schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(),
metaStorageMgr.watchExecutor());
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
LongSupplier delayDurationMsSupplier = () ->
TestIgnitionManager.DEFAULT_DELAY_DURATION_MS;
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 8cef74418ee..7ba2b4d4cd3 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -935,7 +935,7 @@ public class IgniteImpl implements Ignite {
volatileLogStorageManagerCreator
);
- schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
+ schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(),
metaStorageMgr.watchExecutor());
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
SchemaSyncService schemaSyncService = new
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java
index f05e79c0ada..9fb4beeada2 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImpl.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -39,6 +40,8 @@ import org.jetbrains.annotations.TestOnly;
public class SchemaSafeTimeTrackerImpl implements SchemaSafeTimeTracker,
IgniteComponent, NotificationEnqueuedListener {
private final ClusterTime clusterTime;
+ private final Executor watchExecutor;
+
private final PendingComparableValuesTracker<HybridTimestamp, Void>
schemaSafeTime =
new PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE);
@@ -46,8 +49,9 @@ public class SchemaSafeTimeTrackerImpl implements
SchemaSafeTimeTracker, IgniteC
private final Object futureMutex = new Object();
- public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime) {
+ public SchemaSafeTimeTrackerImpl(ClusterTime clusterTime, Executor
watchExecutor) {
this.clusterTime = clusterTime;
+ this.watchExecutor = watchExecutor;
}
@Override
@@ -74,16 +78,16 @@ public class SchemaSafeTimeTrackerImpl implements
SchemaSafeTimeTracker, IgniteC
// The update touches the Catalog (i.e. schemas), so we must
chain with the core notification future
// as Catalog listeners will be included in it (because we
need to wait for those listeners to finish execution
// before updating the schema safe time).
- newSchemaSafeTimeUpdateFuture =
schemaSafeTimeUpdateFuture.thenCompose(unused -> newNotificationFuture);
+ newSchemaSafeTimeUpdateFuture =
schemaSafeTimeUpdateFuture.thenComposeAsync(unused -> newNotificationFuture,
watchExecutor);
} else {
// The update does not concern the Catalog (schemas), so we
can update schema safe time as soon as previous updates to it
// get applied.
newSchemaSafeTimeUpdateFuture = schemaSafeTimeUpdateFuture;
}
- newSchemaSafeTimeUpdateFuture =
newSchemaSafeTimeUpdateFuture.thenRun(() -> {
+ newSchemaSafeTimeUpdateFuture =
newSchemaSafeTimeUpdateFuture.thenRunAsync(() -> {
schemaSafeTime.update(timestamp, null);
- });
+ }, watchExecutor);
schemaSafeTimeUpdateFuture = newSchemaSafeTimeUpdateFuture;
}
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java
index 5942e6ead89..fb3c411674a 100644
---
a/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/schema/SchemaSafeTimeTrackerImplTest.java
@@ -25,11 +25,14 @@ import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.ExecutorServiceExtension;
+import org.apache.ignite.internal.testframework.InjectExecutorService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -37,15 +40,19 @@ import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
+@ExtendWith(ExecutorServiceExtension.class)
class SchemaSafeTimeTrackerImplTest extends BaseIgniteAbstractTest {
@Mock
private ClusterTime clusterTime;
+ @InjectExecutorService(threadCount = 1)
+ private ExecutorService executor;
+
private SchemaSafeTimeTrackerImpl tracker;
@BeforeEach
void createTracker() {
- tracker = new SchemaSafeTimeTrackerImpl(clusterTime);
+ tracker = new SchemaSafeTimeTrackerImpl(clusterTime, executor);
}
@Test