This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c67ad9d99a IGNITE-20039 Add functionality to switch the IndexManager
to catalog events #2 (#2354)
c67ad9d99a is described below
commit c67ad9d99acdbf3cb96129eef7421552836f8b98
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Jul 25 18:01:57 2023 +0300
IGNITE-20039 Add functionality to switch the IndexManager to catalog events
#2 (#2354)
---
.../apache/ignite/internal/catalog/Catalog.java | 14 +-
...logServiceImpl.java => CatalogManagerImpl.java} | 41 +-
.../ignite/internal/catalog/CatalogService.java | 24 +-
.../descriptors/CatalogSchemaDescriptor.java | 5 +-
.../internal/catalog/storage/UpdateLogImpl.java | 19 +-
...ceSelfTest.java => CatalogManagerSelfTest.java} | 628 +++++++++++----------
.../catalog/storage/UpdateLogImplTest.java | 94 +--
.../storage/ItRebalanceDistributedTest.java | 4 +-
.../runner/app/ItIgniteNodeRestartTest.java | 4 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 4 +-
10 files changed, 470 insertions(+), 367 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
index 2a6c06b5e3..80057fcc0c 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/Catalog.java
@@ -34,6 +34,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.CollectionUtils;
+import org.jetbrains.annotations.Nullable;
/**
* Catalog descriptor represents database schema snapshot.
@@ -115,14 +116,22 @@ public class Catalog {
return schemasByName.values();
}
- public CatalogTableDescriptor table(int tableId) {
+ public @Nullable CatalogTableDescriptor table(int tableId) {
return tablesById.get(tableId);
}
- public CatalogIndexDescriptor index(int indexId) {
+ public Collection<CatalogTableDescriptor> tables() {
+ return tablesById.values();
+ }
+
+ public @Nullable CatalogIndexDescriptor index(int indexId) {
return indexesById.get(indexId);
}
+ public Collection<CatalogIndexDescriptor> indexes() {
+ return indexesById.values();
+ }
+
public CatalogZoneDescriptor zone(String name) {
return zonesByName.get(name);
}
@@ -135,7 +144,6 @@ public class Catalog {
return zonesByName.values();
}
- /** {@inheritDoc} */
@Override
public String toString() {
return S.toString(this);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
similarity index 96%
rename from
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
rename to
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index a1981a8924..a6b29af3cb 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -25,6 +25,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -101,14 +102,14 @@ import org.jetbrains.annotations.Nullable;
/**
* Catalog service implementation.
*/
-public class CatalogServiceImpl extends Producer<CatalogEvent,
CatalogEventParameters> implements CatalogManager {
+public class CatalogManagerImpl extends Producer<CatalogEvent,
CatalogEventParameters> implements CatalogManager {
private static final int MAX_RETRY_COUNT = 10;
/** Safe time to wait before new Catalog version activation. */
private static final int DEFAULT_DELAY_DURATION = 0;
/** The logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(CatalogServiceImpl.class);
+ private static final IgniteLogger LOG =
Loggers.forClass(CatalogManagerImpl.class);
/** Versioned catalog descriptors. */
private final NavigableMap<Integer, Catalog> catalogByVer = new
ConcurrentSkipListMap<>();
@@ -127,21 +128,21 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
/**
* Constructor.
*/
- public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
+ public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter) {
this(updateLog, clockWaiter, DEFAULT_DELAY_DURATION);
}
/**
* Constructor.
*/
- CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long
delayDurationMs) {
+ CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter, long
delayDurationMs) {
this(updateLog, clockWaiter, () -> delayDurationMs);
}
/**
* Constructor.
*/
- public CatalogServiceImpl(UpdateLog updateLog, ClockWaiter clockWaiter,
LongSupplier delayDurationMsSupplier) {
+ public CatalogManagerImpl(UpdateLog updateLog, ClockWaiter clockWaiter,
LongSupplier delayDurationMsSupplier) {
this.updateLog = updateLog;
this.clockWaiter = clockWaiter;
this.delayDurationMsSupplier = delayDurationMsSupplier;
@@ -183,30 +184,45 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
}
@Override
- public CatalogTableDescriptor table(String tableName, long timestamp) {
+ public @Nullable CatalogTableDescriptor table(String tableName, long
timestamp) {
return
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).table(tableName);
}
@Override
- public CatalogTableDescriptor table(int tableId, long timestamp) {
+ public @Nullable CatalogTableDescriptor table(int tableId, long timestamp)
{
return catalogAt(timestamp).table(tableId);
}
@Override
- public CatalogTableDescriptor table(int tableId, int catalogVersion) {
+ public @Nullable CatalogTableDescriptor table(int tableId, int
catalogVersion) {
return catalog(catalogVersion).table(tableId);
}
@Override
- public CatalogIndexDescriptor index(String indexName, long timestamp) {
+ public Collection<CatalogTableDescriptor> tables(int catalogVersion) {
+ return catalog(catalogVersion).tables();
+ }
+
+ @Override
+ public @Nullable CatalogIndexDescriptor index(String indexName, long
timestamp) {
return
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).index(indexName);
}
@Override
- public CatalogIndexDescriptor index(int indexId, long timestamp) {
+ public @Nullable CatalogIndexDescriptor index(int indexId, long timestamp)
{
return catalogAt(timestamp).index(indexId);
}
+ @Override
+ public @Nullable CatalogIndexDescriptor index(int indexId, int
catalogVersion) {
+ return catalog(catalogVersion).index(indexId);
+ }
+
+ @Override
+ public Collection<CatalogIndexDescriptor> indexes(int catalogVersion) {
+ return catalog(catalogVersion).indexes();
+ }
+
@Override
public @Nullable CatalogSchemaDescriptor schema(int version) {
Catalog catalog = catalog(version);
@@ -254,6 +270,11 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
return catalogAt(timestamp).version();
}
+ @Override
+ public int latestCatalogVersion() {
+ return catalogByVer.lastEntry().getKey();
+ }
+
private Catalog catalog(int version) {
return catalogByVer.get(version);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index c7364ebea3..c1d30f1b4c 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.catalog;
+import java.util.Collection;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -40,15 +41,21 @@ public interface CatalogService {
String DEFAULT_ZONE_NAME = "Default";
- CatalogTableDescriptor table(String tableName, long timestamp);
+ @Nullable CatalogTableDescriptor table(String tableName, long timestamp);
- CatalogTableDescriptor table(int tableId, long timestamp);
+ @Nullable CatalogTableDescriptor table(int tableId, long timestamp);
- CatalogTableDescriptor table(int tableId, int catalogVersion);
+ @Nullable CatalogTableDescriptor table(int tableId, int catalogVersion);
- CatalogIndexDescriptor index(String indexName, long timestamp);
+ Collection<CatalogTableDescriptor> tables(int catalogVersion);
- CatalogIndexDescriptor index(int indexId, long timestamp);
+ @Nullable CatalogIndexDescriptor index(String indexName, long timestamp);
+
+ @Nullable CatalogIndexDescriptor index(int indexId, long timestamp);
+
+ @Nullable CatalogIndexDescriptor index(int indexId, int catalogVersion);
+
+ Collection<CatalogIndexDescriptor> indexes(int catalogVersion);
CatalogSchemaDescriptor schema(int version);
@@ -64,5 +71,12 @@ public interface CatalogService {
int activeCatalogVersion(long timestamp);
+ /**
+ * Returns the latest registered version of the catalog.
+ *
+ * <p>NOTE: This method should only be used at the start of components
that may be removed or moved in the future.
+ */
+ int latestCatalogVersion();
+
void listen(CatalogEvent evt, EventListener<CatalogEventParameters>
closure);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
index 9b2e6bd0ed..d314a3bf17 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
@@ -26,6 +26,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
+import org.jetbrains.annotations.Nullable;
/**
* Schema definition contains database schema objects.
@@ -74,11 +75,11 @@ public class CatalogSchemaDescriptor extends
CatalogObjectDescriptor {
return indexes;
}
- public CatalogTableDescriptor table(String name) {
+ public @Nullable CatalogTableDescriptor table(String name) {
return tablesMap.get(name);
}
- public CatalogIndexDescriptor index(String name) {
+ public @Nullable CatalogIndexDescriptor index(String name) {
return indexesMap.get(name);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 1b7994fabc..4d0e00198c 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -58,7 +58,7 @@ public class UpdateLogImpl implements UpdateLog {
private final MetaStorageManager metastore;
private volatile OnUpdateHandler onUpdateHandler;
- private volatile @Nullable UpdateListener listener = null;
+ private volatile @Nullable UpdateListener listener;
/**
* Creates the object.
@@ -69,7 +69,6 @@ public class UpdateLogImpl implements UpdateLog {
this.metastore = metastore;
}
- /** {@inheritDoc} */
@Override
public void start() {
if (!busyLock.enterBusy()) {
@@ -86,19 +85,17 @@ public class UpdateLogImpl implements UpdateLog {
);
}
- restoreStateFromVault(handler);
+ recoveryStateFromMetastore(handler);
UpdateListener listener = new UpdateListener(onUpdateHandler);
this.listener = listener;
metastore.registerPrefixWatch(CatalogKey.updatePrefix(), listener);
-
} finally {
busyLock.leaveBusy();
}
}
- /** {@inheritDoc} */
@Override
public void stop() throws Exception {
if (!stopGuard.compareAndSet(false, true)) {
@@ -115,13 +112,11 @@ public class UpdateLogImpl implements UpdateLog {
}
}
- /** {@inheritDoc} */
@Override
public void registerUpdateHandler(OnUpdateHandler handler) {
onUpdateHandler = handler;
}
- /** {@inheritDoc} */
@Override
public CompletableFuture<Boolean> append(VersionedUpdate update) {
if (!busyLock.enterBusy()) {
@@ -149,15 +144,19 @@ public class UpdateLogImpl implements UpdateLog {
}
}
- private void restoreStateFromVault(OnUpdateHandler handler) {
- long appliedRevision = metastore.appliedRevision();
+ private void recoveryStateFromMetastore(OnUpdateHandler handler) {
+ CompletableFuture<Long> recoveryFinishedFuture =
metastore.recoveryFinishedFuture();
+
+ assert recoveryFinishedFuture.isDone();
+
+ long recoveryRevision = recoveryFinishedFuture.join();
int ver = 1;
// TODO: IGNITE-19790 Read range from metastore
while (true) {
ByteArray key = CatalogKey.update(ver++);
- Entry entry = metastore.getLocally(key, appliedRevision);
+ Entry entry = metastore.getLocally(key, recoveryRevision);
if (entry.empty() || entry.tombstone()) {
break;
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
similarity index 77%
rename from
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
rename to
modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index f0e5e0e300..44a5e29fa5 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -26,7 +26,9 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -106,6 +108,7 @@ import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.lang.ColumnAlreadyExistsException;
@@ -132,7 +135,7 @@ import org.mockito.ArgumentCaptor;
/**
* Catalog service self test.
*/
-public class CatalogServiceSelfTest {
+public class CatalogManagerSelfTest {
private static final String SCHEMA_NAME = DEFAULT_SCHEMA_NAME;
private static final String ZONE_NAME = DEFAULT_ZONE_NAME;
private static final String TABLE_NAME = "myTable";
@@ -147,9 +150,9 @@ public class CatalogServiceSelfTest {
private UpdateLog updateLog;
- private CatalogServiceImpl service;
+ private CatalogManagerImpl manager;
- private HybridClock clock;
+ private final HybridClock clock = new HybridClockImpl();
private ClockWaiter clockWaiter;
@@ -159,38 +162,39 @@ public class CatalogServiceSelfTest {
metastore = StandaloneMetaStorageManager.create(vault, new
SimpleInMemoryKeyValueStorage("test"));
- clock = new HybridClockImpl();
clockWaiter = spy(new ClockWaiter("test", clock));
updateLog = spy(new UpdateLogImpl(metastore));
- service = new CatalogServiceImpl(updateLog, clockWaiter);
+ manager = new CatalogManagerImpl(updateLog, clockWaiter);
vault.start();
metastore.start();
clockWaiter.start();
- service.start();
+ manager.start();
assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
}
@AfterEach
public void tearDown() throws Exception {
- service.stop();
- clockWaiter.stop();
- metastore.stop();
- vault.stop();
+ IgniteUtils.closeAll(
+ manager == null ? null : manager::stop,
+ clockWaiter == null ? null : clockWaiter::stop,
+ metastore == null ? null : metastore::stop,
+ vault == null ? null : vault::stop
+ );
}
@Test
public void testEmptyCatalog() {
- CatalogSchemaDescriptor schema = service.schema(DEFAULT_SCHEMA_NAME,
0);
+ CatalogSchemaDescriptor schema = manager.schema(DEFAULT_SCHEMA_NAME,
0);
assertNotNull(schema);
- assertSame(schema, service.activeSchema(DEFAULT_SCHEMA_NAME,
clock.nowLong()));
- assertSame(schema, service.schema(0));
- assertSame(schema, service.activeSchema(clock.nowLong()));
+ assertSame(schema, manager.activeSchema(DEFAULT_SCHEMA_NAME,
clock.nowLong()));
+ assertSame(schema, manager.schema(0));
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
- assertNull(service.schema(1));
- assertThrows(IllegalStateException.class, () ->
service.activeSchema(-1L));
+ assertNull(manager.schema(1));
+ assertThrows(IllegalStateException.class, () ->
manager.activeSchema(-1L));
// Validate default schema.
assertEquals(DEFAULT_SCHEMA_NAME, schema.name());
@@ -199,7 +203,7 @@ public class CatalogServiceSelfTest {
assertEquals(0, schema.indexes().length);
// Default distribution zone must exists.
- CatalogZoneDescriptor zone = service.zone(DEFAULT_ZONE_NAME,
clock.nowLong());
+ CatalogZoneDescriptor zone = manager.zone(DEFAULT_ZONE_NAME,
clock.nowLong());
assertEquals(DEFAULT_ZONE_NAME, zone.name());
assertEquals(CreateZoneParams.DEFAULT_PARTITION_COUNT,
zone.partitions());
assertEquals(CreateZoneParams.DEFAULT_REPLICA_COUNT, zone.replicas());
@@ -224,50 +228,51 @@ public class CatalogServiceSelfTest {
.colocationColumns(List.of("key2"))
.build();
- assertThat(service.createTable(params), willBe(nullValue()));
+ assertThat(manager.createTable(params), willBe(nullValue()));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = service.schema(0);
+ CatalogSchemaDescriptor schema = manager.schema(0);
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(0L));
- assertSame(schema, service.activeSchema(123L));
+ assertSame(schema, manager.activeSchema(0L));
+ assertSame(schema, manager.activeSchema(123L));
assertNull(schema.table(TABLE_NAME));
- assertNull(service.table(TABLE_NAME, 123L));
- assertNull(service.index(createPkIndexName(TABLE_NAME), 123L));
+ assertNull(manager.table(TABLE_NAME, 123L));
+ assertNull(manager.index(createPkIndexName(TABLE_NAME), 123L));
// Validate actual catalog
- schema = service.schema(SCHEMA_NAME, 1);
+ schema = manager.schema(SCHEMA_NAME, 1);
CatalogTableDescriptor table = schema.table(TABLE_NAME);
CatalogHashIndexDescriptor pkIndex = (CatalogHashIndexDescriptor)
schema.index(createPkIndexName(TABLE_NAME));
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(clock.nowLong()));
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
- assertSame(table, service.table(TABLE_NAME, clock.nowLong()));
- assertSame(table, service.table(table.id(), clock.nowLong()));
+ assertSame(table, manager.table(TABLE_NAME, clock.nowLong()));
+ assertSame(table, manager.table(table.id(), clock.nowLong()));
- assertSame(pkIndex, service.index(createPkIndexName(TABLE_NAME),
clock.nowLong()));
- assertSame(pkIndex, service.index(pkIndex.id(), clock.nowLong()));
+ assertSame(pkIndex, manager.index(createPkIndexName(TABLE_NAME),
clock.nowLong()));
+ assertSame(pkIndex, manager.index(pkIndex.id(), clock.nowLong()));
// Validate newly created table
assertEquals(TABLE_NAME, table.name());
- assertEquals(service.zone(ZONE_NAME, clock.nowLong()).id(),
table.zoneId());
+ assertEquals(manager.zone(ZONE_NAME, clock.nowLong()).id(),
table.zoneId());
// Validate newly created pk index
+ assertEquals(3L, pkIndex.id());
assertEquals(createPkIndexName(TABLE_NAME), pkIndex.name());
assertEquals(table.id(), pkIndex.tableId());
assertEquals(table.primaryKeyColumns(), pkIndex.columns());
assertTrue(pkIndex.unique());
// Validate another table creation.
- assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
// Validate actual catalog has both tables.
- schema = service.schema(2);
+ schema = manager.schema(2);
table = schema.table(TABLE_NAME);
pkIndex = (CatalogHashIndexDescriptor)
schema.index(createPkIndexName(TABLE_NAME));
CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
@@ -275,43 +280,43 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(clock.nowLong()));
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
- assertSame(table, service.table(TABLE_NAME, clock.nowLong()));
- assertSame(table, service.table(table.id(), clock.nowLong()));
+ assertSame(table, manager.table(TABLE_NAME, clock.nowLong()));
+ assertSame(table, manager.table(table.id(), clock.nowLong()));
- assertSame(pkIndex, service.index(createPkIndexName(TABLE_NAME),
clock.nowLong()));
- assertSame(pkIndex, service.index(pkIndex.id(), clock.nowLong()));
+ assertSame(pkIndex, manager.index(createPkIndexName(TABLE_NAME),
clock.nowLong()));
+ assertSame(pkIndex, manager.index(pkIndex.id(), clock.nowLong()));
- assertSame(table2, service.table(TABLE_NAME_2, clock.nowLong()));
- assertSame(table2, service.table(table2.id(), clock.nowLong()));
+ assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong()));
+ assertSame(table2, manager.table(table2.id(), clock.nowLong()));
- assertSame(pkIndex2, service.index(createPkIndexName(TABLE_NAME_2),
clock.nowLong()));
- assertSame(pkIndex2, service.index(pkIndex2.id(), clock.nowLong()));
+ assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2),
clock.nowLong()));
+ assertSame(pkIndex2, manager.index(pkIndex2.id(), clock.nowLong()));
assertNotSame(table, table2);
assertNotSame(pkIndex, pkIndex2);
// Try to create another table with same name.
- assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willThrowFast(TableAlreadyExistsException.class));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME_2)),
willThrowFast(TableAlreadyExistsException.class));
// Validate schema wasn't changed.
- assertSame(schema, service.activeSchema(clock.nowLong()));
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
}
@Test
public void testDropTable() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
- assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
long beforeDropTimestamp = clock.nowLong();
DropTableParams dropTableParams =
DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build();
- assertThat(service.dropTable(dropTableParams), willBe(nullValue()));
+ assertThat(manager.dropTable(dropTableParams), willBe(nullValue()));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = service.schema(2);
+ CatalogSchemaDescriptor schema = manager.schema(2);
CatalogTableDescriptor table1 = schema.table(TABLE_NAME);
CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
CatalogIndexDescriptor pkIndex1 =
schema.index(createPkIndexName(TABLE_NAME));
@@ -322,51 +327,51 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(beforeDropTimestamp));
+ assertSame(schema, manager.activeSchema(beforeDropTimestamp));
- assertSame(table1, service.table(TABLE_NAME, beforeDropTimestamp));
- assertSame(table1, service.table(table1.id(), beforeDropTimestamp));
+ assertSame(table1, manager.table(TABLE_NAME, beforeDropTimestamp));
+ assertSame(table1, manager.table(table1.id(), beforeDropTimestamp));
- assertSame(pkIndex1, service.index(createPkIndexName(TABLE_NAME),
beforeDropTimestamp));
- assertSame(pkIndex1, service.index(pkIndex1.id(),
beforeDropTimestamp));
+ assertSame(pkIndex1, manager.index(createPkIndexName(TABLE_NAME),
beforeDropTimestamp));
+ assertSame(pkIndex1, manager.index(pkIndex1.id(),
beforeDropTimestamp));
- assertSame(table2, service.table(TABLE_NAME_2, beforeDropTimestamp));
- assertSame(table2, service.table(table2.id(), beforeDropTimestamp));
+ assertSame(table2, manager.table(TABLE_NAME_2, beforeDropTimestamp));
+ assertSame(table2, manager.table(table2.id(), beforeDropTimestamp));
- assertSame(pkIndex2, service.index(createPkIndexName(TABLE_NAME_2),
beforeDropTimestamp));
- assertSame(pkIndex2, service.index(pkIndex2.id(),
beforeDropTimestamp));
+ assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2),
beforeDropTimestamp));
+ assertSame(pkIndex2, manager.index(pkIndex2.id(),
beforeDropTimestamp));
// Validate actual catalog
- schema = service.schema(3);
+ schema = manager.schema(3);
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(clock.nowLong()));
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
assertNull(schema.table(TABLE_NAME));
- assertNull(service.table(TABLE_NAME, clock.nowLong()));
- assertNull(service.table(table1.id(), clock.nowLong()));
+ assertNull(manager.table(TABLE_NAME, clock.nowLong()));
+ assertNull(manager.table(table1.id(), clock.nowLong()));
assertNull(schema.index(createPkIndexName(TABLE_NAME)));
- assertNull(service.index(createPkIndexName(TABLE_NAME),
clock.nowLong()));
- assertNull(service.index(pkIndex1.id(), clock.nowLong()));
+ assertNull(manager.index(createPkIndexName(TABLE_NAME),
clock.nowLong()));
+ assertNull(manager.index(pkIndex1.id(), clock.nowLong()));
- assertSame(table2, service.table(TABLE_NAME_2, clock.nowLong()));
- assertSame(table2, service.table(table2.id(), clock.nowLong()));
+ assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong()));
+ assertSame(table2, manager.table(table2.id(), clock.nowLong()));
- assertSame(pkIndex2, service.index(createPkIndexName(TABLE_NAME_2),
clock.nowLong()));
- assertSame(pkIndex2, service.index(pkIndex2.id(), clock.nowLong()));
+ assertSame(pkIndex2, manager.index(createPkIndexName(TABLE_NAME_2),
clock.nowLong()));
+ assertSame(pkIndex2, manager.index(pkIndex2.id(), clock.nowLong()));
// Try to drop table once again.
- assertThat(service.dropTable(dropTableParams),
willThrowFast(TableNotFoundException.class));
+ assertThat(manager.dropTable(dropTableParams),
willThrowFast(TableNotFoundException.class));
// Validate schema wasn't changed.
- assertSame(schema, service.activeSchema(clock.nowLong()));
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
}
@Test
public void testAddColumn() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
AlterTableAddColumnParams params = AlterTableAddColumnParams.builder()
.schemaName(SCHEMA_NAME)
@@ -382,17 +387,17 @@ public class CatalogServiceSelfTest {
long beforeAddedTimestamp = clock.nowLong();
- assertThat(service.addColumn(params), willBe(nullValue()));
+ assertThat(manager.addColumn(params), willBe(nullValue()));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema =
service.activeSchema(beforeAddedTimestamp);
+ CatalogSchemaDescriptor schema =
manager.activeSchema(beforeAddedTimestamp);
assertNotNull(schema);
assertNotNull(schema.table(TABLE_NAME));
assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
// Validate actual catalog
- schema = service.activeSchema(clock.nowLong());
+ schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema);
assertNotNull(schema.table(TABLE_NAME));
@@ -413,7 +418,7 @@ public class CatalogServiceSelfTest {
@Test
public void testDropColumn() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
// Validate dropping column
AlterTableDropColumnParams params =
AlterTableDropColumnParams.builder()
@@ -424,17 +429,17 @@ public class CatalogServiceSelfTest {
long beforeAddedTimestamp = clock.nowLong();
- assertThat(service.dropColumn(params), willBe(nullValue()));
+ assertThat(manager.dropColumn(params), willBe(nullValue()));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema =
service.activeSchema(beforeAddedTimestamp);
+ CatalogSchemaDescriptor schema =
manager.activeSchema(beforeAddedTimestamp);
assertNotNull(schema);
assertNotNull(schema.table(TABLE_NAME));
assertNotNull(schema.table(TABLE_NAME).column("VAL"));
// Validate actual catalog
- schema = service.activeSchema(clock.nowLong());
+ schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema);
assertNotNull(schema.table(TABLE_NAME));
@@ -443,7 +448,7 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateDropColumnIfTableNotExists() {
- assertNull(service.table(TABLE_NAME, clock.nowLong()));
+ assertNull(manager.table(TABLE_NAME, clock.nowLong()));
// Try to add a new column.
AlterTableAddColumnParams addColumnParams =
AlterTableAddColumnParams.builder()
@@ -452,7 +457,7 @@ public class CatalogServiceSelfTest {
.columns(List.of(ColumnParams.builder().name(NEW_COLUMN_NAME).type(ColumnType.INT32).nullable(true).build()))
.build();
- assertThat(service.addColumn(addColumnParams),
willThrow(TableNotFoundException.class));
+ assertThat(manager.addColumn(addColumnParams),
willThrow(TableNotFoundException.class));
// Try to drop column.
AlterTableDropColumnParams dropColumnParams =
AlterTableDropColumnParams.builder()
@@ -461,13 +466,13 @@ public class CatalogServiceSelfTest {
.columns(Set.of("VAL"))
.build();
- assertThat(service.dropColumn(dropColumnParams),
willThrow(TableNotFoundException.class));
+ assertThat(manager.dropColumn(dropColumnParams),
willThrow(TableNotFoundException.class));
}
@Test
public void testDropIndexedColumn() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
- assertThat(service.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)),
willBe(nullValue()));
// Try to drop indexed column
AlterTableDropColumnParams params =
AlterTableDropColumnParams.builder()
@@ -476,7 +481,7 @@ public class CatalogServiceSelfTest {
.columns(Set.of("VAL"))
.build();
- assertThat(service.dropColumn(params), willThrow(SqlException.class,
+ assertThat(manager.dropColumn(params), willThrow(SqlException.class,
"Can't drop indexed column: [columnName=VAL,
indexName=myIndex]"));
// Try to drop PK column
@@ -486,13 +491,13 @@ public class CatalogServiceSelfTest {
.columns(Set.of("ID"))
.build();
- assertThat(service.dropColumn(params), willThrow(SqlException.class,
"Can't drop primary key column: [name=ID]"));
+ assertThat(manager.dropColumn(params), willThrow(SqlException.class,
"Can't drop primary key column: [name=ID]"));
// Validate actual catalog
- CatalogSchemaDescriptor schema = service.activeSchema(clock.nowLong());
+ CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema);
assertNotNull(schema.table(TABLE_NAME));
- assertSame(service.schema(2), schema);
+ assertSame(manager.schema(2), schema);
assertNotNull(schema.table(TABLE_NAME).column("ID"));
assertNotNull(schema.table(TABLE_NAME).column("VAL"));
@@ -500,7 +505,7 @@ public class CatalogServiceSelfTest {
@Test
public void testAddDropMultipleColumns() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
// Add duplicate column.
AlterTableAddColumnParams addColumnParams =
AlterTableAddColumnParams.builder()
@@ -512,10 +517,10 @@ public class CatalogServiceSelfTest {
))
.build();
- assertThat(service.addColumn(addColumnParams),
willThrow(ColumnAlreadyExistsException.class));
+ assertThat(manager.addColumn(addColumnParams),
willThrow(ColumnAlreadyExistsException.class));
// Validate no column added.
- CatalogSchemaDescriptor schema = service.activeSchema(clock.nowLong());
+ CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong());
assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
@@ -529,10 +534,10 @@ public class CatalogServiceSelfTest {
))
.build();
- assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
+ assertThat(manager.addColumn(addColumnParams), willBe(nullValue()));
// Validate both columns added.
- schema = service.activeSchema(clock.nowLong());
+ schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2));
@@ -544,10 +549,10 @@ public class CatalogServiceSelfTest {
.columns(Set.of(NEW_COLUMN_NAME, NEW_COLUMN_NAME_2))
.build();
- assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
+ assertThat(manager.dropColumn(dropColumnParams), willBe(nullValue()));
// Validate both columns dropped.
- schema = service.activeSchema(clock.nowLong());
+ schema = manager.activeSchema(clock.nowLong());
assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2));
@@ -559,10 +564,10 @@ public class CatalogServiceSelfTest {
.columns(Set.of(NEW_COLUMN_NAME, "VAL"))
.build();
- assertThat(service.dropColumn(dropColumnParams),
willThrow(ColumnNotFoundException.class));
+ assertThat(manager.dropColumn(dropColumnParams),
willThrow(ColumnNotFoundException.class));
// Validate no column dropped.
- schema = service.activeSchema(clock.nowLong());
+ schema = manager.activeSchema(clock.nowLong());
assertNotNull(schema.table(TABLE_NAME).column("VAL"));
}
@@ -574,36 +579,36 @@ public class CatalogServiceSelfTest {
*/
@Test
public void testAlterColumnDefault() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
// NULL-> NULL : No-op.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(null)),
willBe(nullValue()));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// NULL -> 1 : Ok.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(1)),
willBe(nullValue()));
- assertNotNull(service.schema(++schemaVer));
+ assertNotNull(manager.schema(++schemaVer));
// 1 -> 1 : No-op.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(1)),
willBe(nullValue()));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// 1 -> 2 : Ok.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(2)),
willBe(nullValue()));
- assertNotNull(service.schema(++schemaVer));
+ assertNotNull(manager.schema(++schemaVer));
// 2 -> NULL : Ok.
assertThat(changeColumn(TABLE_NAME, "VAL", null, null, () ->
DefaultValue.constant(null)),
willBe(nullValue()));
- assertNotNull(service.schema(++schemaVer));
+ assertNotNull(manager.schema(++schemaVer));
}
/**
@@ -616,21 +621,21 @@ public class CatalogServiceSelfTest {
*/
@Test
public void testAlterColumnNotNull() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
// NULLABLE -> NULLABLE : No-op.
// NOT NULL -> NOT NULL : No-op.
assertThat(changeColumn(TABLE_NAME, "VAL", null, false, null),
willBe(nullValue()));
assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null),
willBe(nullValue()));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// NOT NULL -> NULlABLE : Ok.
assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, false,
null), willBe(nullValue()));
- assertNotNull(service.schema(++schemaVer));
+ assertNotNull(manager.schema(++schemaVer));
// DROP NOT NULL for PK : PK column can't be `null`.
assertThat(changeColumn(TABLE_NAME, "ID", null, false, null),
@@ -642,7 +647,7 @@ public class CatalogServiceSelfTest {
assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", null, true, null),
willThrowFast(SqlException.class, "Cannot set NOT NULL for
column 'VAL_NOT_NULL'."));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
}
/**
@@ -658,32 +663,32 @@ public class CatalogServiceSelfTest {
ColumnParams pkCol =
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
ColumnParams col =
ColumnParams.builder().name("COL_DECIMAL").type(ColumnType.DECIMAL).precision(10).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
// 10 ->11 : Ok.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), 11, null, null), null, null),
willBe(nullValue())
);
- assertNotNull(service.schema(++schemaVer));
+ assertNotNull(manager.schema(++schemaVer));
// No change.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), 11, null, null), null, null),
willBe(nullValue())
);
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// 11 -> 10 : Forbidden because this change lead to incompatible
schemas.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), 10, null, null), null, null),
willThrowFast(SqlException.class, "Cannot decrease precision
to 10 for column '" + col.name() + "'.")
);
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
}
/**
@@ -696,11 +701,11 @@ public class CatalogServiceSelfTest {
ColumnParams col =
ColumnParams.builder().name("COL").type(type).build();
ColumnParams colWithPrecision =
ColumnParams.builder().name("COL_PRECISION").type(type).precision(3).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col, colWithPrecision))), willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col, colWithPrecision))), willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(type, 3, null, null), null, null),
willThrowFast(SqlException.class, "Cannot change precision for
column '" + col.name() + "'"));
@@ -714,7 +719,7 @@ public class CatalogServiceSelfTest {
assertThat(changeColumn(TABLE_NAME, colWithPrecision.name(), new
TestColumnTypeParams(type, 4, null, null), null, null),
willThrowFast(SqlException.class, "Cannot change precision for
column '" + colWithPrecision.name() + "'"));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
}
/**
@@ -731,11 +736,11 @@ public class CatalogServiceSelfTest {
ColumnParams pkCol =
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
ColumnParams col = ColumnParams.builder().name("COL_" +
type).length(10).type(type).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
// 10 -> 11 : Ok.
assertThat(
@@ -743,7 +748,7 @@ public class CatalogServiceSelfTest {
willBe(nullValue())
);
- CatalogSchemaDescriptor schema = service.schema(++schemaVer);
+ CatalogSchemaDescriptor schema = manager.schema(++schemaVer);
assertNotNull(schema);
// 11 -> 10 : Error.
@@ -751,26 +756,26 @@ public class CatalogServiceSelfTest {
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, 10, null), null, null),
willThrowFast(SqlException.class, "Cannot decrease length to
10 for column '" + col.name() + "'.")
);
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// 11 -> 11 : No-op.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, 11, null), null, null),
willBe(nullValue())
);
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// No change.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type()), null, null),
willBe(nullValue()));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// 11 -> 10 : failed.
assertThat(
changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, 10, null), null, null),
willThrowFast(SqlException.class)
);
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
}
/**
@@ -783,11 +788,11 @@ public class CatalogServiceSelfTest {
ColumnParams col =
ColumnParams.builder().name("COL").type(type).build();
ColumnParams colWithLength =
ColumnParams.builder().name("COL_PRECISION").type(type).length(10).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col, colWithLength))), willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col, colWithLength))), willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(type, null, 10, null), null, null),
willThrowFast(SqlException.class, "Cannot change length for
column '" + col.name() + "'"));
@@ -801,7 +806,7 @@ public class CatalogServiceSelfTest {
assertThat(changeColumn(TABLE_NAME, colWithLength.name(), new
TestColumnTypeParams(type, null, 11, null), null, null),
willThrowFast(SqlException.class, "Cannot change length for
column '" + colWithLength.name() + "'"));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
}
/**
@@ -812,31 +817,31 @@ public class CatalogServiceSelfTest {
public void testAlterColumnTypeScaleIsRejected(ColumnType type) {
ColumnParams pkCol =
ColumnParams.builder().name("ID").type(ColumnType.INT32).build();
ColumnParams col = ColumnParams.builder().name("COL_" +
type).type(type).scale(3).build();
- assertThat(service.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME, List.of(pkCol,
col))), willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
// ANY-> UNDEFINED SCALE : No-op.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type()), null, null),
willBe(nullValue()));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// 3 -> 3 : No-op.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, null, 3), null, null),
willBe(nullValue()));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// 3 -> 4 : Error.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, null, 4), null, null),
willThrowFast(SqlException.class, "Cannot change scale for
column '" + col.name() + "'."));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
// 3 -> 2 : Error.
assertThat(changeColumn(TABLE_NAME, col.name(), new
TestColumnTypeParams(col.type(), null, null, 2), null, null),
willThrowFast(SqlException.class, "Cannot change scale for
column '" + col.name() + "'."));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
}
/**
@@ -864,11 +869,11 @@ public class CatalogServiceSelfTest {
CreateTableParams createTableParams = simpleTable(TABLE_NAME,
tableColumns);
- assertThat(service.createTable(createTableParams),
willBe(nullValue()));
+ assertThat(manager.createTable(createTableParams),
willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
for (ColumnParams col : testColumns) {
TypeSafeMatcher<CompletableFuture<?>> matcher;
@@ -885,14 +890,14 @@ public class CatalogServiceSelfTest {
TestColumnTypeParams tyoeParams = new TestColumnTypeParams(target);
assertThat(col.type() + " -> " + target, changeColumn(TABLE_NAME,
col.name(), tyoeParams, null, null), matcher);
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
}
}
@Test
public void testAlterColumnTypeRejectedForPrimaryKey() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
assertThat(changeColumn(TABLE_NAME, "ID", new
TestColumnTypeParams(ColumnType.INT64), null, null),
willThrowFast(SqlException.class, "Cannot change data type for
primary key column 'ID'."));
@@ -904,11 +909,11 @@ public class CatalogServiceSelfTest {
*/
@Test
public void testAlterColumnMultipleChanges() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
int schemaVer = 1;
- assertNotNull(service.schema(schemaVer));
- assertNull(service.schema(schemaVer + 1));
+ assertNotNull(manager.schema(schemaVer));
+ assertNull(manager.schema(schemaVer + 1));
Supplier<DefaultValue> dflt = () -> DefaultValue.constant(null);
boolean notNull = false;
@@ -917,7 +922,7 @@ public class CatalogServiceSelfTest {
// Ensures that 3 different actions applied.
assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe(nullValue()));
- CatalogSchemaDescriptor schema = service.schema(++schemaVer);
+ CatalogSchemaDescriptor schema = manager.schema(++schemaVer);
assertNotNull(schema);
CatalogTableColumnDescriptor desc =
schema.table(TABLE_NAME).column("VAL_NOT_NULL");
@@ -929,24 +934,24 @@ public class CatalogServiceSelfTest {
dflt = () -> DefaultValue.constant(2);
assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe(nullValue()));
- schema = service.schema(++schemaVer);
+ schema = manager.schema(++schemaVer);
assertNotNull(schema);
assertEquals(DefaultValue.constant(2),
schema.table(TABLE_NAME).column("VAL_NOT_NULL").defaultValue());
// Ensures that no action will be applied.
assertThat(changeColumn(TABLE_NAME, "VAL_NOT_NULL", typeParams,
notNull, dflt), willBe(nullValue()));
- assertNull(service.schema(schemaVer + 1));
+ assertNull(manager.schema(schemaVer + 1));
}
@Test
public void testAlterColumnForNonExistingTableRejected() {
- assertNotNull(service.schema(0));
- assertNull(service.schema(1));
+ assertNotNull(manager.schema(0));
+ assertNull(manager.schema(1));
assertThat(changeColumn(TABLE_NAME, "ID", null, null, null),
willThrowFast(TableNotFoundException.class));
- assertNotNull(service.schema(0));
- assertNull(service.schema(1));
+ assertNotNull(manager.schema(0));
+ assertNull(manager.schema(1));
}
@Test
@@ -958,49 +963,49 @@ public class CatalogServiceSelfTest {
.columns(List.of("VAL"))
.build();
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
- assertThat(service.createIndex(params), willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createIndex(params), willBe(nullValue()));
long beforeDropTimestamp = clock.nowLong();
DropTableParams dropTableParams =
DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build();
- assertThat(service.dropTable(dropTableParams), willBe(nullValue()));
+ assertThat(manager.dropTable(dropTableParams), willBe(nullValue()));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = service.schema(2);
+ CatalogSchemaDescriptor schema = manager.schema(2);
CatalogTableDescriptor table = schema.table(TABLE_NAME);
CatalogIndexDescriptor index = schema.index(INDEX_NAME);
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(beforeDropTimestamp));
+ assertSame(schema, manager.activeSchema(beforeDropTimestamp));
- assertSame(table, service.table(TABLE_NAME, beforeDropTimestamp));
- assertSame(table, service.table(table.id(), beforeDropTimestamp));
+ assertSame(table, manager.table(TABLE_NAME, beforeDropTimestamp));
+ assertSame(table, manager.table(table.id(), beforeDropTimestamp));
- assertSame(index, service.index(INDEX_NAME, beforeDropTimestamp));
- assertSame(index, service.index(index.id(), beforeDropTimestamp));
+ assertSame(index, manager.index(INDEX_NAME, beforeDropTimestamp));
+ assertSame(index, manager.index(index.id(), beforeDropTimestamp));
// Validate actual catalog
- schema = service.schema(3);
+ schema = manager.schema(3);
assertNotNull(schema);
assertEquals(SCHEMA_NAME, schema.name());
- assertSame(schema, service.activeSchema(clock.nowLong()));
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
assertNull(schema.table(TABLE_NAME));
- assertNull(service.table(TABLE_NAME, clock.nowLong()));
- assertNull(service.table(table.id(), clock.nowLong()));
+ assertNull(manager.table(TABLE_NAME, clock.nowLong()));
+ assertNull(manager.table(table.id(), clock.nowLong()));
assertNull(schema.index(INDEX_NAME));
- assertNull(service.index(INDEX_NAME, clock.nowLong()));
- assertNull(service.index(index.id(), clock.nowLong()));
+ assertNull(manager.index(INDEX_NAME, clock.nowLong()));
+ assertNull(manager.index(index.id(), clock.nowLong()));
}
@Test
public void testCreateHashIndex() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
CreateHashIndexParams params = CreateHashIndexParams.builder()
.schemaName(SCHEMA_NAME)
@@ -1009,25 +1014,26 @@ public class CatalogServiceSelfTest {
.columns(List.of("VAL", "ID"))
.build();
- assertThat(service.createIndex(params), willBe(nullValue()));
+ assertThat(manager.createIndex(params), willBe(nullValue()));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = service.schema(1);
+ CatalogSchemaDescriptor schema = manager.schema(1);
assertNotNull(schema);
assertNull(schema.index(INDEX_NAME));
- assertNull(service.index(INDEX_NAME, 123L));
+ assertNull(manager.index(INDEX_NAME, 123L));
// Validate actual catalog
- schema = service.schema(2);
+ schema = manager.schema(2);
CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor)
schema.index(INDEX_NAME);
assertNotNull(schema);
- assertSame(index, service.index(INDEX_NAME, clock.nowLong()));
- assertSame(index, service.index(index.id(), clock.nowLong()));
+ assertSame(index, manager.index(INDEX_NAME, clock.nowLong()));
+ assertSame(index, manager.index(index.id(), clock.nowLong()));
// Validate newly created hash index
+ assertEquals(4L, index.id());
assertEquals(INDEX_NAME, index.name());
assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
assertEquals(List.of("VAL", "ID"), index.columns());
@@ -1037,7 +1043,7 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateSortedIndex() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
CreateSortedIndexParams params = CreateSortedIndexParams.builder()
.schemaName(SCHEMA_NAME)
@@ -1048,26 +1054,27 @@ public class CatalogServiceSelfTest {
.collations(List.of(CatalogColumnCollation.DESC_NULLS_FIRST,
CatalogColumnCollation.ASC_NULLS_LAST))
.build();
- assertThat(service.createIndex(params), willBe(nullValue()));
+ assertThat(manager.createIndex(params), willBe(nullValue()));
// Validate catalog version from the past.
- CatalogSchemaDescriptor schema = service.schema(1);
+ CatalogSchemaDescriptor schema = manager.schema(1);
assertNotNull(schema);
assertNull(schema.index(INDEX_NAME));
- assertNull(service.index(INDEX_NAME, 123L));
- assertNull(service.index(4, 123L));
+ assertNull(manager.index(INDEX_NAME, 123L));
+ assertNull(manager.index(4, 123L));
// Validate actual catalog
- schema = service.schema(2);
+ schema = manager.schema(2);
CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor)
schema.index(INDEX_NAME);
assertNotNull(schema);
- assertSame(index, service.index(INDEX_NAME, clock.nowLong()));
- assertSame(index, service.index(index.id(), clock.nowLong()));
+ assertSame(index, manager.index(INDEX_NAME, clock.nowLong()));
+ assertSame(index, manager.index(index.id(), clock.nowLong()));
// Validate newly created sorted index
+ assertEquals(4L, index.id());
assertEquals(INDEX_NAME, index.name());
assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
assertEquals("VAL", index.columns().get(0).name());
@@ -1080,7 +1087,7 @@ public class CatalogServiceSelfTest {
@Test
public void testCreateIndexWithSameName() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
CreateHashIndexParams params = CreateHashIndexParams.builder()
.indexName(INDEX_NAME)
@@ -1088,13 +1095,13 @@ public class CatalogServiceSelfTest {
.columns(List.of("VAL"))
.build();
- assertThat(service.createIndex(params), willBe(nullValue()));
- assertThat(service.createIndex(params),
willThrow(IndexAlreadyExistsException.class));
+ assertThat(manager.createIndex(params), willBe(nullValue()));
+ assertThat(manager.createIndex(params),
willThrow(IndexAlreadyExistsException.class));
}
@Test
public void testCreateIndexOnDuplicateColumns() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
CreateHashIndexParams params = CreateHashIndexParams.builder()
.indexName(INDEX_NAME)
@@ -1102,7 +1109,7 @@ public class CatalogServiceSelfTest {
.columns(List.of("VAL", "VAL"))
.build();
- assertThat(service.createIndex(params),
+ assertThat(manager.createIndex(params),
willThrow(IgniteInternalException.class, "Can't create index
on duplicate columns: VAL, VAL"));
}
@@ -1114,13 +1121,13 @@ public class CatalogServiceSelfTest {
doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture());
- CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock,
clockWaiter);
- service.start();
+ CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock,
clockWaiter);
+ manager.start();
when(updateLogMock.append(any())).thenAnswer(invocation -> {
// here we emulate concurrent updates. First of all, we return a
future completed with "false"
- // as if someone has concurrently appended an update. Besides, in
order to unblock service and allow to
- // make another attempt, we must notify service with the same
version as in current attempt.
+ // as if someone has concurrently appended an update. Besides, in
order to unblock manager and allow to
+ // make another attempt, we must notify manager with the same
version as in current attempt.
VersionedUpdate updateFromInvocation = invocation.getArgument(0,
VersionedUpdate.class);
VersionedUpdate update = new VersionedUpdate(
@@ -1134,7 +1141,7 @@ public class CatalogServiceSelfTest {
return completedFuture(false);
});
- CompletableFuture<Void> createTableFut =
service.createTable(simpleTable("T"));
+ CompletableFuture<Void> createTableFut =
manager.createTable(simpleTable("T"));
assertThat(createTableFut, willThrow(IgniteInternalException.class,
"Max retry limit exceeded"));
@@ -1146,9 +1153,9 @@ public class CatalogServiceSelfTest {
public void catalogActivationTime() throws Exception {
final long delayDuration = TimeUnit.DAYS.toMillis(365);
- CatalogServiceImpl service = new CatalogServiceImpl(updateLog,
clockWaiter, delayDuration);
+ CatalogManagerImpl manager = new CatalogManagerImpl(updateLog,
clockWaiter, delayDuration);
- service.start();
+ manager.start();
try {
CreateTableParams params = CreateTableParams.builder()
@@ -1161,7 +1168,7 @@ public class CatalogServiceSelfTest {
.primaryKeyColumns(List.of("key"))
.build();
- service.createTable(params);
+ manager.createTable(params);
verify(updateLog).append(any());
// TODO IGNITE-19400: recheck createTable future completion
guarantees
@@ -1169,15 +1176,15 @@ public class CatalogServiceSelfTest {
// This waits till the new Catalog version lands in the internal
structures.
verify(clockWaiter, timeout(10_000)).waitFor(any());
- assertSame(service.schema(0),
service.activeSchema(clock.nowLong()));
- assertNull(service.table(TABLE_NAME, clock.nowLong()));
+ assertSame(manager.schema(0),
manager.activeSchema(clock.nowLong()));
+ assertNull(manager.table(TABLE_NAME, clock.nowLong()));
clock.update(clock.now().addPhysicalTime(delayDuration));
- assertSame(service.schema(1),
service.activeSchema(clock.nowLong()));
- assertNotNull(service.table(TABLE_NAME, clock.nowLong()));
+ assertSame(manager.schema(1),
manager.activeSchema(clock.nowLong()));
+ assertNotNull(manager.table(TABLE_NAME, clock.nowLong()));
} finally {
- service.stop();
+ manager.stop();
}
}
@@ -1185,13 +1192,13 @@ public class CatalogServiceSelfTest {
public void catalogServiceManagesUpdateLogLifecycle() throws Exception {
UpdateLog updateLogMock = mock(UpdateLog.class);
- CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock,
clockWaiter);
+ CatalogManagerImpl manager = new CatalogManagerImpl(updateLogMock,
clockWaiter);
- service.start();
+ manager.start();
verify(updateLogMock).start();
- service.stop();
+ manager.stop();
verify(updateLogMock).stop();
}
@@ -1216,13 +1223,13 @@ public class CatalogServiceSelfTest {
EventListener<CatalogEventParameters> eventListener =
mock(EventListener.class);
when(eventListener.notify(any(),
any())).thenReturn(completedFuture(false));
- service.listen(CatalogEvent.TABLE_CREATE, eventListener);
- service.listen(CatalogEvent.TABLE_DROP, eventListener);
+ manager.listen(CatalogEvent.TABLE_CREATE, eventListener);
+ manager.listen(CatalogEvent.TABLE_DROP, eventListener);
- assertThat(service.createTable(createTableParams),
willBe(nullValue()));
+ assertThat(manager.createTable(createTableParams),
willBe(nullValue()));
verify(eventListener).notify(any(CreateTableEventParameters.class),
isNull());
- assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
+ assertThat(manager.dropTable(dropTableparams), willBe(nullValue()));
verify(eventListener).notify(any(DropTableEventParameters.class),
isNull());
verifyNoMoreInteractions(eventListener);
@@ -1257,36 +1264,36 @@ public class CatalogServiceSelfTest {
EventListener<CatalogEventParameters> eventListener =
mock(EventListener.class);
when(eventListener.notify(any(),
any())).thenReturn(completedFuture(false));
- service.listen(CatalogEvent.INDEX_CREATE, eventListener);
- service.listen(CatalogEvent.INDEX_DROP, eventListener);
+ manager.listen(CatalogEvent.INDEX_CREATE, eventListener);
+ manager.listen(CatalogEvent.INDEX_DROP, eventListener);
// Try to create index without table.
- assertThat(service.createIndex(createIndexParams),
willThrow(TableNotFoundException.class));
+ assertThat(manager.createIndex(createIndexParams),
willThrow(TableNotFoundException.class));
verifyNoInteractions(eventListener);
// Create table with PK index.
- assertThat(service.createTable(createTableParams),
willCompleteSuccessfully());
+ assertThat(manager.createTable(createTableParams),
willCompleteSuccessfully());
verify(eventListener).notify(any(CreateIndexEventParameters.class),
isNull());
clearInvocations(eventListener);
// Create index.
- assertThat(service.createIndex(createIndexParams),
willCompleteSuccessfully());
+ assertThat(manager.createIndex(createIndexParams),
willCompleteSuccessfully());
verify(eventListener).notify(any(CreateIndexEventParameters.class),
isNull());
clearInvocations(eventListener);
// Drop index.
- assertThat(service.dropIndex(dropIndexParams), willBe(nullValue()));
+ assertThat(manager.dropIndex(dropIndexParams), willBe(nullValue()));
verify(eventListener).notify(any(DropIndexEventParameters.class),
isNull());
clearInvocations(eventListener);
// Drop table with pk index.
- assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
+ assertThat(manager.dropTable(dropTableparams), willBe(nullValue()));
// Try drop index once again.
- assertThat(service.dropIndex(dropIndexParams),
willThrow(IndexNotFoundException.class));
+ assertThat(manager.dropIndex(dropIndexParams),
willThrow(IndexNotFoundException.class));
verify(eventListener).notify(any(DropIndexEventParameters.class),
isNull());
}
@@ -1303,19 +1310,19 @@ public class CatalogServiceSelfTest {
.filter("expression")
.build();
- assertThat(service.createDistributionZone(params),
willCompleteSuccessfully());
+ assertThat(manager.createDistributionZone(params),
willCompleteSuccessfully());
// Validate catalog version from the past.
- assertNull(service.zone(zoneName, 0));
- assertNull(service.zone(2, 0));
- assertNull(service.zone(zoneName, 123L));
- assertNull(service.zone(2, 123L));
+ assertNull(manager.zone(zoneName, 0));
+ assertNull(manager.zone(2, 0));
+ assertNull(manager.zone(zoneName, 123L));
+ assertNull(manager.zone(2, 123L));
// Validate actual catalog
- CatalogZoneDescriptor zone = service.zone(zoneName, clock.nowLong());
+ CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong());
assertNotNull(zone);
- assertSame(zone, service.zone(zone.id(), clock.nowLong()));
+ assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
// Validate newly created zone
assertEquals(zoneName, zone.name());
@@ -1335,7 +1342,7 @@ public class CatalogServiceSelfTest {
.zoneName(zoneName)
.build();
- assertThat(service.createDistributionZone(createZoneParams),
willCompleteSuccessfully());
+ assertThat(manager.createDistributionZone(createZoneParams),
willCompleteSuccessfully());
long beforeDropTimestamp = clock.nowLong();
@@ -1343,24 +1350,24 @@ public class CatalogServiceSelfTest {
.zoneName(zoneName)
.build();
- CompletableFuture<Void> fut = service.dropDistributionZone(params);
+ CompletableFuture<Void> fut = manager.dropDistributionZone(params);
assertThat(fut, willCompleteSuccessfully());
// Validate catalog version from the past.
- CatalogZoneDescriptor zone = service.zone(zoneName,
beforeDropTimestamp);
+ CatalogZoneDescriptor zone = manager.zone(zoneName,
beforeDropTimestamp);
assertNotNull(zone);
assertEquals(zoneName, zone.name());
- assertSame(zone, service.zone(zone.id(), beforeDropTimestamp));
+ assertSame(zone, manager.zone(zone.id(), beforeDropTimestamp));
// Validate actual catalog
- assertNull(service.zone(zoneName, clock.nowLong()));
- assertNull(service.zone(zone.id(), clock.nowLong()));
+ assertNull(manager.zone(zoneName, clock.nowLong()));
+ assertNull(manager.zone(zone.id(), clock.nowLong()));
// Try to drop non-existing zone.
- assertThat(service.dropDistributionZone(params),
willThrow(DistributionZoneNotFoundException.class));
+ assertThat(manager.dropDistributionZone(params),
willThrow(DistributionZoneNotFoundException.class));
}
@Test
@@ -1373,7 +1380,7 @@ public class CatalogServiceSelfTest {
.replicas(15)
.build();
- assertThat(service.createDistributionZone(createParams),
willCompleteSuccessfully());
+ assertThat(manager.createDistributionZone(createParams),
willCompleteSuccessfully());
long beforeDropTimestamp = clock.nowLong();
@@ -1386,29 +1393,29 @@ public class CatalogServiceSelfTest {
.newZoneName(newZoneName)
.build();
- assertThat(service.renameDistributionZone(renameZoneParams),
willCompleteSuccessfully());
+ assertThat(manager.renameDistributionZone(renameZoneParams),
willCompleteSuccessfully());
// Validate catalog version from the past.
- CatalogZoneDescriptor zone = service.zone(zoneName,
beforeDropTimestamp);
+ CatalogZoneDescriptor zone = manager.zone(zoneName,
beforeDropTimestamp);
assertNotNull(zone);
assertEquals(zoneName, zone.name());
- assertSame(zone, service.zone(zone.id(), beforeDropTimestamp));
+ assertSame(zone, manager.zone(zone.id(), beforeDropTimestamp));
// Validate actual catalog
- zone = service.zone(newZoneName, clock.nowLong());
+ zone = manager.zone(newZoneName, clock.nowLong());
assertNotNull(zone);
- assertNull(service.zone(zoneName, clock.nowLong()));
+ assertNull(manager.zone(zoneName, clock.nowLong()));
assertEquals(newZoneName, zone.name());
- assertSame(zone, service.zone(zone.id(), clock.nowLong()));
+ assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
}
@Test
public void testDefaultZone() {
- CatalogZoneDescriptor defaultZone = service.zone(DEFAULT_ZONE_NAME,
clock.nowLong());
+ CatalogZoneDescriptor defaultZone = manager.zone(DEFAULT_ZONE_NAME,
clock.nowLong());
// Try to create zone with default zone name.
CreateZoneParams createParams = CreateZoneParams.builder()
@@ -1416,10 +1423,10 @@ public class CatalogServiceSelfTest {
.partitions(42)
.replicas(15)
.build();
- assertThat(service.createDistributionZone(createParams),
willThrow(IgniteInternalException.class));
+ assertThat(manager.createDistributionZone(createParams),
willThrow(IgniteInternalException.class));
// Validate default zone wasn't changed.
- assertSame(defaultZone, service.zone(DEFAULT_ZONE_NAME,
clock.nowLong()));
+ assertSame(defaultZone, manager.zone(DEFAULT_ZONE_NAME,
clock.nowLong()));
// Try to rename default zone.
String newDefaultZoneName = "RenamedDefaultZone";
@@ -1428,20 +1435,20 @@ public class CatalogServiceSelfTest {
.zoneName(DEFAULT_ZONE_NAME)
.newZoneName(newDefaultZoneName)
.build();
- assertThat(service.renameDistributionZone(renameZoneParams),
willThrow(IgniteInternalException.class));
+ assertThat(manager.renameDistributionZone(renameZoneParams),
willThrow(IgniteInternalException.class));
// Validate default zone wasn't changed.
- assertNull(service.zone(newDefaultZoneName, clock.nowLong()));
- assertSame(defaultZone, service.zone(DEFAULT_ZONE_NAME,
clock.nowLong()));
+ assertNull(manager.zone(newDefaultZoneName, clock.nowLong()));
+ assertSame(defaultZone, manager.zone(DEFAULT_ZONE_NAME,
clock.nowLong()));
// Try to drop default zone.
DropZoneParams dropZoneParams = DropZoneParams.builder()
.zoneName(DEFAULT_ZONE_NAME)
.build();
- assertThat(service.dropDistributionZone(dropZoneParams),
willThrow(IgniteInternalException.class));
+ assertThat(manager.dropDistributionZone(dropZoneParams),
willThrow(IgniteInternalException.class));
// Validate default zone wasn't changed.
- assertSame(defaultZone, service.zone(DEFAULT_ZONE_NAME,
clock.nowLong()));
+ assertSame(defaultZone, manager.zone(DEFAULT_ZONE_NAME,
clock.nowLong()));
}
@Test
@@ -1465,13 +1472,13 @@ public class CatalogServiceSelfTest {
.filter("newExpression")
.build();
- assertThat(service.createDistributionZone(createParams),
willCompleteSuccessfully());
- assertThat(service.alterDistributionZone(alterZoneParams),
willCompleteSuccessfully());
+ assertThat(manager.createDistributionZone(createParams),
willCompleteSuccessfully());
+ assertThat(manager.alterDistributionZone(alterZoneParams),
willCompleteSuccessfully());
// Validate actual catalog
- CatalogZoneDescriptor zone = service.zone(zoneName, clock.nowLong());
+ CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong());
assertNotNull(zone);
- assertSame(zone, service.zone(zone.id(), clock.nowLong()));
+ assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
assertEquals(zoneName, zone.name());
assertEquals(10, zone.partitions());
@@ -1492,7 +1499,7 @@ public class CatalogServiceSelfTest {
.replicas(15)
.build();
- assertThat(service.createDistributionZone(params),
willCompleteSuccessfully());
+ assertThat(manager.createDistributionZone(params),
willCompleteSuccessfully());
// Try to create zone with same name.
params = CreateZoneParams.builder()
@@ -1501,14 +1508,14 @@ public class CatalogServiceSelfTest {
.replicas(1)
.build();
- assertThat(service.createDistributionZone(params),
willThrowFast(DistributionZoneAlreadyExistsException.class));
+ assertThat(manager.createDistributionZone(params),
willThrowFast(DistributionZoneAlreadyExistsException.class));
// Validate zone was NOT changed
- CatalogZoneDescriptor zone = service.zone(zoneName, clock.nowLong());
+ CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong());
assertNotNull(zone);
- assertSame(zone, service.zone(zoneName, clock.nowLong()));
- assertSame(zone, service.zone(zone.id(), clock.nowLong()));
+ assertSame(zone, manager.zone(zoneName, clock.nowLong()));
+ assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
assertEquals(zoneName, zone.name());
assertEquals(42, zone.partitions());
@@ -1530,16 +1537,16 @@ public class CatalogServiceSelfTest {
EventListener<CatalogEventParameters> eventListener =
mock(EventListener.class);
when(eventListener.notify(any(),
any())).thenReturn(completedFuture(false));
- service.listen(CatalogEvent.ZONE_CREATE, eventListener);
- service.listen(CatalogEvent.ZONE_DROP, eventListener);
+ manager.listen(CatalogEvent.ZONE_CREATE, eventListener);
+ manager.listen(CatalogEvent.ZONE_DROP, eventListener);
- CompletableFuture<Void> fut =
service.createDistributionZone(createZoneParams);
+ CompletableFuture<Void> fut =
manager.createDistributionZone(createZoneParams);
assertThat(fut, willCompleteSuccessfully());
verify(eventListener).notify(any(CreateZoneEventParameters.class),
isNull());
- fut = service.dropDistributionZone(dropZoneParams);
+ fut = manager.dropDistributionZone(dropZoneParams);
assertThat(fut, willCompleteSuccessfully());
@@ -1570,25 +1577,25 @@ public class CatalogServiceSelfTest {
EventListener<CatalogEventParameters> eventListener =
mock(EventListener.class);
when(eventListener.notify(any(),
any())).thenReturn(completedFuture(false));
- service.listen(CatalogEvent.TABLE_ALTER, eventListener);
+ manager.listen(CatalogEvent.TABLE_ALTER, eventListener);
// Try to add column without table.
- assertThat(service.addColumn(addColumnParams),
willThrow(TableNotFoundException.class));
+ assertThat(manager.addColumn(addColumnParams),
willThrow(TableNotFoundException.class));
verifyNoInteractions(eventListener);
// Create table.
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
// Add column.
- assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
+ assertThat(manager.addColumn(addColumnParams), willBe(nullValue()));
verify(eventListener).notify(any(AddColumnEventParameters.class),
isNull());
// Drop column.
- assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
+ assertThat(manager.dropColumn(dropColumnParams), willBe(nullValue()));
verify(eventListener).notify(any(DropColumnEventParameters.class),
isNull());
// Try drop column once again.
- assertThat(service.dropColumn(dropColumnParams),
willThrow(ColumnNotFoundException.class));
+ assertThat(manager.dropColumn(dropColumnParams),
willThrow(ColumnNotFoundException.class));
verifyNoMoreInteractions(eventListener);
}
@@ -1599,9 +1606,9 @@ public class CatalogServiceSelfTest {
HybridTimestamp startTs = clock.now();
- CatalogServiceImpl service = new CatalogServiceImpl(updateLog,
clockWaiter, delayDuration);
+ CatalogManagerImpl manager = new CatalogManagerImpl(updateLog,
clockWaiter, delayDuration);
- service.start();
+ manager.start();
try {
CreateTableParams params = CreateTableParams.builder()
@@ -1614,7 +1621,7 @@ public class CatalogServiceSelfTest {
.primaryKeyColumns(List.of("key"))
.build();
- CompletableFuture<Void> future = service.createTable(params);
+ CompletableFuture<Void> future = manager.createTable(params);
assertThat(future.isDone(), is(false));
@@ -1627,7 +1634,7 @@ public class CatalogServiceSelfTest {
greaterThanOrEqualTo(delayDuration +
HybridTimestamp.maxClockSkew())
);
} finally {
- service.stop();
+ manager.stop();
}
}
@@ -1635,9 +1642,9 @@ public class CatalogServiceSelfTest {
void testGetCatalogEntityInCatalogEvent() {
CompletableFuture<Void> result = new CompletableFuture<>();
- service.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
+ manager.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
try {
- assertNotNull(service.schema(parameters.catalogVersion()));
+ assertNotNull(manager.schema(parameters.catalogVersion()));
result.complete(null);
@@ -1649,26 +1656,26 @@ public class CatalogServiceSelfTest {
}
});
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
assertThat(result, willCompleteSuccessfully());
}
@Test
void testGetTableByIdAndCatalogVersion() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
- CatalogTableDescriptor table = service.table(TABLE_NAME,
clock.nowLong());
+ CatalogTableDescriptor table = manager.table(TABLE_NAME,
clock.nowLong());
- assertNull(service.table(table.id(), 0));
- assertSame(table, service.table(table.id(), 1));
+ assertNull(manager.table(table.id(), 0));
+ assertSame(table, manager.table(table.id(), 1));
}
@Test
void testGetTableIdOnDropIndexEvent() {
- assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
assertThat(
- service.createIndex(
+ manager.createIndex(
CreateHashIndexParams.builder()
.schemaName(SCHEMA_NAME)
.tableName(TABLE_NAME)
@@ -1679,9 +1686,9 @@ public class CatalogServiceSelfTest {
willBe(nullValue())
);
- int tableId = service.table(TABLE_NAME, clock.nowLong()).id();
- int pkIndexId = service.index(createPkIndexName(TABLE_NAME),
clock.nowLong()).id();
- int indexId = service.index(INDEX_NAME, clock.nowLong()).id();
+ int tableId = manager.table(TABLE_NAME, clock.nowLong()).id();
+ int pkIndexId = manager.index(createPkIndexName(TABLE_NAME),
clock.nowLong()).id();
+ int indexId = manager.index(INDEX_NAME, clock.nowLong()).id();
assertNotEquals(tableId, indexId);
@@ -1691,11 +1698,11 @@ public class CatalogServiceSelfTest {
doReturn(completedFuture(false)).when(eventListener).notify(captor.capture(),
any());
- service.listen(CatalogEvent.INDEX_DROP, eventListener);
+ manager.listen(CatalogEvent.INDEX_DROP, eventListener);
// Let's remove the index.
assertThat(
-
service.dropIndex(DropIndexParams.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()),
+
manager.dropIndex(DropIndexParams.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()),
willBe(nullValue())
);
@@ -1706,7 +1713,7 @@ public class CatalogServiceSelfTest {
// Let's delete the table.
assertThat(
-
service.dropTable(DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build()),
+
manager.dropTable(DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build()),
willBe(nullValue())
);
@@ -1721,7 +1728,7 @@ public class CatalogServiceSelfTest {
void testCreateTableErrors() {
// Table must have at least one column.
assertThat(
- service.createTable(
+ manager.createTable(
CreateTableParams.builder()
.schemaName(SCHEMA_NAME)
.zone(ZONE_NAME)
@@ -1735,7 +1742,7 @@ public class CatalogServiceSelfTest {
// Table must have PK columns.
assertThat(
- service.createTable(
+ manager.createTable(
CreateTableParams.builder()
.schemaName(SCHEMA_NAME)
.zone(ZONE_NAME)
@@ -1751,7 +1758,7 @@ public class CatalogServiceSelfTest {
// PK column must be a valid column
assertThat(
- service.createTable(
+ manager.createTable(
CreateTableParams.builder()
.schemaName(SCHEMA_NAME)
.zone(ZONE_NAME)
@@ -1767,7 +1774,7 @@ public class CatalogServiceSelfTest {
// Column names must be unique.
assertThat(
- service.createTable(
+ manager.createTable(
CreateTableParams.builder()
.schemaName(SCHEMA_NAME)
.tableName(TABLE_NAME)
@@ -1784,7 +1791,7 @@ public class CatalogServiceSelfTest {
// PK column names must be unique.
assertThat(
- service.createTable(
+ manager.createTable(
CreateTableParams.builder()
.schemaName(SCHEMA_NAME)
.tableName(TABLE_NAME)
@@ -1800,7 +1807,7 @@ public class CatalogServiceSelfTest {
// Colocated columns names must be unique.
assertThat(
- service.createTable(
+ manager.createTable(
CreateTableParams.builder()
.schemaName(SCHEMA_NAME)
.tableName(TABLE_NAME)
@@ -1818,7 +1825,7 @@ public class CatalogServiceSelfTest {
// Colocated columns must be valid primary key columns.
assertThat(
- service.createTable(
+ manager.createTable(
CreateTableParams.builder()
.schemaName(SCHEMA_NAME)
.tableName(TABLE_NAME)
@@ -1834,6 +1841,37 @@ public class CatalogServiceSelfTest {
willThrowFast(IgniteInternalException.class, "Colocation
columns must be subset of primary key: outstandingColumns=[val]"));
}
+ @Test
+ void testLatestCatalogVersion() {
+ assertEquals(0, manager.latestCatalogVersion());
+
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertEquals(1, manager.latestCatalogVersion());
+
+ assertThat(manager.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)),
willBe(nullValue()));
+ assertEquals(2, manager.latestCatalogVersion());
+ }
+
+ @Test
+ void testTables() {
+ assertThat(manager.createTable(simpleTable(TABLE_NAME + 0)),
willBe(nullValue()));
+ assertThat(manager.createTable(simpleTable(TABLE_NAME + 1)),
willBe(nullValue()));
+
+ assertThat(manager.tables(0), empty());
+ assertThat(manager.tables(1), hasItems(table(1, TABLE_NAME + 0)));
+ assertThat(manager.tables(2), hasItems(table(2, TABLE_NAME + 0),
table(2, TABLE_NAME + 1)));
+ }
+
+ @Test
+ void testIndexes() {
+ assertThat(manager.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.createIndex(simpleIndex(INDEX_NAME, TABLE_NAME)),
willBe(nullValue()));
+
+ assertThat(manager.indexes(0), empty());
+ assertThat(manager.indexes(1), hasItems(index(1,
createPkIndexName(TABLE_NAME))));
+ assertThat(manager.indexes(2), hasItems(index(2,
createPkIndexName(TABLE_NAME)), index(2, INDEX_NAME)));
+ }
+
private CompletableFuture<Void> changeColumn(
String tab,
String col,
@@ -1866,7 +1904,7 @@ public class CatalogServiceSelfTest {
}
}
- return service.alterColumn(builder.build());
+ return manager.alterColumn(builder.build());
}
private static CreateTableParams simpleTable(String name) {
@@ -1923,4 +1961,12 @@ public class CatalogServiceSelfTest {
private static String createPkIndexName(String tableName) {
return tableName + "_PK";
}
+
+ private @Nullable CatalogTableDescriptor table(int catalogVersion, String
tableName) {
+ return manager.schema(catalogVersion).table(tableName);
+ }
+
+ private @Nullable CatalogIndexDescriptor index(int catalogVersion, String
indexName) {
+ return manager.schema(catalogVersion).index(indexName);
+ }
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 7fbfb55391..275bae9979 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -23,18 +23,21 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
+import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.lang.IgniteInternalException;
@@ -47,6 +50,7 @@ import org.junit.jupiter.params.provider.ValueSource;
/** Tests to verify {@link UpdateLogImpl}. */
@SuppressWarnings("ConstantConditions")
class UpdateLogImplTest {
+ private KeyValueStorage keyValueStorage;
private MetaStorageManager metastore;
@@ -56,7 +60,9 @@ class UpdateLogImplTest {
void setUp() {
vault = new VaultManager(new InMemoryVaultService());
- metastore = StandaloneMetaStorageManager.create(vault, new
SimpleInMemoryKeyValueStorage("test"));
+ keyValueStorage = new SimpleInMemoryKeyValueStorage("test");
+
+ metastore = StandaloneMetaStorageManager.create(vault,
keyValueStorage);
vault.start();
metastore.start();
@@ -64,61 +70,69 @@ class UpdateLogImplTest {
@AfterEach
public void tearDown() throws Exception {
- metastore.stop();
- vault.stop();
+ IgniteUtils.closeAll(
+ metastore == null ? null : metastore::stop,
+ vault == null ? null : vault::stop
+ );
}
@Test
- public void logReplayedOnStart() throws Exception {
- // first, let's append a few entries to the log
- UpdateLogImpl updateLog = createUpdateLogImpl();
+ void logReplayedOnStart() throws Exception {
+ // First, let's append a few entries to the update log.
+ UpdateLogImpl updateLogImpl = createAndStartUpdateLogImpl((update, ts,
causalityToken) -> {/* no-op */});
- long revisionBefore = metastore.appliedRevision();
+ assertThat(metastore.deployWatches(), willCompleteSuccessfully());
- updateLog.registerUpdateHandler((update, ts, causalityToken) -> {/*
no-op */});
- updateLog.start();
+ List<VersionedUpdate> expectedUpdates =
List.of(singleEntryUpdateOfVersion(1), singleEntryUpdateOfVersion(2));
- assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
+ appendUpdates(updateLogImpl, expectedUpdates);
- List<VersionedUpdate> expectedVersions = List.of(
- new VersionedUpdate(1, 1L, List.of(new
TestUpdateEntry("foo"))),
- new VersionedUpdate(2, 2L, List.of(new TestUpdateEntry("bar")))
- );
+ // Let's restart the log and metastore with recovery.
+ updateLogImpl.stop();
- for (VersionedUpdate update : expectedVersions) {
- assertThat(updateLog.append(update), willBe(true));
- }
+ restartMetastore();
- // and wait till metastore apply necessary revision
- assertTrue(
- waitForCondition(
- () -> metastore.appliedRevision() -
expectedVersions.size() == revisionBefore,
- TimeUnit.SECONDS.toMillis(5)
- )
- );
+ var actualUpdates = new ArrayList<VersionedUpdate>();
+
+ createAndStartUpdateLogImpl((update, ts, causalityToken) ->
actualUpdates.add(update));
+
+ // Let's check that we have recovered to the latest version.
+ assertThat(actualUpdates, equalTo(expectedUpdates));
+ }
- updateLog.stop();
+ private UpdateLogImpl createUpdateLogImpl() {
+ return new UpdateLogImpl(metastore);
+ }
- // now let's create new component over a stuffed vault/metastore
- // and check if log is replayed on start
- updateLog = createUpdateLogImpl();
+ private UpdateLogImpl createAndStartUpdateLogImpl(OnUpdateHandler
onUpdateHandler) {
+ UpdateLogImpl updateLogImpl = createUpdateLogImpl();
- List<VersionedUpdate> actualVersions = new ArrayList<>();
- List<Long> actualCausalityTokens = new ArrayList<>();
+ updateLogImpl.registerUpdateHandler(onUpdateHandler);
+ updateLogImpl.start();
- updateLog.registerUpdateHandler((update, ts, causalityToken) -> {
- actualVersions.add(update);
- actualCausalityTokens.add(causalityToken);
- });
+ return updateLogImpl;
+ }
- updateLog.start();
+ private void appendUpdates(UpdateLogImpl updateLogImpl,
Collection<VersionedUpdate> updates) throws Exception {
+ long revisionBeforeAppend = metastore.appliedRevision();
+
+ updates.forEach(update -> assertThat(updateLogImpl.append(update),
willBe(true)));
- assertEquals(expectedVersions, actualVersions);
- assertEquals(List.of(revisionBefore + 1, revisionBefore + 2),
actualCausalityTokens);
+ assertTrue(waitForCondition(
+ () -> metastore.appliedRevision() - updates.size() ==
revisionBeforeAppend,
+ TimeUnit.SECONDS.toMillis(1))
+ );
}
- private UpdateLogImpl createUpdateLogImpl() {
- return new UpdateLogImpl(metastore);
+ private void restartMetastore() throws Exception {
+ long recoverRevision = metastore.appliedRevision();
+
+ metastore.stop();
+
+ metastore = StandaloneMetaStorageManager.create(vault,
keyValueStorage);
+ metastore.start();
+
+ assertThat(metastore.recoveryFinishedFuture(),
willBe(recoverRevision));
}
@Test
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 77b022c0d4..1b5979fa23 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -71,7 +71,7 @@ import
org.apache.ignite.client.handler.configuration.ClientConnectorConfigurati
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.ClockWaiter;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
@@ -785,7 +785,7 @@ public class ItRebalanceDistributedTest {
clockWaiter = new ClockWaiter("test", hybridClock);
- catalogManager = new CatalogServiceImpl(
+ catalogManager = new CatalogManagerImpl(
new UpdateLogImpl(metaStorageManager),
clockWaiter
);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 7a9b323950..2f542b5a61 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -58,7 +58,7 @@ import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.BaseIgniteRestartTest;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.baseline.BaselineManager;
-import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.ClockWaiter;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -351,7 +351,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
var clockWaiter = new ClockWaiter("test", hybridClock);
- var catalogManager = new CatalogServiceImpl(
+ var catalogManager = new CatalogManagerImpl(
new UpdateLogImpl(metaStorageMgr),
clockWaiter
);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d89c111b1a..4ba7dfbcb5 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -41,7 +41,7 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.configuration.ConfigurationModule;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.ClockWaiter;
import
org.apache.ignite.internal.catalog.configuration.SchemaSynchronizationConfiguration;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
@@ -514,7 +514,7 @@ public class IgniteImpl implements Ignite {
SchemaSynchronizationConfiguration.KEY
);
- catalogManager = new CatalogServiceImpl(
+ catalogManager = new CatalogManagerImpl(
new UpdateLogImpl(metaStorageMgr),
clockWaiter,
() -> schemaSyncConfig.delayDuration().value()