This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7f5bda1cb8 IGNITE-19209 Implement installing table schema updates
(#2228)
7f5bda1cb8 is described below
commit 7f5bda1cb88e5ed854343a8534d917f3207318cf
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jun 23 16:38:12 2023 +0400
IGNITE-19209 Implement installing table schema updates (#2228)
---
modules/api/build.gradle | 1 +
.../testframework/IntegrationTestBase.java | 2 +-
.../testframework/TestIgnitionManager.java | 39 ++++
.../internal/catalog/CatalogServiceImpl.java | 97 +++++----
.../ignite/internal/catalog/ClockWaiter.java | 171 ++++++++++++++++
.../SchemaSynchronizationConfigurationSchema.java | 3 +-
.../ignite/internal/catalog/storage/UpdateLog.java | 4 +-
.../internal/catalog/storage/UpdateLogImpl.java | 31 ++-
.../internal/catalog/storage/VersionedUpdate.java | 15 +-
.../internal/catalog/CatalogServiceSelfTest.java | 216 ++++++++++++---------
.../ignite/internal/catalog/ClockWaiterTest.java | 97 +++++++++
.../catalog/storage/UpdateLogImplTest.java | 24 +--
.../internal/cli/commands/ItConfigCommandTest.java | 2 +-
.../internal/rest/ItGeneratedRestClientTest.java | 2 +-
.../{HybridClock.java => ClockUpdateListener.java} | 27 +--
.../apache/ignite/internal/hlc/HybridClock.java | 14 ++
.../ignite/internal/hlc/HybridClockImpl.java | 23 ++-
.../ignite/internal/hlc/HybridTimestamp.java | 8 +
.../apache/ignite/internal/HybridClockTest.java | 60 ++++++
.../apache/ignite/internal/TestHybridClock.java | 24 ++-
.../apache/ignite/jdbc/AbstractJdbcSelfTest.java | 2 +-
.../internal/metastorage/MetaStorageManager.java | 20 ++
.../ignite/internal/metastorage/WatchEvent.java | 22 ++-
.../metastorage/impl/MetaStorageManagerImpl.java | 26 +++
.../metastorage/server/KeyValueStorage.java | 8 +
.../metastorage/server/WatchProcessor.java | 8 +-
.../server/persistence/RocksDbKeyValueStorage.java | 3 +-
.../server/SimpleInMemoryKeyValueStorage.java | 8 +
modules/rest-api/openapi/openapi.yaml | 2 +-
.../java/org/apache/ignite/internal/Cluster.java | 4 +-
.../benchmark/AbstractOneNodeBenchmark.java | 2 +-
.../cluster/management/ItClusterInitTest.java | 4 +-
.../component/ItRestAddressReportTest.java | 3 +-
.../storage/ItRebalanceDistributedTest.java | 19 +-
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 2 +-
.../runner/app/AbstractSchemaChangeTest.java | 2 +-
.../internal/runner/app/ItDataSchemaSyncTest.java | 2 +-
.../app/ItIgniteInMemoryNodeRestartTest.java | 2 +-
.../runner/app/ItIgniteNodeRestartTest.java | 13 +-
.../internal/runner/app/ItTableCreationTest.java | 2 +-
.../internal/runner/app/ItTablesApiTest.java | 2 +-
.../runner/app/PlatformTestNodeRunner.java | 2 +-
.../app/client/ItAbstractThinClientTest.java | 2 +-
.../sql/engine/ClusterPerClassIntegrationTest.java | 2 +-
.../ignite/internal/sqllogic/ItSqlLogicTest.java | 2 +-
.../org/apache/ignite/internal/ssl/ItSslTest.java | 4 +-
.../ignite/internal/table/ItRoReadsTest.java | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 17 +-
.../DistributedConfigurationCatchUpTest.java | 3 +-
49 files changed, 823 insertions(+), 227 deletions(-)
diff --git a/modules/api/build.gradle b/modules/api/build.gradle
index 0155eb0ba7..c53932e48f 100644
--- a/modules/api/build.gradle
+++ b/modules/api/build.gradle
@@ -34,6 +34,7 @@ dependencies {
testFixturesAnnotationProcessor libs.micronaut.inject.annotation.processor
testFixturesImplementation project(":ignite-core")
testFixturesImplementation testFixtures(project(":ignite-core"))
+ testFixturesImplementation libs.typesafe.config
testFixturesImplementation libs.hamcrest.core
testFixturesImplementation libs.micronaut.junit5
testFixturesImplementation libs.jetbrains.annotations
diff --git
a/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/IntegrationTestBase.java
b/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/IntegrationTestBase.java
index 860e07b637..0aa46d654e 100644
---
a/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/IntegrationTestBase.java
+++
b/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/IntegrationTestBase.java
@@ -129,7 +129,7 @@ public class IntegrationTestBase extends
BaseIgniteAbstractTest {
configureInitParameters(builder);
- IgnitionManager.init(builder.build());
+ TestIgnitionManager.init(builder.build());
awaitClusterInitialized();
}
diff --git
a/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
b/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
index 06244f15b9..dc4d66f463 100644
---
a/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
+++
b/modules/api/src/testFixtures/java/org/apache/ignite/internal/testframework/TestIgnitionManager.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.testframework;
+import com.typesafe.config.parser.ConfigDocument;
+import com.typesafe.config.parser.ConfigDocumentFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -24,6 +26,8 @@ import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.lang.IgniteException;
import org.jetbrains.annotations.Nullable;
@@ -77,4 +81,39 @@ public class TestIgnitionManager {
throw new IgniteException("Couldn't write node config.", e);
}
}
+
+ /**
+ * Initializes a cluster using test defaults for cluster configuration
values that are not
+ * specified explicitly.
+ *
+ * @param parameters Init parameters.
+ * @see IgnitionManager#init(InitParameters)
+ */
+ public static void init(InitParameters parameters) {
+ IgnitionManager.init(applyTestDefaultsToClusterConfig(parameters));
+ }
+
+ private static InitParameters
applyTestDefaultsToClusterConfig(InitParameters params) {
+ InitParametersBuilder builder = new InitParametersBuilder()
+ .clusterName(params.clusterName())
+ .destinationNodeName(params.nodeName())
+ .metaStorageNodeNames(params.metaStorageNodeNames())
+ .cmgNodeNames(params.cmgNodeNames());
+
+ if (params.clusterConfiguration() == null) {
+ builder.clusterConfiguration("{ schemaSync.delayDuration: 0 }");
+ } else {
+ ConfigDocument configDocument =
ConfigDocumentFactory.parseString(params.clusterConfiguration());
+
+ String delayDurationPath = "schemaSync.delayDuration";
+
+ if (!configDocument.hasPath(delayDurationPath)) {
+ ConfigDocument updatedDocument =
configDocument.withValueText(delayDurationPath, "0");
+
+ builder.clusterConfiguration(updatedDocument.render());
+ }
+ }
+
+ return builder.build();
+ }
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 66ea991585..ace40aa938 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -33,6 +33,7 @@ import java.util.NavigableMap;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.LongSupplier;
import java.util.function.Predicate;
import org.apache.ignite.internal.catalog.commands.AlterColumnParams;
import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams;
@@ -72,7 +73,7 @@ import org.apache.ignite.internal.catalog.storage.UpdateEntry;
import org.apache.ignite.internal.catalog.storage.UpdateLog;
import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
-import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.Producer;
@@ -118,24 +119,31 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
private final PendingComparableValuesTracker<Integer, Void> versionTracker
= new PendingComparableValuesTracker<>(0);
- private final HybridClock clock;
+ private final ClockWaiter clockWaiter;
- private final long delayDurationMs;
+ private final LongSupplier delayDurationMsSupplier;
/**
* Constructor.
*/
- public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock) {
- this(updateLog, clock, DEFAULT_DELAY_DURATION);
+ public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
+ this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
}
/**
* Constructor.
*/
- public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock, long
delayDurationMs) {
+ CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long
delayDurationMs) {
+ this(updateLog, clockWaiter, () -> delayDurationMs);
+ }
+
+ /**
+ * Constructor.
+ */
+ public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter,
LongSupplier delayDurationMsSupplier) {
this.updateLog = updateLog;
- this.clock = clock;
- this.delayDurationMs = delayDurationMs;
+ this.clockWaiter = clockWaiter;
+ this.delayDurationMsSupplier = delayDurationMsSupplier;
}
@Override
@@ -256,7 +264,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> createTable(CreateTableParams params) {
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogSchemaDescriptor schema = getSchema(catalog,
params.schemaName());
if (schema.table(params.tableName()) != null) {
@@ -278,7 +286,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> dropTable(DropTableParams params) {
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogSchemaDescriptor schema = getSchema(catalog,
params.schemaName());
CatalogTableDescriptor table = getTable(schema,
params.tableName());
@@ -301,7 +309,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
return completedFuture(null);
}
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogSchemaDescriptor schema = getSchema(catalog,
params.schemaName());
CatalogTableDescriptor table = getTable(schema,
params.tableName());
@@ -328,7 +336,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
return completedFuture(null);
}
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogSchemaDescriptor schema = getSchema(catalog,
params.schemaName());
CatalogTableDescriptor table = getTable(schema,
params.tableName());
@@ -343,7 +351,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> alterColumn(AlterColumnParams params) {
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogSchemaDescriptor schema = getSchema(catalog,
params.schemaName());
CatalogTableDescriptor table = getTable(schema,
params.tableName());
@@ -369,7 +377,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> createIndex(CreateHashIndexParams params) {
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogSchemaDescriptor schema = getSchema(catalog,
params.schemaName());
if (schema.index(params.indexName()) != null) {
@@ -391,7 +399,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> createIndex(CreateSortedIndexParams params)
{
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogSchemaDescriptor schema = getSchema(catalog,
params.schemaName());
if (schema.index(params.indexName()) != null) {
@@ -413,7 +421,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> dropIndex(DropIndexParams params) {
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogSchemaDescriptor schema = getSchema(catalog,
params.schemaName());
CatalogIndexDescriptor index = schema.index(params.indexName());
@@ -430,7 +438,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> createDistributionZone(CreateZoneParams
params) {
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
validateCreateZoneParams(params);
String zoneName = Objects.requireNonNull(params.zoneName(),
"zone");
@@ -450,7 +458,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> dropDistributionZone(DropZoneParams params)
{
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogZoneDescriptor zone = getZone(catalog, params.zoneName());
if (zone.name().equals(DEFAULT_ZONE_NAME)) {
@@ -476,7 +484,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> renameDistributionZone(RenameZoneParams
params) {
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogZoneDescriptor zone = getZone(catalog, params.zoneName());
if (catalog.zone(params.newZoneName()) != null) {
@@ -507,7 +515,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
@Override
public CompletableFuture<Void> alterDistributionZone(AlterZoneParams
params) {
- return saveUpdate(catalog -> {
+ return saveUpdateAndWaitForActivation(catalog -> {
CatalogZoneDescriptor zone = getZone(catalog, params.zoneName());
Integer dataNodesAutoAdjust = params.dataNodesAutoAdjust();
@@ -552,11 +560,29 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
catalogByTs.put(newCatalog.time(), newCatalog);
}
- private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer) {
- return saveUpdate(updateProducer, 0);
+ private CompletableFuture<Void>
saveUpdateAndWaitForActivation(UpdateProducer updateProducer) {
+ return saveUpdate(updateProducer, 0)
+ .thenCompose(newVersion -> {
+ Catalog catalog = catalogByVer.get(newVersion);
+
+ HybridTimestamp activationTs =
HybridTimestamp.hybridTimestamp(catalog.time());
+ HybridTimestamp clusterWideEnsuredActivationTs =
activationTs.addPhysicalTime(
+ HybridTimestamp.maxClockSkew()
+ );
+
+ return clockWaiter.waitFor(clusterWideEnsuredActivationTs);
+ });
}
- private CompletableFuture<Void> saveUpdate(UpdateProducer updateProducer,
int attemptNo) {
+ /**
+ * Attempts to save a versioned update using a CAS-like logic. If the
attempt fails, makes more attempts
+ * until the max retry count is reached.
+ *
+ * @param updateProducer Supplies simple updates to include into a
versioned update to install.
+ * @param attemptNo Ordinal number of an attempt.
+ * @return Future that completes with the new Catalog version (if update
was saved successfully) or an exception, otherwise.
+ */
+ private CompletableFuture<Integer> saveUpdate(UpdateProducer
updateProducer, int attemptNo) {
if (attemptNo >= MAX_RETRY_COUNT) {
return failedFuture(new
IgniteInternalException(Common.INTERNAL_ERR, "Max retry limit exceeded: " +
attemptNo));
}
@@ -571,18 +597,16 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
}
if (updates.isEmpty()) {
- return completedFuture(null);
+ return completedFuture(catalog.version());
}
int newVersion = catalog.version() + 1;
- //TODO https://issues.apache.org/jira/browse/IGNITE-19209 Make
activation time in the MS entry strictly equal to MS entry ts+DD
- long activationTimestamp = activationTimestamp();
- return updateLog.append(new VersionedUpdate(newVersion,
activationTimestamp, updates))
+ return updateLog.append(new VersionedUpdate(newVersion,
delayDurationMsSupplier.getAsLong(), updates))
.thenCompose(result ->
versionTracker.waitFor(newVersion).thenApply(none -> result))
.thenCompose(result -> {
if (result) {
- return completedFuture(null);
+ return completedFuture(newVersion);
}
return saveUpdate(updateProducer, attemptNo + 1);
@@ -591,7 +615,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
class OnUpdateHandlerImpl implements OnUpdateHandler {
@Override
- public void handle(VersionedUpdate update) {
+ public void handle(VersionedUpdate update, HybridTimestamp
metaStorageUpdateTimestamp) {
int version = update.version();
Catalog catalog = catalogByVer.get(version - 1);
@@ -601,7 +625,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
catalog = entry.applyUpdate(catalog);
}
- catalog = applyUpdateFinal(catalog, update);
+ catalog = applyUpdateFinal(catalog, update,
metaStorageUpdateTimestamp);
registerCatalog(catalog);
@@ -630,13 +654,6 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
}
}
- /**
- * Calculate catalog activation timestamp.
- */
- private long activationTimestamp() {
- return clock.now().addPhysicalTime(delayDurationMs).longValue();
- }
-
private static void throwUnsupportedDdl(String msg, Object... params) {
throw new SqlException(UNSUPPORTED_DDL_OPERATION_ERR, msg, params);
}
@@ -646,10 +663,12 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
List<UpdateEntry> get(Catalog catalog);
}
- private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate
update) {
+ private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate
update, HybridTimestamp metaStorageUpdateTimestamp) {
+ long activationTimestamp =
metaStorageUpdateTimestamp.addPhysicalTime(update.delayDurationMs()).longValue();
+
return new Catalog(
update.version(),
- update.activationTimestamp(),
+ activationTimestamp,
catalog.objectIdGenState(),
catalog.zones(),
catalog.schemas()
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
new file mode 100644
index 0000000000..8c5c65df22
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/ClockWaiter.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.TrackerClosedException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Allows to wait for the supplied clock to reach a required timesdtamp. It
only uses the clock itself,
+ * no SafeTime mechanisms are involved.
+ */
+public class ClockWaiter implements IgniteComponent {
+ private static final IgniteLogger LOG =
Loggers.forClass(ClockWaiter.class);
+
+ private final String nodeName;
+ private final HybridClock clock;
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean stopGuard = new AtomicBoolean(false);
+
+ private final PendingComparableValuesTracker<Long, Void> nowTracker = new
PendingComparableValuesTracker<>(
+ HybridTimestamp.MIN_VALUE.longValue()
+ );
+
+ private final ClockUpdateListener updateListener = this::onUpdate;
+
+ private volatile ScheduledExecutorService scheduler;
+
+ public ClockWaiter(String nodeName, HybridClock clock) {
+ this.nodeName = nodeName;
+ this.clock = clock;
+ }
+
+ @Override
+ public void start() {
+ clock.addUpdateListener(updateListener);
+
+ scheduler = Executors.newSingleThreadScheduledExecutor(new
NamedThreadFactory(nodeName + "-clock-waiter", LOG));
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ clock.removeUpdateListener(updateListener);
+
+ nowTracker.close();
+
+ // We do shutdownNow() right away without doing usual
shutdown()+awaitTermination() because
+ // this would make us wait till all the scheduled tasks get executed
(which might take a lot
+ // of time). An alternative would be to track all ScheduledFutures
(which are not same as the
+ // user-facing futures we return from the tracker), but we don't need
them for anything else,
+ // so it's simpler to just use shutdownNow().
+ scheduler.shutdownNow();
+ scheduler.awaitTermination(10, TimeUnit.SECONDS);
+ }
+
+ private void onUpdate(long newTs) {
+ if (!busyLock.enterBusy()) {
+ return;
+ }
+
+ try {
+ nowTracker.update(newTs, null);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Wait for the clock to reach the given timestamp.
+ *
+ * @param targetTimestamp Timestamp to wait for.
+ * @return A future that completes when the timestamp is reached by the
clock's time.
+ */
+ public CompletableFuture<Void> waitFor(HybridTimestamp targetTimestamp) {
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new NodeStoppingException());
+ }
+
+ try {
+ return doWaitFor(targetTimestamp);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private CompletableFuture<Void> doWaitFor(HybridTimestamp targetTimestamp)
{
+ CompletableFuture<Void> future =
nowTracker.waitFor(targetTimestamp.longValue());
+
+ ScheduledFuture<?> scheduledFuture;
+
+ if (!future.isDone()) {
+ // This triggers a clock update.
+ HybridTimestamp now = clock.now();
+
+ if (targetTimestamp.compareTo(now) <= 0) {
+ assert future.isDone();
+
+ scheduledFuture = null;
+ } else {
+ // Adding 1 to account for a possible non-null logical part of
the targetTimestamp.
+ long millisToWait = targetTimestamp.getPhysical() -
now.getPhysical() + 1;
+
+ scheduledFuture = scheduler.schedule(this::triggerClockUpdate,
millisToWait, TimeUnit.MILLISECONDS);
+ }
+ } else {
+ scheduledFuture = null;
+ }
+
+ return future.handle((res, ex) -> {
+ if (scheduledFuture != null) {
+ scheduledFuture.cancel(true);
+ }
+
+ if (ex != null) {
+ // Let's replace a TrackerClosedException with a
CancellationException as the latter makes more sense for the clients.
+ if (ex instanceof TrackerClosedException) {
+ throw new CancellationException();
+ } else {
+ throw new CompletionException(ex);
+ }
+ }
+
+ return res;
+ });
+ }
+
+ private void triggerClockUpdate() {
+ clock.now();
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/configuration/SchemaSynchronizationConfigurationSchema.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/configuration/SchemaSynchronizationConfigurationSchema.java
index f5be7596b6..0cbbda6444 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/configuration/SchemaSynchronizationConfigurationSchema.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/configuration/SchemaSynchronizationConfigurationSchema.java
@@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.annotation.Value;
-import org.apache.ignite.configuration.validation.Immutable;
import org.apache.ignite.configuration.validation.Range;
/**
@@ -33,7 +32,7 @@ import org.apache.ignite.configuration.validation.Range;
public class SchemaSynchronizationConfigurationSchema {
/** Delay Duration (ms), see the spec for details. */
@Value(hasDefault = true)
- @Immutable
@Range(min = 0)
+ // TODO: IGNITE-19792 - make @Immutable when it gets being handled
property for distributed config.
public long delayDuration = TimeUnit.SECONDS.toMillis(1);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
index 8c5e1a886b..94124dae55 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog.storage;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.lang.IgniteInternalException;
@@ -67,7 +68,8 @@ public interface UpdateLog extends IgniteComponent {
* An actual handler that will be invoked when new update is appended
to the log.
*
* @param update A new update.
+ * @param metaStorageUpdateTimestamp Timestamp assigned to the update
by the Metastorage.
*/
- void handle(VersionedUpdate update);
+ void handle(VersionedUpdate update, HybridTimestamp
metaStorageUpdateTimestamp);
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 3652a89fa6..5427173ef8 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -27,8 +27,10 @@ import static
org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static org.apache.ignite.internal.util.ByteUtils.fromBytes;
import static org.apache.ignite.internal.util.ByteUtils.intToBytes;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -39,8 +41,6 @@ import
org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
-import org.apache.ignite.internal.vault.VaultEntry;
-import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.IgniteException;
@@ -56,7 +56,6 @@ public class UpdateLogImpl implements UpdateLog {
private final AtomicBoolean stopGuard = new AtomicBoolean();
private final MetaStorageManager metastore;
- private final VaultManager vault;
private volatile OnUpdateHandler onUpdateHandler;
private volatile @Nullable UpdateListener listener = null;
@@ -65,13 +64,8 @@ public class UpdateLogImpl implements UpdateLog {
* Creates the object.
*
* @param metastore A metastore is used to store and distribute updates
across the cluster.
- * @param vault A vault is used to recover state and replay updates on
start.
*/
- public UpdateLogImpl(
- MetaStorageManager metastore,
- VaultManager vault
- ) {
- this.vault = vault;
+ public UpdateLogImpl(MetaStorageManager metastore) {
this.metastore = metastore;
}
@@ -94,7 +88,7 @@ public class UpdateLogImpl implements UpdateLog {
restoreStateFromVault(handler);
- UpdateListener listener = new UpdateListener(handler);
+ UpdateListener listener = new UpdateListener(onUpdateHandler);
this.listener = listener;
metastore.registerPrefixWatch(CatalogKey.updatePrefix(), listener);
@@ -156,19 +150,22 @@ public class UpdateLogImpl implements UpdateLog {
}
private void restoreStateFromVault(OnUpdateHandler handler) {
+ long appliedRevision = metastore.appliedRevision();
+
int ver = 1;
// TODO: IGNITE-19790 Read range from metastore
while (true) {
- VaultEntry entry = vault.get(CatalogKey.update(ver++)).join();
+ ByteArray key = CatalogKey.update(ver++);
+ Entry entry = metastore.getLocally(key.bytes(), appliedRevision);
- if (entry == null) {
+ if (entry.empty() || entry.tombstone()) {
break;
}
- VersionedUpdate update = fromBytes(entry.value());
+ VersionedUpdate update =
fromBytes(Objects.requireNonNull(entry.value()));
- handler.handle(update);
+ handler.handle(update,
metastore.timestampByRevision(entry.revision()));
}
}
@@ -193,11 +190,10 @@ public class UpdateLogImpl implements UpdateLog {
private static class UpdateListener implements WatchListener {
private final OnUpdateHandler onUpdateHandler;
- UpdateListener(OnUpdateHandler onUpdateHandler) {
+ private UpdateListener(OnUpdateHandler onUpdateHandler) {
this.onUpdateHandler = onUpdateHandler;
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
for (EntryEvent eventEntry : event.entryEvents()) {
@@ -210,13 +206,12 @@ public class UpdateLogImpl implements UpdateLog {
VersionedUpdate update = fromBytes(payload);
- onUpdateHandler.handle(update);
+ onUpdateHandler.handle(update, event.timestamp());
}
return CompletableFuture.completedFuture(null);
}
- /** {@inheritDoc} */
@Override
public void onError(Throwable e) {
assert false;
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
index 437534ccf9..8ad4da749b 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/VersionedUpdate.java
@@ -31,7 +31,7 @@ public class VersionedUpdate implements Serializable {
private final int version;
- private final long activationTimestamp;
+ private final long delayDurationMs;
@IgniteToStringInclude
private final List<UpdateEntry> entries;
@@ -40,12 +40,13 @@ public class VersionedUpdate implements Serializable {
* Constructs the object.
*
* @param version A version the changes relate to.
- * @param activationTimestamp Timestamp given changes become active at.
+ * @param delayDurationMs Delay duration that, when added to the update's
entry timestamp assigned by the MetaStorage, will produce the
+ * activation timestamp (milliseconds).
* @param entries A list of changes.
*/
- public VersionedUpdate(int version, long activationTimestamp,
List<UpdateEntry> entries) {
+ public VersionedUpdate(int version, long delayDurationMs,
List<UpdateEntry> entries) {
this.version = version;
- this.activationTimestamp = activationTimestamp;
+ this.delayDurationMs = delayDurationMs;
this.entries = List.copyOf(
Objects.requireNonNull(entries, "entries")
);
@@ -56,9 +57,9 @@ public class VersionedUpdate implements Serializable {
return version;
}
- /** Returns activation timestamp. */
- public long activationTimestamp() {
- return activationTimestamp;
+ /** Returns Delay Duration for this update (in milliseconds). */
+ public long delayDurationMs() {
+ return delayDurationMs;
}
/** Returns list of changes. */
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index 2c72ace9b4..cb4b7ad325 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -26,6 +26,9 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -37,6 +40,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -48,6 +53,7 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.commands.AlterColumnParams;
@@ -90,6 +96,7 @@ import
org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
@@ -117,7 +124,6 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.EnumSource.Mode;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
/**
* Catalog service self test.
@@ -135,23 +141,28 @@ public class CatalogServiceSelfTest {
private VaultManager vault;
+ private UpdateLog updateLog;
+
private CatalogServiceImpl service;
private HybridClock clock;
+ private ClockWaiter clockWaiter;
+
@BeforeEach
void setUp() {
vault = new VaultManager(new InMemoryVaultService());
- metastore = StandaloneMetaStorageManager.create(
- vault, new SimpleInMemoryKeyValueStorage("test")
- );
+ metastore = StandaloneMetaStorageManager.create(vault, new
SimpleInMemoryKeyValueStorage("test"));
clock = new HybridClockImpl();
- service = new CatalogServiceImpl(new UpdateLogImpl(metastore, vault),
clock, 0L);
+ clockWaiter = spy(new ClockWaiter("test", clock));
+ updateLog = spy(new UpdateLogImpl(metastore));
+ service = new CatalogServiceImpl(updateLog, clockWaiter);
vault.start();
metastore.start();
+ clockWaiter.start();
service.start();
assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
@@ -160,6 +171,7 @@ public class CatalogServiceSelfTest {
@AfterEach
public void tearDown() throws Exception {
service.stop();
+ clockWaiter.stop();
metastore.stop();
vault.stop();
}
@@ -211,7 +223,7 @@ public class CatalogServiceSelfTest {
CompletableFuture<Void> fut = service.createTable(params);
- assertThat(fut, willBe((Object) null));
+ assertThat(fut, willBe(nullValue()));
// Validate catalog version from the past.
CatalogSchemaDescriptor schema = service.schema(0);
@@ -245,7 +257,7 @@ public class CatalogServiceSelfTest {
assertEquals(1L, table.zoneId());
// Validate another table creation.
- assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
// Validate actual catalog has both tables.
schema = service.schema(2);
@@ -290,14 +302,14 @@ public class CatalogServiceSelfTest {
@Test
public void testDropTable() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
- assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
long beforeDropTimestamp = clock.nowLong();
DropTableParams dropTableParams =
DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build();
- assertThat(service.dropTable(dropTableParams), willBe((Object) null));
+ assertThat(service.dropTable(dropTableParams), willBe(nullValue()));
// Validate catalog version from the past.
CatalogSchemaDescriptor schema = service.schema(2);
@@ -337,7 +349,7 @@ public class CatalogServiceSelfTest {
@Test
public void testAddColumn() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
AlterTableAddColumnParams params = AlterTableAddColumnParams.builder()
.schemaName(SCHEMA_NAME)
@@ -353,7 +365,7 @@ public class CatalogServiceSelfTest {
long beforeAddedTimestamp = clock.nowLong();
- assertThat(service.addColumn(params), willBe((Object) null));
+ assertThat(service.addColumn(params), willBe(nullValue()));
// Validate catalog version from the past.
CatalogSchemaDescriptor schema =
service.activeSchema(beforeAddedTimestamp);
@@ -384,7 +396,7 @@ public class CatalogServiceSelfTest {
@Test
public void testDropColumn() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
// Validate dropping column
AlterTableDropColumnParams params =
AlterTableDropColumnParams.builder()
@@ -395,7 +407,7 @@ public class CatalogServiceSelfTest {
long beforeAddedTimestamp = clock.nowLong();
- assertThat(service.dropColumn(params), willBe((Object) null));
+ assertThat(service.dropColumn(params), willBe(nullValue()));
// Validate catalog version from the past.
CatalogSchemaDescriptor schema =
service.activeSchema(beforeAddedTimestamp);
@@ -437,8 +449,8 @@ public class CatalogServiceSelfTest {
@Test
public void testDropIndexedColumn() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
- assertThat(service.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(service.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)),
willBe(nullValue()));
// Try to drop indexed column
AlterTableDropColumnParams params =
AlterTableDropColumnParams.builder()
@@ -470,7 +482,7 @@ public class CatalogServiceSelfTest {
@Test
public void testAddDropMultipleColumns() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
// Add duplicate column.
AlterTableAddColumnParams addColumnParams =
AlterTableAddColumnParams.builder()
@@ -499,7 +511,7 @@ public class CatalogServiceSelfTest {
))
.build();
- assertThat(service.addColumn(addColumnParams), willBe((Object) null));
+ assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
// Validate both columns added.
schema = service.activeSchema(clock.nowLong());
@@ -514,7 +526,7 @@ public class CatalogServiceSelfTest {
.columns(Set.of(NEW_COLUMN_NAME, NEW_COLUMN_NAME_2))
.build();
- assertThat(service.dropColumn(dropColumnParams), willBe((Object)
null));
+ assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
// Validate both columns dropped.
schema = service.activeSchema(clock.nowLong());
@@ -544,7 +556,7 @@ public class CatalogServiceSelfTest {
*/
@Test
public void testAlterColumnDefault() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -552,27 +564,27 @@ public class CatalogServiceSelfTest {
// NULL-> NULL : No-op.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(null)),
- willBe((Object) null));
+ willBe(nullValue()));
assertNull(service.schema(schemaVer + 1));
// NULL -> 1 : Ok.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(1)),
- willBe((Object) null));
+ willBe(nullValue()));
assertNotNull(service.schema(++schemaVer));
// 1 -> 1 : No-op.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(1)),
- willBe((Object) null));
+ willBe(nullValue()));
assertNull(service.schema(schemaVer + 1));
// 1 -> 2 : Ok.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(2)),
- willBe((Object) null));
+ willBe(nullValue()));
assertNotNull(service.schema(++schemaVer));
// 2 -> NULL : Ok.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(null)),
- willBe((Object) null));
+ willBe(nullValue()));
assertNotNull(service.schema(++schemaVer));
}
@@ -586,7 +598,7 @@ public class CatalogServiceSelfTest {
*/
@Test
public void testAlterColumnNotNull() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -594,12 +606,12 @@ public class CatalogServiceSelfTest {
// NULLABLE -> NULLABLE : No-op.
// NOT NULL -> NOT NULL : No-op.
- assertThat(changeColumn(TABLE_NAME, "VAL", null, false, null),
willBe((Object) null));
- assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null),
willBe((Object) null));
+ assertThat(changeColumn(TABLE_NAME, "VAL", null, false, null),
willBe(nullValue()));
+ assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null),
willBe(nullValue()));
assertNull(service.schema(schemaVer + 1));
// NOT NULL -> NULlABLE : Ok.
- assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, false,
null), willBe((Object) null));
+ assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, false,
null), willBe(nullValue()));
assertNotNull(service.schema(++schemaVer));
// DROP NOT NULL for PK : Error.
@@ -629,7 +641,7 @@ public class CatalogServiceSelfTest {
ColumnParams pkCol =
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
ColumnParams col = ColumnParams.builder().name("COL_" +
type).type(type).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -637,20 +649,20 @@ public class CatalogServiceSelfTest {
// ANY-> UNDEFINED PRECISION : No-op.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type()), null, null),
- willBe((Object) null));
+ willBe(nullValue()));
assertNull(service.schema(schemaVer + 1));
// UNDEFINED PRECISION -> 10 : Ok.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), 10, null, null), null, null),
- willBe((Object) null)
+ willBe(nullValue())
);
assertNotNull(service.schema(++schemaVer));
// 10 -> 11 : Ok.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), 11, null, null), null, null),
- willBe((Object) null)
+ willBe(nullValue())
);
CatalogSchemaDescriptor schema = service.schema(++schemaVer);
@@ -676,7 +688,7 @@ public class CatalogServiceSelfTest {
ColumnParams col =
ColumnParams.builder().name("COL").type(type).build();
ColumnParams colWithPrecision =
ColumnParams.builder().name("COL_PRECISION").type(type).precision(10).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col, colWithPrecision))), willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col, colWithPrecision))), willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -686,7 +698,7 @@ public class CatalogServiceSelfTest {
willThrowFast(SqlException.class, "Cannot change precision for
column '" + col.name() + "'"));
assertThat(changeColumn(TABLE_NAME, colWithPrecision.name(), new
TestColumnTypeParams(type, 10, null, null), null, null),
- willBe((Object) null));
+ willBe(nullValue()));
assertThat(changeColumn(TABLE_NAME, colWithPrecision.name(), new
TestColumnTypeParams(type, 9, null, null), null, null),
willThrowFast(SqlException.class, "Cannot change precision for
column '" + colWithPrecision.name() + "'"));
@@ -711,7 +723,7 @@ public class CatalogServiceSelfTest {
ColumnParams pkCol =
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
ColumnParams col = ColumnParams.builder().name("COL_" +
type).type(type).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -719,20 +731,20 @@ public class CatalogServiceSelfTest {
// ANY-> UNDEFINED LENGTH : No-op.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type()), null, null),
- willBe((Object) null));
+ willBe(nullValue()));
assertNull(service.schema(schemaVer + 1));
// UNDEFINED LENGTH -> 10 : Ok.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, 10, null), null, null),
- willBe((Object) null)
+ willBe(nullValue())
);
assertNotNull(service.schema(++schemaVer));
// 10 -> 11 : Ok.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, 11, null), null, null),
- willBe((Object) null)
+ willBe(nullValue())
);
CatalogSchemaDescriptor schema = service.schema(++schemaVer);
@@ -758,7 +770,7 @@ public class CatalogServiceSelfTest {
ColumnParams col =
ColumnParams.builder().name("COL").type(type).build();
ColumnParams colWithLength =
ColumnParams.builder().name("COL_PRECISION").type(type).length(10).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col, colWithLength))), willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col, colWithLength))), willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -768,7 +780,7 @@ public class CatalogServiceSelfTest {
willThrowFast(SqlException.class, "Cannot change length for
column '" + col.name() + "'"));
assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new
TestColumnTypeParams(type, null, 10, null), null, null),
- willBe((Object) null));
+ willBe(nullValue()));
assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new
TestColumnTypeParams(type, null, 9, null), null, null),
willThrowFast(SqlException.class, "Cannot change length for
column '" + colWithLength.name() + "'"));
@@ -784,7 +796,7 @@ public class CatalogServiceSelfTest {
public void testAlterColumnTypeScaleIsRejected(ColumnType type) {
ColumnParams pkCol =
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
ColumnParams col = ColumnParams.builder().name("COL_" +
type).type(type).scale(3).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -792,12 +804,12 @@ public class CatalogServiceSelfTest {
// ANY-> UNDEFINED SCALE : No-op.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type()), null, null),
- willBe((Object) null));
+ willBe(nullValue()));
assertNull(service.schema(schemaVer + 1));
// 3 -> 3 : No-op.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, null, 3), null, null),
- willBe((Object) null));
+ willBe(nullValue()));
assertNull(service.schema(schemaVer + 1));
// 3 -> 4 : Error.
@@ -836,7 +848,7 @@ public class CatalogServiceSelfTest {
CreateTableParams createTableParams = simpleTable(TABLE_NAME,
tableColumns);
- assertThat(service.createTable(createTableParams), willBe((Object)
null));
+ assertThat(service.createTable(createTableParams),
willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -847,7 +859,7 @@ public class CatalogServiceSelfTest {
boolean sameType = col.type() == target;
if (sameType ||
CatalogUtils.isSupportedColumnTypeChange(col.type(), target)) {
- matcher = willBe((Object) null);
+ matcher = willBe(nullValue());
schemaVer += sameType ? 0 : 1;
} else {
matcher = willThrowFast(SqlException.class,
@@ -864,7 +876,7 @@ public class CatalogServiceSelfTest {
@Test
public void testAlterColumnTypeRejectedForPrimaryKey() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
assertThat(changeColumn(TABLE_NAME, "ID", new
TestColumnTypeParams(ColumnType.INT64), null, null),
willThrowFast(SqlException.class, "Cannot change data type for
primary key column 'ID'."));
@@ -876,7 +888,7 @@ public class CatalogServiceSelfTest {
*/
@Test
public void testAlterColumnMultipleChanges() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
int schemaVer = 1;
assertNotNull(service.schema(schemaVer));
@@ -887,7 +899,7 @@ public class CatalogServiceSelfTest {
TestColumnTypeParams typeParams = new
TestColumnTypeParams(ColumnType.INT64);
// Ensures that 3 different actions applied.
- assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe((Object) null));
+ assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe(nullValue()));
CatalogSchemaDescriptor schema = service.schema(++schemaVer);
assertNotNull(schema);
@@ -899,14 +911,14 @@ public class CatalogServiceSelfTest {
// Ensures that only one of three actions applied.
dflt = () -> DefaultValue.constant(2);
- assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe((Object) null));
+ assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe(nullValue()));
schema = service.schema(++schemaVer);
assertNotNull(schema);
assertEquals(DefaultValue.constant(2),
schema.table(TABLE_NAME).column("VAL_NOT_NULL").defaultValue());
// Ensures that no action will be applied.
- assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe((Object) null));
+ assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe(nullValue()));
assertNull(service.schema(schemaVer + 1));
}
@@ -929,14 +941,14 @@ public class CatalogServiceSelfTest {
.columns(List.of("VAL"))
.build();
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
- assertThat(service.createIndex(params), willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(service.createIndex(params), willBe(nullValue()));
long beforeDropTimestamp = clock.nowLong();
DropTableParams dropTableParams =
DropTableParams.builder().schemaName("PUBLIC").tableName(TABLE_NAME).build();
- assertThat(service.dropTable(dropTableParams), willBe((Object) null));
+ assertThat(service.dropTable(dropTableParams), willBe(nullValue()));
// Validate catalog version from the past.
CatalogSchemaDescriptor schema = service.schema(2);
@@ -971,7 +983,7 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateHashIndex() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
CreateHashIndexParams params = CreateHashIndexParams.builder()
.indexName(INDEX_NAME)
@@ -979,7 +991,7 @@ public class CatalogServiceSelfTest {
.columns(List.of("VAL", "ID"))
.build();
- assertThat(service.createIndex(params), willBe((Object) null));
+ assertThat(service.createIndex(params), willBe(nullValue()));
// Validate catalog version from the past.
CatalogSchemaDescriptor schema = service.schema(1);
@@ -1010,7 +1022,7 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateSortedIndex() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
CreateSortedIndexParams params = CreateSortedIndexParams.builder()
.indexName(INDEX_NAME)
@@ -1020,7 +1032,7 @@ public class CatalogServiceSelfTest {
.collations(List.of(CatalogColumnCollation.DESC_NULLS_FIRST,
CatalogColumnCollation.ASC_NULLS_LAST))
.build();
- assertThat(service.createIndex(params), willBe((Object) null));
+ assertThat(service.createIndex(params), willBe(nullValue()));
// Validate catalog version from the past.
CatalogSchemaDescriptor schema = service.schema(1);
@@ -1054,7 +1066,7 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateIndexWithSameName() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
CreateHashIndexParams params = CreateHashIndexParams.builder()
.indexName(INDEX_NAME)
@@ -1062,13 +1074,13 @@ public class CatalogServiceSelfTest {
.columns(List.of("VAL"))
.build();
- assertThat(service.createIndex(params), willBe((Object) null));
+ assertThat(service.createIndex(params), willBe(nullValue()));
assertThat(service.createIndex(params),
willThrow(IndexAlreadyExistsException.class));
}
@Test
public void testCreateIndexOnDuplicateColumns() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
CreateHashIndexParams params = CreateHashIndexParams.builder()
.indexName(INDEX_NAME)
@@ -1088,7 +1100,7 @@ public class CatalogServiceSelfTest {
doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture());
- CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock,
clock);
+ CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock,
clockWaiter);
service.start();
when(updateLogMock.append(any())).thenAnswer(invocation -> {
@@ -1099,11 +1111,11 @@ public class CatalogServiceSelfTest {
VersionedUpdate update = new VersionedUpdate(
updateFromInvocation.version(),
- updateFromInvocation.activationTimestamp(),
+ updateFromInvocation.delayDurationMs(),
List.of(new ObjectIdGenUpdateEntry(1))
);
- updateHandlerCapture.getValue().handle(update);
+ updateHandlerCapture.getValue().handle(update, clock.now());
return completedFuture(false);
});
@@ -1118,20 +1130,12 @@ public class CatalogServiceSelfTest {
@Test
public void catalogActivationTime() throws Exception {
- final int delayDuration = 3_000;
+ final long delayDuration = TimeUnit.DAYS.toMillis(365);
- InMemoryVaultService vaultService = new InMemoryVaultService();
- VaultManager vault = new VaultManager(vaultService);
- StandaloneMetaStorageManager metaStorageManager =
StandaloneMetaStorageManager.create(vault);
- UpdateLog updateLogMock = Mockito.spy(new
UpdateLogImpl(metaStorageManager, vault));
- CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock,
clock, delayDuration);
+ CatalogServiceImpl service = new CatalogServiceImpl(updateLog,
clockWaiter, delayDuration);
- vault.start();
- metaStorageManager.start();
service.start();
- assertThat("Watches were not deployed",
metaStorageManager.deployWatches(), willCompleteSuccessfully());
-
try {
CreateTableParams params = CreateTableParams.builder()
.schemaName(SCHEMA_NAME)
@@ -1143,11 +1147,13 @@ public class CatalogServiceSelfTest {
.primaryKeyColumns(List.of("key"))
.build();
- CompletableFuture<Void> fut = service.createTable(params);
+ service.createTable(params);
+
+ verify(updateLog).append(any());
+ // TODO IGNITE-19400: recheck createTable future completion
guarantees
- verify(updateLogMock).append(any());
- // TODO IGNITE-19400: recheck future completion guarantees
- assertThat(fut, willBe((Object) null));
+ // This waits till the new Catalog version lands in the internal
structures.
+ verify(clockWaiter, timeout(10_000)).waitFor(any());
assertSame(service.schema(0),
service.activeSchema(clock.nowLong()));
assertNull(service.table(TABLE_NAME, clock.nowLong()));
@@ -1158,8 +1164,6 @@ public class CatalogServiceSelfTest {
assertNotNull(service.table(TABLE_NAME, clock.nowLong()));
} finally {
service.stop();
- metaStorageManager.stop();
- vault.stop();
}
}
@@ -1167,7 +1171,7 @@ public class CatalogServiceSelfTest {
public void catalogServiceManagesUpdateLogLifecycle() throws Exception {
UpdateLog updateLogMock = mock(UpdateLog.class);
- CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock,
mock(HybridClock.class));
+ CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock,
clockWaiter);
service.start();
@@ -1201,10 +1205,10 @@ public class CatalogServiceSelfTest {
service.listen(CatalogEvent.TABLE_CREATE, eventListener);
service.listen(CatalogEvent.TABLE_DROP, eventListener);
- assertThat(service.createTable(createTableParams), willBe((Object)
null));
+ assertThat(service.createTable(createTableParams),
willBe(nullValue()));
verify(eventListener).notify(any(CreateTableEventParameters.class),
ArgumentMatchers.isNull());
- assertThat(service.dropTable(dropTableparams), willBe((Object) null));
+ assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
verify(eventListener).notify(any(DropTableEventParameters.class),
ArgumentMatchers.isNull());
verifyNoMoreInteractions(eventListener);
@@ -1558,14 +1562,14 @@ public class CatalogServiceSelfTest {
verifyNoInteractions(eventListener);
// Create table.
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
// Add column.
- assertThat(service.addColumn(addColumnParams), willBe((Object) null));
+ assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
verify(eventListener).notify(any(AddColumnEventParameters.class),
ArgumentMatchers.isNull());
// Drop column.
- assertThat(service.dropColumn(dropColumnParams), willBe((Object)
null));
+ assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
verify(eventListener).notify(any(DropColumnEventParameters.class),
ArgumentMatchers.isNull());
// Try drop column once again.
@@ -1574,6 +1578,44 @@ public class CatalogServiceSelfTest {
verifyNoMoreInteractions(eventListener);
}
+ @Test
+ public void userFutureCompletesAfterClusterWideActivationHappens() throws
Exception {
+ final long delayDuration = TimeUnit.DAYS.toMillis(365);
+
+ HybridTimestamp startTs = clock.now();
+
+ CatalogServiceImpl service = new CatalogServiceImpl(updateLog,
clockWaiter, delayDuration);
+
+ service.start();
+
+ try {
+ CreateTableParams params = CreateTableParams.builder()
+ .schemaName(SCHEMA_NAME)
+ .tableName(TABLE_NAME)
+ .columns(List.of(
+
ColumnParams.builder().name("key").type(ColumnType.INT32).build(),
+
ColumnParams.builder().name("val").type(ColumnType.INT32).nullable(true).build()
+ ))
+ .primaryKeyColumns(List.of("key"))
+ .build();
+
+ CompletableFuture<Void> future = service.createTable(params);
+
+ assertThat(future.isDone(), is(false));
+
+ ArgumentCaptor<HybridTimestamp> tsCaptor =
ArgumentCaptor.forClass(HybridTimestamp.class);
+
+ verify(clockWaiter, timeout(10_000)).waitFor(tsCaptor.capture());
+ HybridTimestamp userWaitTs = tsCaptor.getValue();
+ assertThat(
+ userWaitTs.getPhysical() - startTs.getPhysical(),
+ greaterThanOrEqualTo(delayDuration +
HybridTimestamp.maxClockSkew())
+ );
+ } finally {
+ service.stop();
+ }
+ }
+
@Test
void testGetCatalogEntityInCatalogEvent() {
CompletableFuture<Void> result = new CompletableFuture<>();
@@ -1592,7 +1634,7 @@ public class CatalogServiceSelfTest {
}
});
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
assertThat(result, willCompleteSuccessfully());
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
new file mode 100644
index 0000000000..e970e4027c
--- /dev/null
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/ClockWaiterTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ClockWaiterTest {
+ private ClockWaiter waiter;
+
+ private final HybridClock clock = new HybridClockImpl();
+
+ @BeforeEach
+ void createWaiter() {
+ waiter = new ClockWaiter("test", clock);
+
+ waiter.start();
+ }
+
+ @AfterEach
+ void cleanup() throws Exception {
+ if (waiter != null) {
+ waiter.stop();
+ }
+ }
+
+ @Test
+ void futureCompletesImmediatelyOnPassedMoment() {
+ CompletableFuture<Void> future = waiter.waitFor(clock.now());
+
+ assertThat(future.isDone(), is(true));
+ }
+
+ @Test
+ void futureCompletesWhenClockGetsUpdatedToSufficientTimestamp() {
+ HybridTimestamp oneYearAhead = getOneYearAhead();
+
+ CompletableFuture<Void> future = waiter.waitFor(oneYearAhead);
+
+ assertThat(future.isDone(), is(false));
+
+ clock.update(oneYearAhead);
+
+ assertThat(future.isDone(), is(true));
+ }
+
+ private HybridTimestamp getOneYearAhead() {
+ return clock.now().addPhysicalTime(TimeUnit.DAYS.toMillis(365));
+ }
+
+ @Test
+ void futureCompletesWithoutClockUpdates() {
+ HybridTimestamp littleAhead = clock.now().addPhysicalTime(200);
+
+ CompletableFuture<Void> future = waiter.waitFor(littleAhead);
+
+ assertThat(future, willSucceedIn(10, TimeUnit.SECONDS));
+ }
+
+ @Test
+ void futureGetsCancelledOnStop() throws Exception {
+ HybridTimestamp oneYearAhead = getOneYearAhead();
+
+ CompletableFuture<Void> future = waiter.waitFor(oneYearAhead);
+
+ waiter.stop();
+
+ assertThat(future, willThrow(CancellationException.class));
+ }
+}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 706ca47bb3..b7e6af23c2 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -57,9 +57,7 @@ class UpdateLogImplTest {
void setUp() {
vault = new VaultManager(new InMemoryVaultService());
- metastore = StandaloneMetaStorageManager.create(
- vault, new SimpleInMemoryKeyValueStorage("test")
- );
+ metastore = StandaloneMetaStorageManager.create(vault, new
SimpleInMemoryKeyValueStorage("test"));
vault.start();
metastore.start();
@@ -74,11 +72,11 @@ class UpdateLogImplTest {
@Test
public void logReplayedOnStart() throws Exception {
// first, let's append a few entries to the log
- UpdateLogImpl updateLog = new UpdateLogImpl(metastore, vault);
+ UpdateLogImpl updateLog = createUpdateLogImpl();
long revisionBefore = metastore.appliedRevision();
- updateLog.registerUpdateHandler(update -> {/* no-op */});
+ updateLog.registerUpdateHandler((update, ts) -> {/* no-op */});
updateLog.start();
assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
@@ -104,18 +102,22 @@ class UpdateLogImplTest {
// now let's create new component over a stuffed vault/metastore
// and check if log is replayed on start
- updateLog = new UpdateLogImpl(metastore, vault);
+ updateLog = createUpdateLogImpl();
List<VersionedUpdate> actualLog = new ArrayList<>();
- updateLog.registerUpdateHandler(actualLog::add);
+ updateLog.registerUpdateHandler((update, ts) -> actualLog.add(update));
updateLog.start();
assertEquals(expectedLog, actualLog);
}
+ private UpdateLogImpl createUpdateLogImpl() {
+ return new UpdateLogImpl(metastore);
+ }
+
@Test
public void exceptionIsThrownOnStartIfHandlerHasNotBeenRegistered() {
- UpdateLogImpl updateLog = new UpdateLogImpl(metastore, vault);
+ UpdateLogImpl updateLog = createUpdateLogImpl();
IgniteInternalException ex = assertThrows(
IgniteInternalException.class,
@@ -131,10 +133,10 @@ class UpdateLogImplTest {
@ParameterizedTest
@ValueSource(ints = {1, 2, 4, 8})
public void appendAcceptsUpdatesInOrder(int startVersion) throws Exception
{
- UpdateLogImpl updateLog = new UpdateLogImpl(metastore, vault);
+ UpdateLogImpl updateLog = createUpdateLogImpl();
List<Integer> appliedVersions = new ArrayList<>();
- updateLog.registerUpdateHandler(update ->
appliedVersions.add(update.version()));
+ updateLog.registerUpdateHandler((update, ts) ->
appliedVersions.add(update.version()));
updateLog.start();
@@ -174,7 +176,7 @@ class UpdateLogImplTest {
}
private static VersionedUpdate singleEntryUpdateOfVersion(int version) {
- return new VersionedUpdate(version, version, List.of(new
TestUpdateEntry("foo_" + version)));
+ return new VersionedUpdate(version, 1, List.of(new
TestUpdateEntry("foo_" + version)));
}
static class TestUpdateEntry implements UpdateEntry {
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConfigCommandTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConfigCommandTest.java
index 4e32ac25da..b575c93415 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConfigCommandTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConfigCommandTest.java
@@ -62,7 +62,7 @@ public class ItConfigCommandTest extends AbstractCliTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/rest/ItGeneratedRestClientTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/rest/ItGeneratedRestClientTest.java
index e10b3471c7..5862014837 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/rest/ItGeneratedRestClientTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/rest/ItGeneratedRestClientTest.java
@@ -155,7 +155,7 @@ public class ItGeneratedRestClientTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
for (CompletableFuture<Ignite> future : futures) {
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
similarity index 63%
copy from
modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
copy to
modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
index 3dc56f4d52..e550a9b486 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockUpdateListener.java
@@ -18,28 +18,15 @@
package org.apache.ignite.internal.hlc;
/**
- * A Hybrid Logical Clock.
+ * Used to track updates of a {@link HybridClock}: it gets notified each time
the clock 'ticks', including
+ * adjustments caused by external events.
*/
-public interface HybridClock {
+@FunctionalInterface
+public interface ClockUpdateListener {
/**
- * Creates a timestamp for new event.
+ * Called when the clock's current time advances.
*
- * @return The hybrid timestamp.
+ * @param newTs New timestamp on the clock (represented as a long value,
see {@link HybridTimestamp#longValue()}.
*/
- long nowLong();
-
- /**
- * Creates a timestamp for new event.
- *
- * @return The hybrid timestamp.
- */
- HybridTimestamp now();
-
- /**
- * Creates a timestamp for a received event.
- *
- * @param requestTime Timestamp from request.
- * @return The hybrid timestamp.
- */
- HybridTimestamp update(HybridTimestamp requestTime);
+ void onUpdate(long newTs);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
index 3dc56f4d52..b451c1219e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClock.java
@@ -42,4 +42,18 @@ public interface HybridClock {
* @return The hybrid timestamp.
*/
HybridTimestamp update(HybridTimestamp requestTime);
+
+ /**
+ * Adds an update listener to self.
+ *
+ * @param listener Update listener to add.
+ */
+ void addUpdateListener(ClockUpdateListener listener);
+
+ /**
+ * Removes an update listener from self.
+ *
+ * @param listener Listener to remove.
+ */
+ void removeUpdateListener(ClockUpdateListener listener);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
index 32d5839dbe..4209d31674 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridClockImpl.java
@@ -24,6 +24,8 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.internal.tostring.S;
/**
@@ -45,6 +47,8 @@ public class HybridClockImpl implements HybridClock {
private volatile long latestTime;
+ private final List<ClockUpdateListener> updateListeners = new
CopyOnWriteArrayList<>();
+
/**
* The constructor which initializes the latest time to current time by
system clock.
*/
@@ -67,11 +71,17 @@ public class HybridClockImpl implements HybridClock {
long newLatestTime = max(oldLatestTime + 1, now);
if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
+ notifyUpdateListeners(newLatestTime);
+
return newLatestTime;
}
}
}
+ private void notifyUpdateListeners(long newTs) {
+ updateListeners.forEach(listener -> listener.onUpdate(newTs));
+ }
+
@Override
public HybridTimestamp now() {
return hybridTimestamp(nowLong());
@@ -94,12 +104,23 @@ public class HybridClockImpl implements HybridClock {
long newLatestTime = max(requestTime.longValue() + 1, max(now,
oldLatestTime + 1));
if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
+ notifyUpdateListeners(newLatestTime);
+
return hybridTimestamp(newLatestTime);
}
}
}
- /** {@inheritDoc} */
+ @Override
+ public void addUpdateListener(ClockUpdateListener listener) {
+ updateListeners.add(listener);
+ }
+
+ @Override
+ public void removeUpdateListener(ClockUpdateListener listener) {
+ updateListeners.remove(listener);
+ }
+
@Override
public String toString() {
return S.toString(HybridClock.class, this);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
index 047c536cc9..a40f0e44fd 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/HybridTimestamp.java
@@ -243,4 +243,12 @@ public final class HybridTimestamp implements
Comparable<HybridTimestamp>, Seria
return new HybridTimestamp(time + (mills << LOGICAL_TIME_BITS_SIZE));
}
+
+ /**
+ * Returns max clock skew for the cluster (in millis).
+ */
+ // TODO: IGNITE-19809 - Convert this to a cluster-wide config property.
+ public static long maxClockSkew() {
+ return CLOCK_SKEW;
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
index c5a5633e81..41fd6174ac 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/HybridClockTest.java
@@ -19,26 +19,38 @@ package org.apache.ignite.internal;
import static
org.apache.ignite.internal.hlc.HybridClockTestUtils.mockToEpochMilli;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import java.time.Clock;
+import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
/**
* Tests of a Hybrid Logical Clock implementation.
* {@link HybridClock}
*/
+@ExtendWith(MockitoExtension.class)
class HybridClockTest {
/**
* Mock of a system clock.
*/
private static MockedStatic<Clock> clockMock;
+ @Mock
+ private ClockUpdateListener updateListener;
+
@AfterEach
public void afterEach() {
closeClockMock();
@@ -106,4 +118,52 @@ class HybridClockTest {
clockMock.close();
}
}
+
+ @Test
+ void updateListenerGetsNotifiedOnUpdateCausedByNowCall() {
+ HybridClock clock = new HybridClockImpl();
+
+ clock.addUpdateListener(updateListener);
+
+ HybridTimestamp ts = clock.now();
+
+ verify(updateListener).onUpdate(ts.longValue());
+ }
+
+ @Test
+ void updateListenerGetsNotifiedOnUpdateCausedByNowLongCall() {
+ HybridClock clock = new HybridClockImpl();
+
+ clock.addUpdateListener(updateListener);
+
+ long ts = clock.nowLong();
+
+ verify(updateListener).onUpdate(ts);
+ }
+
+ @Test
+ void updateListenerGetsNotifiedOnExternalUpdate() {
+ HybridClock clock = new HybridClockImpl();
+
+ clock.addUpdateListener(updateListener);
+
+ HybridTimestamp ts =
clock.now().addPhysicalTime(TimeUnit.DAYS.toMillis(365));
+
+ HybridTimestamp afterUpdate = clock.update(ts);
+
+ verify(updateListener).onUpdate(afterUpdate.longValue());
+ }
+
+ @Test
+ void updateListenerIsNotUpdatedAfterRemoval() {
+ HybridClock clock = new HybridClockImpl();
+
+ clock.addUpdateListener(updateListener);
+
+ clock.removeUpdateListener(updateListener);
+
+ clock.now();
+
+ verify(updateListener, never()).onUpdate(anyLong());
+ }
}
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
index 4dd2a0f860..aa2d5cec37 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/TestHybridClock.java
@@ -23,7 +23,10 @@ import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongSupplier;
+import org.apache.ignite.internal.hlc.ClockUpdateListener;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.S;
@@ -38,6 +41,8 @@ public class TestHybridClock implements HybridClock {
/** Latest time. */
private volatile long latestTime;
+ private final List<ClockUpdateListener> updateListeners = new
CopyOnWriteArrayList<>();
+
/**
* Var handle for {@link #latestTime}.
*/
@@ -71,11 +76,17 @@ public class TestHybridClock implements HybridClock {
long newLatestTime = max(oldLatestTime + 1, now);
if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
+ notifyUpdateListeners(newLatestTime);
+
return newLatestTime;
}
}
}
+ private void notifyUpdateListeners(long newLatestTime) {
+ updateListeners.forEach(listener -> listener.onUpdate(newLatestTime));
+ }
+
@Override
public HybridTimestamp now() {
return hybridTimestamp(nowLong());
@@ -98,12 +109,23 @@ public class TestHybridClock implements HybridClock {
long newLatestTime = max(requestTime.longValue() + 1, max(now,
oldLatestTime + 1));
if (LATEST_TIME.compareAndSet(this, oldLatestTime, newLatestTime))
{
+ notifyUpdateListeners(newLatestTime);
+
return hybridTimestamp(newLatestTime);
}
}
}
- /** {@inheritDoc} */
+ @Override
+ public void addUpdateListener(ClockUpdateListener listener) {
+ updateListeners.add(listener);
+ }
+
+ @Override
+ public void removeUpdateListener(ClockUpdateListener listener) {
+ updateListeners.remove(listener);
+ }
+
@Override
public String toString() {
return S.toString(HybridClock.class, this);
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
index 4cbc43fb3b..6b54e77f20 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/AbstractJdbcSelfTest.java
@@ -87,7 +87,7 @@ public class AbstractJdbcSelfTest extends
BaseIgniteAbstractTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index c2850211d0..a82f322611 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
@@ -75,6 +76,25 @@ public interface MetaStorageManager extends IgniteComponent {
@Deprecated
List<Entry> getLocally(byte[] key, long revLowerBound, long revUpperBound);
+ /**
+ * Returns an entry by the given key and bounded by the given revision.
The entry is obtained
+ * from the local storage.
+ *
+ * @param key The key.
+ * @param revUpperBound The upper bound of revision.
+ * @return Value corresponding to the given key.
+ */
+ Entry getLocally(byte[] key, long revUpperBound);
+
+ /**
+ * Looks up a timestamp by a revision. This should only be invoked if it
is guaranteed that the
+ * revision is available in the local storage. This method always operates
locally.
+ *
+ * @param revision Revision by which to do a lookup.
+ * @return Timestamp corresponding to the revision.
+ */
+ HybridTimestamp timestampByRevision(long revision);
+
/**
* Retrieves entries for given keys.
*/
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
index 33130cb77f..9c8a9aea2f 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
@@ -19,8 +19,10 @@ package org.apache.ignite.internal.metastorage;
import java.util.Collection;
import java.util.List;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.TestOnly;
/**
* Watch event contains all entry updates done under one revision. Each
particular entry update in this revision is represented by {@link
@@ -33,15 +35,20 @@ public class WatchEvent {
private final long revision;
+ /** Timestamp assigned by the MetaStorage to the event's revision. */
+ private final HybridTimestamp timestamp;
+
/**
* Constructs an watch event with given entry events collection.
*
* @param entryEvts Events for entries corresponding to an update under
one revision.
* @param revision Revision of the updated entries.
+ * @param timestamp Timestamp assigned by the MetaStorage to the event's
revision.
*/
- public WatchEvent(Collection<EntryEvent> entryEvts, long revision) {
+ public WatchEvent(Collection<EntryEvent> entryEvts, long revision,
HybridTimestamp timestamp) {
this.entryEvts = List.copyOf(entryEvts);
this.revision = revision;
+ this.timestamp = timestamp;
}
/**
@@ -49,8 +56,10 @@ public class WatchEvent {
*
* @param entryEvt Entry event.
*/
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19820 -
remove/rework.
+ @TestOnly
public WatchEvent(EntryEvent entryEvt) {
- this(List.of(entryEvt), entryEvt.newEntry().revision());
+ this(List.of(entryEvt), entryEvt.newEntry().revision(),
HybridTimestamp.MAX_VALUE);
}
/**
@@ -91,6 +100,15 @@ public class WatchEvent {
return revision;
}
+ /**
+ * Returns the timestamp assigned by the MetaStorage to this event's
revision.
+ *
+ * @return Timestamp assigned by the MetaStorage to this event's revision.
+ */
+ public HybridTimestamp timestamp() {
+ return timestamp;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 8daf22341e..e0e7afde88 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -334,6 +334,32 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
}
}
+ @Override
+ public Entry getLocally(byte[] key, long revUpperBound) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+
+ try {
+ return storage.get(key, revUpperBound);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ @Override
+ public HybridTimestamp timestampByRevision(long revision) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+
+ try {
+ return storage.timestampByRevision(revision);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
@Override
public CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray>
keys) {
if (!busyLock.enterBusy()) {
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index cc2edef81b..c5350e0e63 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -284,4 +284,12 @@ public interface KeyValueStorage extends ManuallyCloseable
{
* present in the storage.
*/
byte @Nullable [] nextKey(byte[] key);
+
+ /**
+ * Looks up a timestamp by a revision.
+ *
+ * @param revision Revision by which to do a lookup.
+ * @return Timestamp corresponding to the revision.
+ */
+ HybridTimestamp timestampByRevision(long revision);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 234a14dd08..859040a9b5 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -130,7 +130,7 @@ public class WatchProcessor implements ManuallyCloseable {
// Notify all watches in parallel, then aggregate the
entries that they have processed.
CompletableFuture<List<EntryEvent>>[] notificationFutures
= watches.stream()
- .map(watch -> notifyWatch(watch, updatedEntries,
newRevision))
+ .map(watch -> notifyWatch(watch, updatedEntries,
newRevision, time))
.toArray(CompletableFuture[]::new);
return allOf(notificationFutures)
@@ -138,7 +138,7 @@ public class WatchProcessor implements ManuallyCloseable {
}, watchExecutor);
}
- private CompletableFuture<List<EntryEvent>> notifyWatch(Watch watch,
List<Entry> updatedEntries, long revision) {
+ private CompletableFuture<List<EntryEvent>> notifyWatch(Watch watch,
List<Entry> updatedEntries, long revision, HybridTimestamp time) {
CompletableFuture<List<EntryEvent>> eventFuture = supplyAsync(() -> {
List<EntryEvent> entryEvents = List.of();
@@ -165,7 +165,7 @@ public class WatchProcessor implements ManuallyCloseable {
.thenCompose(entryEvents -> {
CompletableFuture<Void> eventNotificationFuture =
entryEvents.isEmpty()
? watch.onRevisionUpdated(revision)
- : watch.onUpdate(new WatchEvent(entryEvents,
revision));
+ : watch.onUpdate(new WatchEvent(entryEvents,
revision, time));
return eventNotificationFuture.thenApply(v -> entryEvents);
})
@@ -199,7 +199,7 @@ public class WatchProcessor implements ManuallyCloseable {
acceptedEntries.addAll(future.join());
}
- var event = new WatchEvent(acceptedEntries, revision);
+ var event = new WatchEvent(acceptedEntries, revision, time);
return revisionCallback.onRevisionApplied(event, time)
.whenComplete((ignored, e) -> {
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 9458da0828..8a5858f3ea 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -1550,7 +1550,8 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
finishReplay();
}
- private HybridTimestamp timestampByRevision(long revision) {
+ @Override
+ public HybridTimestamp timestampByRevision(long revision) {
try {
byte[] tsBytes = revisionToTs.get(longToBytes(revision));
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 98656f1133..85b241cb25 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -33,6 +33,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Objects;
import java.util.OptionalLong;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -409,6 +410,13 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
return incrementPrefix(key);
}
+ @Override
+ public HybridTimestamp timestampByRevision(long revision) {
+ synchronized (mux) {
+ return Objects.requireNonNull(revToTsMap.get(revision), "Revision
" + revision + " not found");
+ }
+ }
+
@Override
public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev,
WatchListener listener) {
assert keyFrom != null : "keyFrom couldn't be null.";
diff --git a/modules/rest-api/openapi/openapi.yaml
b/modules/rest-api/openapi/openapi.yaml
index 358a88f66a..f7c1ac06d5 100644
--- a/modules/rest-api/openapi/openapi.yaml
+++ b/modules/rest-api/openapi/openapi.yaml
@@ -775,7 +775,7 @@ components:
description: Unique tag that identifies the cluster.
DeploymentStatus:
type: string
- description: REST presentation of the deployment statuses.
+ description: Status of deployment process.
enum:
- UPLOADING
- DEPLOYED
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index 509c95045d..4aea1bca3d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -173,7 +173,7 @@ public class Cluster {
* with this call.
* @param initParametersConfigurator Configure {@link InitParameters}
before initializing the cluster.
*/
- public void startAndInit(
+ private void startAndInit(
int nodeCount,
int[] cmgNodes,
String nodeBootstrapConfigTemplate,
@@ -196,7 +196,7 @@ public class Cluster {
initParametersConfigurator.accept(builder);
- IgnitionManager.init(builder.build());
+ TestIgnitionManager.init(builder.build());
for (CompletableFuture<IgniteImpl> future : futures) {
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
index f578776261..88f177aa8b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractOneNodeBenchmark.java
@@ -78,7 +78,7 @@ public class AbstractOneNodeBenchmark {
var fut = TestIgnitionManager.start(NODE_NAME, config,
workDir.resolve(NODE_NAME));
- IgnitionManager.init(new InitParametersBuilder()
+ TestIgnitionManager.init(new InitParametersBuilder()
.clusterName("cluster")
.destinationNodeName(NODE_NAME)
.cmgNodeNames(List.of(NODE_NAME))
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterInitTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterInitTest.java
index accda25e4d..0c5eac07c2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterInitTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/cluster/management/ItClusterInitTest.java
@@ -67,12 +67,12 @@ public class ItClusterInitTest extends IgniteAbstractTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
assertThat(allOf(nodesByName.values().toArray(CompletableFuture[]::new)),
willCompleteSuccessfully());
// init is idempotent
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
InitParameters initParametersWithWrongNodesList1 =
InitParameters.builder()
.destinationNodeName(nodeName)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/component/ItRestAddressReportTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/component/ItRestAddressReportTest.java
index db649c9bde..23f07deb2d 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/component/ItRestAddressReportTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/component/ItRestAddressReportTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.InitParameters;
import org.apache.ignite.app.IgniteRunner;
import org.apache.ignite.internal.IgniteIntegrationTest;
import org.apache.ignite.internal.runner.app.IgniteRunnerTest;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.junit.jupiter.api.DisplayName;
@@ -76,7 +77,7 @@ public class ItRestAddressReportTest extends
IgniteIntegrationTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
// Then node is started
assertThat(ign, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index a51e9ac216..5483bc84e6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.ClockWaiter;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -96,6 +97,7 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
import org.apache.ignite.internal.network.configuration.NetworkConfiguration;
@@ -595,6 +597,8 @@ public class ItRebalanceDistributedTest {
private final CatalogManager catalogManager;
+ private final ClockWaiter clockWaiter;
+
private List<IgniteComponent> nodeComponents;
private final ConfigurationTreeGenerator nodeCfgGenerator;
@@ -678,15 +682,16 @@ public class ItRebalanceDistributedTest {
LogicalTopologyServiceImpl logicalTopologyService = new
LogicalTopologyServiceImpl(logicalTopology, cmgManager);
+ KeyValueStorage keyValueStorage =
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
+ ? new RocksDbKeyValueStorage(nodeName, resolveDir(dir,
"metaStorage"))
+ : new SimpleInMemoryKeyValueStorage(nodeName);
metaStorageManager = new MetaStorageManagerImpl(
vaultManager,
clusterService,
cmgManager,
logicalTopologyService,
raftManager,
-
testInfo.getTestMethod().get().isAnnotationPresent(UseRocksMetaStorage.class)
- ? new RocksDbKeyValueStorage(nodeName,
resolveDir(dir, "metaStorage"))
- : new SimpleInMemoryKeyValueStorage(nodeName),
+ keyValueStorage,
hybridClock
);
@@ -757,7 +762,12 @@ public class ItRebalanceDistributedTest {
metaStorageManager,
clusterService);
- catalogManager = new CatalogServiceImpl(new
UpdateLogImpl(metaStorageManager, vaultManager), hybridClock);
+ clockWaiter = new ClockWaiter("test", hybridClock);
+
+ catalogManager = new CatalogServiceImpl(
+ new UpdateLogImpl(metaStorageManager),
+ clockWaiter
+ );
schemaManager = new SchemaManager(registry, tablesCfg,
metaStorageManager);
@@ -852,6 +862,7 @@ public class ItRebalanceDistributedTest {
cmgManager,
metaStorageManager,
clusterCfgMgr,
+ clockWaiter,
catalogManager,
distributionZoneManager,
replicaManager,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index f6d0bc3fe2..bf6e993ac5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -115,7 +115,7 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
+ " netClusterNodes: [ {} ]\n"
+ " }\n"
+ " },\n"
- + " raft.rpcInstallSnapshotTimeout: 10000,"
+ + " raft.rpcInstallSnapshotTimeout: 10000,\n"
+ " clientConnector.port: {}\n"
+ "}";
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
index 43d1624430..e46f65bea6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/AbstractSchemaChangeTest.java
@@ -189,7 +189,7 @@ abstract class AbstractSchemaChangeTest extends
IgniteIntegrationTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
await(CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])));
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 42c877fef3..275c4c2333 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -122,7 +122,7 @@ public class ItDataSchemaSyncTest extends
IgniteAbstractTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
for (CompletableFuture<Ignite> future : futures) {
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index cd8d3a1102..192592d2ce 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -111,7 +111,7 @@ public class ItIgniteInMemoryNodeRestartTest extends
BaseIgniteRestartTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
}
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 8bec4299f1..f80687fd09 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.ClockWaiter;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
@@ -323,7 +324,12 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
new RaftGroupEventsClientListener()
);
- var catalogManager = new CatalogServiceImpl(new
UpdateLogImpl(metaStorageMgr, vault), hybridClock);
+ var clockWaiter = new ClockWaiter("test", hybridClock);
+
+ var catalogManager = new CatalogServiceImpl(
+ new UpdateLogImpl(metaStorageMgr),
+ clockWaiter
+ );
TableManager tableManager = new TableManager(
name,
@@ -393,6 +399,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
metaStorageMgr,
clusterCfgMgr,
dataStorageManager,
+ clockWaiter,
catalogManager,
schemaManager,
distributionZoneManager,
@@ -462,7 +469,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
.metaStorageNodeNames(List.of(nodeName))
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
}
assertThat(future, willCompleteSuccessfully());
@@ -523,7 +530,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
.metaStorageNodeNames(List.of(nodeName))
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
}
return futures.stream()
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
index 44b6ad60e5..0f2e007aab 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTableCreationTest.java
@@ -195,7 +195,7 @@ class ItTableCreationTest extends IgniteIntegrationTest {
.metaStorageNodeNames(List.of(metaStorageNode))
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
for (CompletableFuture<Ignite> future : futures) {
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 72bf12d6ea..cb96e88300 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -120,7 +120,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
.metaStorageNodeNames(List.of(metaStorageNodeName))
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
for (CompletableFuture<Ignite> future : futures) {
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 31e19d7cc9..c7eb34f143 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -243,7 +243,7 @@ public class PlatformTestNodeRunner {
.metaStorageNodeNames(List.of(metaStorageNodeName))
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
System.out.println("Initialization complete");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index 296663e899..47032c827f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -107,7 +107,7 @@ public abstract class ItAbstractThinClientTest extends
IgniteAbstractTest {
.metaStorageNodeNames(List.of(metaStorageNode))
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
for (CompletableFuture<Ignite> future : futures) {
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index 65fe5d3de5..4347405dee 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -161,7 +161,7 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
.metaStorageNodeNames(List.of(metaStorageNodeName))
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
for (CompletableFuture<Ignite> future : futures) {
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
index 97bd0673c7..aec92abc83 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
@@ -335,7 +335,7 @@ public class ItSqlLogicTest extends IgniteIntegrationTest {
.metaStorageNodeNames(List.of(metaStorageNodeName))
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
for (CompletableFuture<Ignite> future : futures) {
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
index 5ad0371190..653449f663 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ssl/ItSslTest.java
@@ -35,7 +35,6 @@ import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.IgniteClientConnectionException;
@@ -43,6 +42,7 @@ import org.apache.ignite.client.SslConfiguration;
import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.IgniteIntegrationTest;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.AfterAll;
@@ -517,7 +517,7 @@ public class ItSslTest extends IgniteIntegrationTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
// First node will initialize the cluster with single node
successfully since the second node can't connect to it.
assertThat(node1, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
index 01d4ab0183..b41540f488 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItRoReadsTest.java
@@ -133,7 +133,7 @@ public class ItRoReadsTest extends BaseIgniteAbstractTest {
.clusterName("cluster")
.build();
- IgnitionManager.init(initParameters);
+ TestIgnitionManager.init(initParameters);
assertThat(future, willCompleteSuccessfully());
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c8d20a5375..3d76fbb7df 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -42,6 +42,8 @@ import org.apache.ignite.configuration.ConfigurationModule;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.ClockWaiter;
+import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
@@ -276,6 +278,8 @@ public class IgniteImpl implements Ignite {
/** A hybrid logical clock. */
private final HybridClock clock;
+ private final ClockWaiter clockWaiter;
+
private final OutgoingSnapshotsManager outgoingSnapshotsManager;
private final RestAddressReporter restAddressReporter;
@@ -346,6 +350,8 @@ public class IgniteImpl implements Ignite {
clock = new HybridClockImpl();
+ clockWaiter = new ClockWaiter(name, clock);
+
RaftConfiguration raftConfiguration =
nodeConfigRegistry.getConfiguration(RaftConfiguration.KEY);
// TODO https://issues.apache.org/jira/browse/IGNITE-19051
@@ -485,7 +491,15 @@ public class IgniteImpl implements Ignite {
outgoingSnapshotsManager = new
OutgoingSnapshotsManager(clusterSvc.messagingService());
- catalogManager = new CatalogServiceImpl(new
UpdateLogImpl(metaStorageMgr, vaultMgr), clock);
+ SchemaSynchronizationConfiguration schemaSyncConfig =
clusterConfigRegistry.getConfiguration(
+ SchemaSynchronizationConfiguration.KEY
+ );
+
+ catalogManager = new CatalogServiceImpl(
+ new UpdateLogImpl(metaStorageMgr),
+ clockWaiter,
+ () -> schemaSyncConfig.delayDuration().value()
+ );
distributedTblMgr = new TableManager(
name,
@@ -675,6 +689,7 @@ public class IgniteImpl implements Ignite {
// Start the components that are required to join the cluster.
lifecycleManager.startComponents(
+ clockWaiter,
nettyBootstrapFactory,
clusterSvc,
restComponent,
diff --git
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
index 485f6c2f86..4e47a634e6 100644
---
a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
+++
b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
@@ -42,6 +42,7 @@ import
org.apache.ignite.internal.configuration.TestConfigurationChanger;
import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
import
org.apache.ignite.internal.configuration.validation.ConfigurationValidatorImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.metastorage.EntryEvent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
@@ -214,7 +215,7 @@ public class DistributedConfigurationCatchUpTest {
new EntryEvent(null, new EntryImpl(MASTER_KEY.bytes(),
null, newRevision, -1)),
// Add a mock entry to simulate a configuration update.
new EntryEvent(null, new EntryImpl((DISTRIBUTED_PREFIX
+ "foobar").getBytes(UTF_8), null, newRevision, -1))
- ), newRevision));
+ ), newRevision, HybridTimestamp.MAX_VALUE));
return true;
});