This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a73e813327 IGNITE-20498 Fix potential catalog version order violations
(#2734)
a73e813327 is described below
commit a73e8133270ca7691c1abc276e5562d5e1069e3b
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Tue Oct 24 14:12:19 2023 +0300
IGNITE-20498 Fix potential catalog version order violations (#2734)
---
.../internal/catalog/CatalogManagerImpl.java | 7 +++---
.../internal/catalog/CatalogManagerSelfTest.java | 29 ++++++++++++++++++++++
.../internal/catalog/BaseCatalogManagerTest.java | 11 +++++++-
.../sql/engine/ClusterPerClassIntegrationTest.java | 10 +++++++-
.../internal/sql/engine/ItSystemViewsTest.java | 11 ++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 9 +++++--
6 files changed, 70 insertions(+), 7 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 180057fd31..638d119ab7 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -88,9 +88,9 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
private static final int SYSTEM_VIEW_STRING_COLUMN_LENGTH =
Short.MAX_VALUE;
/** Safe time to wait before new Catalog version activation. */
- private static final int DEFAULT_DELAY_DURATION = 0;
+ static final int DEFAULT_DELAY_DURATION = 0;
- private static final int
DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD = 0;
+ static final int DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD = 0;
/** Initial update token for a catalog descriptor, this token is valid
only before the first call of
* {@link UpdateEntry#applyUpdate(Catalog, long)}.
@@ -525,10 +525,11 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate
update, HybridTimestamp metaStorageUpdateTimestamp) {
long activationTimestamp =
metaStorageUpdateTimestamp.addPhysicalTime(update.delayDurationMs()).longValue();
+ long prevVersionActivationTimestamp = catalog.time() + 1;
return new Catalog(
update.version(),
- activationTimestamp,
+ Math.max(activationTimestamp, prevVersionActivationTimestamp),
catalog.objectIdGenState(),
catalog.zones(),
catalog.schemas()
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index 152933fec5..577ec583ab 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -50,6 +50,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.sql.ColumnType.DECIMAL;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.apache.ignite.sql.ColumnType.INT64;
@@ -57,6 +58,7 @@ import static org.apache.ignite.sql.ColumnType.NULL;
import static org.apache.ignite.sql.ColumnType.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
@@ -1933,6 +1935,33 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
assertThat(fireEventFuture, willCompleteSuccessfully());
}
+ @Test
+ public void activationTimeIsStrictlyMonotonic() {
+ // Prepare schema changes.
+ ColumnParams column =
ColumnParams.builder().name("ID").type(INT32).build();
+ CatalogCommand cmd1 =
BaseCatalogManagerTest.createTableCommand(TABLE_NAME, List.of(column),
List.of("ID"), null);
+ CatalogCommand cmd2 =
BaseCatalogManagerTest.createTableCommand("test2", List.of(column),
List.of("ID"), null);
+
+ // Make first schema change with delay = 10000.
+ delayDuration.set(10_000);
+ CompletableFuture<Void> schemaChangeFuture0 = manager.execute(cmd1);
+
+ // Make second schema change with delay = 1.
+ delayDuration.set(1);
+ CompletableFuture<Void> schemaChangeFuture1 = manager.execute(cmd2);
+
+ // Move clock forward to avoid awaiting.
+ clock.update(clock.now().addPhysicalTime(11_000));
+
+ assertThat(schemaChangeFuture0, willSucceedFast());
+ assertThat(schemaChangeFuture1, willSucceedFast());
+
+ // Make sure that we are getting the latest version of the schema
using current timestamp.
+ int latestVer = manager.latestCatalogVersion();
+ int currentTsVer =
manager.activeCatalogVersion(clock.now().longValue());
+ assertThat(currentTsVer, equalTo(latestVer));
+ }
+
private CompletableFuture<Void> changeColumn(
String tab,
String col,
diff --git
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
index fc335ff292..533529a181 100644
---
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
+++
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.spy;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.CreateHashIndexCommand;
@@ -75,8 +76,11 @@ public abstract class BaseCatalogManagerTest extends
BaseIgniteAbstractTest {
protected CatalogManagerImpl manager;
+ protected AtomicLong delayDuration = new AtomicLong();
+
@BeforeEach
void setUp() {
+ delayDuration.set(CatalogManagerImpl.DEFAULT_DELAY_DURATION);
vault = new VaultManager(new InMemoryVaultService());
metastore = StandaloneMetaStorageManager.create(vault, new
SimpleInMemoryKeyValueStorage(NODE_NAME));
@@ -84,7 +88,12 @@ public abstract class BaseCatalogManagerTest extends
BaseIgniteAbstractTest {
updateLog = spy(new UpdateLogImpl(metastore));
clockWaiter = spy(new ClockWaiter(NODE_NAME, clock));
- manager = new CatalogManagerImpl(updateLog, clockWaiter);
+ manager = new CatalogManagerImpl(
+ updateLog,
+ clockWaiter,
+ delayDuration::get,
+ () ->
CatalogManagerImpl.DEFAULT_PARTITION_IDLE_SAFE_TIME_PROPAGATION_PERIOD
+ );
vault.start();
metastore.start();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index d73953bc13..bafa888fc6 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -60,6 +60,7 @@ import
org.apache.ignite.internal.sql.engine.util.QueryCheckerFactory;
import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
+import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -497,12 +498,19 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
}
/**
- * Returns internal {@code SqlQueryProcessor} for first cluster node.
+ * Returns internal {@code SqlQueryProcessor} for first cluster node.
*/
protected SqlQueryProcessor queryProcessor() {
return (SqlQueryProcessor) ((IgniteImpl)
CLUSTER_NODES.get(0)).queryEngine();
}
+ /**
+ * Returns internal {@code SystemViewManager} for first cluster node.
+ */
+ protected SystemViewManagerImpl systemViewManager() {
+ return (SystemViewManagerImpl) ((IgniteImpl)
CLUSTER_NODES.get(0)).systemViewManager();
+ }
+
/**
* Returns internal {@code TxManager} for first cluster node.
*/
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSystemViewsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSystemViewsTest.java
index 2a8cad2cd4..31d8d22faf 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSystemViewsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSystemViewsTest.java
@@ -22,8 +22,11 @@ import static
org.apache.ignite.internal.sql.engine.ItSystemViewsTest.KnownSyste
import static
org.apache.ignite.internal.sql.engine.ItSystemViewsTest.KnownSystemView.SYSTEM_VIEW_COLUMNS;
import org.apache.ignite.internal.sql.engine.util.MetadataMatcher;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.sql.ColumnType;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -49,6 +52,14 @@ public class ItSystemViewsTest extends
ClusterPerClassIntegrationTest {
}
}
+ @BeforeAll
+ @Override
+ void beforeAll(TestInfo testInfo) {
+ super.beforeAll(testInfo);
+
+ IgniteTestUtils.await(systemViewManager().completeRegistration());
+ }
+
@ParameterizedTest
@EnumSource(KnownSystemView.class)
public void systemViewWithGivenNameExists(KnownSystemView view) {
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 28dafd2961..4e4c870fee 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -131,6 +131,7 @@ import
org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModule;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.systemview.SystemViewManagerImpl;
+import org.apache.ignite.internal.systemview.api.SystemViewManager;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
@@ -859,8 +860,6 @@ public class IgniteImpl implements Ignite {
return cmgMgr.onJoinReady();
}, startupExecutor)
- // TODO Remove waiting for schema update after
https://issues.apache.org/jira/browse/IGNITE-20498
- .thenComposeAsync(v ->
systemViewManager.completeRegistration())
.thenRunAsync(() -> {
try {
// Transfer the node to the STARTED state.
@@ -911,10 +910,16 @@ public class IgniteImpl implements Ignite {
return distributedTblMgr;
}
+ @TestOnly
public QueryProcessor queryEngine() {
return qryEngine;
}
+ @TestOnly
+ public SystemViewManager systemViewManager() {
+ return systemViewManager;
+ }
+
@TestOnly
public MetaStorageManager metaStorageManager() {
return metaStorageMgr;