This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 66611120ec IGNITE-19081 Introduce Catalog events (#2064)
66611120ec is described below
commit 66611120ecb4d9fb7eb4e0d29f4bedc7ea5ec295
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Wed May 17 11:15:14 2023 +0300
IGNITE-19081 Introduce Catalog events (#2064)
---
.../ignite/internal/catalog/CatalogManager.java | 2 +-
.../ignite/internal/catalog/CatalogService.java | 14 +-
.../internal/catalog/CatalogServiceImpl.java | 82 +++++++++--
.../internal/catalog/events/CatalogEvent.java | 31 +++++
.../catalog/events/CatalogEventParameters.java | 34 +++++
.../catalog/events/CreateTableEventParameters.java | 47 +++++++
.../catalog/events/DropTableEventParameters.java | 43 ++++++
.../internal/catalog/storage/DropTableEntry.java | 49 +++++++
.../internal/catalog/CatalogServiceSelfTest.java | 150 ++++++++++++++++++++-
modules/index/build.gradle | 1 +
.../ignite/internal/index/IndexManagerTest.java | 3 +-
.../storage/ItRebalanceDistributedTest.java | 8 ++
.../runner/app/ItIgniteNodeRestartTest.java | 9 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 6 +-
.../internal/sql/engine/SqlQueryProcessor.java | 12 +-
.../sql/engine/exec/ddl/DdlCommandHandler.java | 2 +-
.../engine/exec/ddl/DdlCommandHandlerWrapper.java | 21 ++-
.../exec/ddl/DdlToCatalogCommandConverter.java | 9 ++
.../sql/engine/exec/MockedStructuresTest.java | 23 ++--
19 files changed, 498 insertions(+), 48 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
index 47f883180f..2724d5d965 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManager.java
@@ -27,7 +27,7 @@ import org.apache.ignite.internal.manager.IgniteComponent;
/**
* The catalog manager provides schema manipulation methods and is responsible
for managing distributed operations.
*/
-public interface CatalogManager extends IgniteComponent {
+public interface CatalogManager extends IgniteComponent, CatalogService {
/**
* Creates new table.
*
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index 507eefde03..70e70aaf38 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -21,6 +21,9 @@ import java.util.Collection;
import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.manager.EventListener;
/**
* Catalog service provides methods to access schema object's descriptors of
exact version and/or last actual version at given timestamp,
@@ -34,15 +37,6 @@ import
org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
public interface CatalogService {
String PUBLIC = "PUBLIC";
- //TODO: IGNITE-19082 Drop this stuff when all versioned schema stuff will
be moved to Catalog.
- @Deprecated(forRemoval = true)
- String IGNITE_USE_CATALOG_PROPERTY = "IGNITE_USE_CATALOG";
-
- @Deprecated(forRemoval = true)
- static boolean useCatalogService() {
- return Boolean.getBoolean(IGNITE_USE_CATALOG_PROPERTY);
- }
-
TableDescriptor table(String tableName, long timestamp);
TableDescriptor table(int tableId, long timestamp);
@@ -54,4 +48,6 @@ public interface CatalogService {
SchemaDescriptor schema(int version);
SchemaDescriptor activeSchema(long timestamp);
+
+ void listen(CatalogEvent evt, EventListener<CatalogEventParameters>
closure);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index be13cf23f9..4920647d4a 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.catalog;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
@@ -35,26 +37,38 @@ import
org.apache.ignite.internal.catalog.commands.DropTableParams;
import org.apache.ignite.internal.catalog.descriptors.IndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
+import org.apache.ignite.internal.catalog.storage.DropTableEntry;
import org.apache.ignite.internal.catalog.storage.NewTableEntry;
import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;
import org.apache.ignite.internal.catalog.storage.UpdateLog;
import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.TableAlreadyExistsException;
+import org.apache.ignite.lang.TableNotFoundException;
import org.jetbrains.annotations.Nullable;
/**
* Catalog service implementation.
* TODO: IGNITE-19081 Introduce catalog events and make CatalogServiceImpl
extends Producer.
*/
-public class CatalogServiceImpl implements CatalogService, CatalogManager {
+public class CatalogServiceImpl extends Producer<CatalogEvent,
CatalogEventParameters> implements CatalogManager {
private static final int MAX_RETRY_COUNT = 10;
+ /** The logger. */
+ private static final IgniteLogger LOG =
Loggers.forClass(CatalogServiceImpl.class);
+
/** Versioned catalog descriptors. */
private final NavigableMap<Integer, Catalog> catalogByVer = new
ConcurrentSkipListMap<>();
@@ -171,7 +185,21 @@ public class CatalogServiceImpl implements CatalogService,
CatalogManager {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> dropTable(DropTableParams params) {
- return failedFuture(new UnsupportedOperationException("Not implemented
yet."));
+ return saveUpdate(catalog -> {
+ String schemaName =
Objects.requireNonNullElse(params.schemaName(), CatalogService.PUBLIC);
+
+ SchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " +
schemaName);
+
+ TableDescriptor table = schema.table(params.tableName());
+
+ if (table == null) {
+ throw new TableNotFoundException(schemaName,
params.tableName());
+ }
+
+ return List.of(
+ new DropTableEntry(table.id())
+ );
+ });
}
/** {@inheritDoc} */
@@ -229,36 +257,66 @@ public class CatalogServiceImpl implements
CatalogService, CatalogManager {
class OnUpdateHandlerImpl implements OnUpdateHandler {
@Override
public void handle(VersionedUpdate update) {
- Catalog catalog = catalogByVer.get(update.version() - 1);
+ int version = update.version();
+ Catalog catalog = catalogByVer.get(version - 1);
assert catalog != null;
+ List<CompletableFuture<?>> eventFutures = new
ArrayList<>(update.entries().size());
+
for (UpdateEntry entry : update.entries()) {
String schemaName = CatalogService.PUBLIC;
SchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " +
schemaName);
if (entry instanceof NewTableEntry) {
catalog = new Catalog(
- update.version(),
+ version,
System.currentTimeMillis(),
catalog.objectIdGenState(),
new SchemaDescriptor(
schema.id(),
schema.name(),
- update.version(),
+ version,
ArrayUtils.concat(schema.tables(),
((NewTableEntry) entry).descriptor()),
schema.indexes()
)
);
+
+ eventFutures.add(fireEvent(
+ CatalogEvent.TABLE_CREATE,
+ new CreateTableEventParameters(version,
((NewTableEntry) entry).descriptor())
+ ));
+
+ } else if (entry instanceof DropTableEntry) {
+ int tableId = ((DropTableEntry) entry).tableId();
+
+ catalog = new Catalog(
+ version,
+ System.currentTimeMillis(),
+ catalog.objectIdGenState(),
+ new SchemaDescriptor(
+ schema.id(),
+ schema.name(),
+ version,
+ Arrays.stream(schema.tables()).filter(t ->
t.id() != tableId).toArray(TableDescriptor[]::new),
+ schema.indexes()
+ )
+ );
+
+ eventFutures.add(fireEvent(
+ CatalogEvent.TABLE_DROP,
+ new DropTableEventParameters(version, tableId)
+ ));
+
} else if (entry instanceof ObjectIdGenUpdateEntry) {
catalog = new Catalog(
- update.version(),
+ version,
System.currentTimeMillis(),
catalog.objectIdGenState() +
((ObjectIdGenUpdateEntry) entry).delta(),
new SchemaDescriptor(
schema.id(),
schema.name(),
- update.version(),
+ version,
schema.tables(),
schema.indexes()
)
@@ -270,7 +328,15 @@ public class CatalogServiceImpl implements CatalogService,
CatalogManager {
registerCatalog(catalog);
- versionTracker.update(catalog.version(), null);
+
CompletableFuture.allOf(eventFutures.toArray(CompletableFuture[]::new))
+ .thenRun(() -> versionTracker.update(version, null))
+ .whenComplete((ignore, err) -> {
+ if (err != null) {
+ LOG.warn("Failed to apply catalog update.", err);
+ } else {
+ versionTracker.update(version, null);
+ }
+ });
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
new file mode 100644
index 0000000000..d88347aeca
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+import org.apache.ignite.internal.manager.Event;
+
+/**
+ * Catalog management events.
+ */
+public enum CatalogEvent implements Event {
+ /** This event is fired, when a table was created in Catalog. */
+ TABLE_CREATE,
+
+ /** This event is fired, when a table was dropped in Catalog. */
+ TABLE_DROP
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
new file mode 100644
index 0000000000..11c03c885e
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+import org.apache.ignite.internal.manager.EventParameters;
+
+/**
+ * Base class for Catalog event parameters.
+ */
+public abstract class CatalogEventParameters extends EventParameters {
+ /**
+ * Constructor.
+ *
+ * @param causalityToken Causality token.
+ */
+ public CatalogEventParameters(long causalityToken) {
+ super(causalityToken);
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
new file mode 100644
index 0000000000..345956f672
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
+
+/**
+ * Create table event parameters contains a table descriptor for newly created
table.
+ */
+public class CreateTableEventParameters extends CatalogEventParameters {
+
+ private final TableDescriptor tableDescriptor;
+
+ /**
+ * Constructor.
+ *
+ * @param causalityToken Causality token.
+ * @param tableDescriptor Newly created table descriptor.
+ */
+ public CreateTableEventParameters(long causalityToken, TableDescriptor
tableDescriptor) {
+ super(causalityToken);
+
+ this.tableDescriptor = tableDescriptor;
+ }
+
+ /**
+ * Gets table descriptor for newly created table.
+ */
+ public TableDescriptor tableDescriptor() {
+ return tableDescriptor;
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
new file mode 100644
index 0000000000..71c2e126fe
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.events;
+
+/**
+ * Drop table event parameters contains an id of dropped table.
+ */
+public class DropTableEventParameters extends CatalogEventParameters {
+
+ private final int tableId;
+
+ /**
+ * Constructor.
+ *
+ * @param causalityToken Causality token.
+ * @param tableId An id of dropped table.
+ */
+ public DropTableEventParameters(long causalityToken, int tableId) {
+ super(causalityToken);
+
+ this.tableId = tableId;
+ }
+
+ /** Returns an id of dropped table. */
+ public int tableId() {
+ return tableId;
+ }
+}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
new file mode 100644
index 0000000000..ee53b390d2
--- /dev/null
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog.storage;
+
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Describes deletion of a table.
+ */
+public class DropTableEntry implements UpdateEntry {
+ private static final long serialVersionUID = 7727583734058987315L;
+
+ private final int tableId;
+
+ /**
+ * Constructs the object.
+ *
+ * @param tableId An id of a table to drop.
+ */
+ public DropTableEntry(int tableId) {
+ this.tableId = tableId;
+ }
+
+ /** Returns an id of a table to drop. */
+ public int tableId() {
+ return tableId;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index b532dc5e7a..003ab864f1 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -31,6 +32,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import java.util.List;
@@ -38,13 +40,19 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.CreateTableParams;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.commands.DropTableParams;
import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.TableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
import org.apache.ignite.internal.catalog.storage.UpdateLog;
import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.catalog.storage.VersionedUpdate;
+import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -53,11 +61,13 @@ import
org.apache.ignite.internal.vault.inmemory.InMemoryVaultService;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
+import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.sql.ColumnType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
/**
@@ -65,6 +75,7 @@ import org.mockito.Mockito;
*/
public class CatalogServiceSelfTest {
private static final String TABLE_NAME = "myTable";
+ private static final String TABLE_NAME_2 = "myTable2";
private MetaStorageManager metastore;
@@ -110,6 +121,7 @@ public class CatalogServiceSelfTest {
SchemaDescriptor schema = service.schema(0);
assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(0, schema.id());
assertEquals(0, schema.version());
assertEquals(0, schema.tables().length);
assertEquals(0, schema.indexes().length);
@@ -141,6 +153,7 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(0, schema.id());
assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(0, schema.version());
assertSame(schema, service.activeSchema(0L));
assertSame(schema, service.activeSchema(123L));
@@ -154,6 +167,7 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertEquals(0, schema.id());
assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(1, schema.version());
assertSame(schema, service.activeSchema(System.currentTimeMillis()));
assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME,
System.currentTimeMillis()));
@@ -166,12 +180,34 @@ public class CatalogServiceSelfTest {
assertEquals(TABLE_NAME, table.name());
assertEquals(0L, table.engineId());
assertEquals(0L, table.zoneId());
+
+ // Validate another table creation.
+ fut = service.createTable(simpleTable(TABLE_NAME_2));
+
+ assertThat(fut, willBe((Object) null));
+
+ // Validate actual catalog has both tables.
+ schema = service.schema(2);
+
+ assertNotNull(schema);
+ assertEquals(0, schema.id());
+ assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(2, schema.version());
+ assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+
+ assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME,
System.currentTimeMillis()));
+ assertSame(schema.table(TABLE_NAME), service.table(1,
System.currentTimeMillis()));
+
+ assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2,
System.currentTimeMillis()));
+ assertSame(schema.table(TABLE_NAME_2), service.table(2,
System.currentTimeMillis()));
+
+ assertNotSame(schema.table(TABLE_NAME), schema.table(TABLE_NAME_2));
}
@Test
public void testCreateTableIfExistsFlag() {
CreateTableParams params = CreateTableParams.builder()
- .tableName("table1")
+ .tableName(TABLE_NAME)
.columns(List.of(
new ColumnParams("key", ColumnType.INT32,
DefaultValue.constant(null), false),
new ColumnParams("val", ColumnType.INT32,
DefaultValue.constant(null), false)
@@ -185,7 +221,7 @@ public class CatalogServiceSelfTest {
CompletableFuture<?> fut = service.createTable(
CreateTableParams.builder()
- .tableName("table1")
+ .tableName(TABLE_NAME)
.columns(List.of(
new ColumnParams("key", ColumnType.INT32,
DefaultValue.constant(null), false),
new ColumnParams("val", ColumnType.INT32,
DefaultValue.constant(null), false)
@@ -197,6 +233,78 @@ public class CatalogServiceSelfTest {
assertThat(fut, willThrowFast(TableAlreadyExistsException.class));
}
+ @Test
+ public void testDropTable() {
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willBe((Object) null));
+
+ long beforeDropTimestamp = System.currentTimeMillis();
+
+ DropTableParams dropTableParams =
DropTableParams.builder().schemaName("PUBLIC").tableName(TABLE_NAME).build();
+
+ assertThat(service.dropTable(dropTableParams), willBe((Object) null));
+
+ // Validate catalog version from the past.
+ SchemaDescriptor schema = service.schema(2);
+
+ assertNotNull(schema);
+ assertEquals(0, schema.id());
+ assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(2, schema.version());
+ assertSame(schema, service.activeSchema(beforeDropTimestamp));
+
+ assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME,
beforeDropTimestamp));
+ assertSame(schema.table(TABLE_NAME), service.table(1,
beforeDropTimestamp));
+
+ assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2,
beforeDropTimestamp));
+ assertSame(schema.table(TABLE_NAME_2), service.table(2,
beforeDropTimestamp));
+
+ // Validate actual catalog
+ schema = service.schema(3);
+
+ assertNotNull(schema);
+ assertEquals(0, schema.id());
+ assertEquals(CatalogService.PUBLIC, schema.name());
+ assertEquals(3, schema.version());
+ assertSame(schema, service.activeSchema(System.currentTimeMillis()));
+
+ assertNull(schema.table(TABLE_NAME));
+ assertNull(service.table(TABLE_NAME, System.currentTimeMillis()));
+ assertNull(service.table(1, System.currentTimeMillis()));
+
+ assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2,
System.currentTimeMillis()));
+ assertSame(schema.table(TABLE_NAME_2), service.table(2,
System.currentTimeMillis()));
+ }
+
+ @Test
+ public void testDropTableIfExistsFlag() {
+ CreateTableParams createTableParams = CreateTableParams.builder()
+ .tableName(TABLE_NAME)
+ .columns(List.of(
+ new ColumnParams("key", ColumnType.INT32,
DefaultValue.constant(null), false),
+ new ColumnParams("val", ColumnType.INT32,
DefaultValue.constant(null), false)
+ ))
+ .primaryKeyColumns(List.of("key"))
+ .build();
+
+ assertThat(service.createTable(createTableParams), willBe((Object)
null));
+
+ DropTableParams params = DropTableParams.builder()
+ .tableName(TABLE_NAME)
+ .ifTableExists(true)
+ .build();
+
+ assertThat(service.dropTable(params), willBe((Object) null));
+ assertThat(service.dropTable(params),
willThrowFast(TableNotFoundException.class));
+
+ params = DropTableParams.builder()
+ .tableName(TABLE_NAME)
+ .ifTableExists(false)
+ .build();
+
+ assertThat(service.dropTable(params),
willThrowFast(TableNotFoundException.class));
+ }
+
@Test
public void operationWillBeRetriedFiniteAmountOfTimes() {
UpdateLog updateLogMock = Mockito.mock(UpdateLog.class);
@@ -247,6 +355,44 @@ public class CatalogServiceSelfTest {
verify(updateLogMock).stop();
}
+ @Test
+ public void testCreateTableEvents() {
+ CreateTableParams params = CreateTableParams.builder()
+ .schemaName("PUBLIC")
+ .tableName(TABLE_NAME)
+ .ifTableExists(true)
+ .zone("ZONE")
+ .columns(List.of(
+ new ColumnParams("key1", ColumnType.INT32,
DefaultValue.constant(null), false),
+ new ColumnParams("key2", ColumnType.INT32,
DefaultValue.constant(null), false),
+ new ColumnParams("val", ColumnType.INT32,
DefaultValue.constant(null), true)
+ ))
+ .primaryKeyColumns(List.of("key1", "key2"))
+ .colocationColumns(List.of("key2"))
+ .build();
+
+ EventListener<CatalogEventParameters> eventListener =
Mockito.mock(EventListener.class);
+ when(eventListener.notify(any(),
any())).thenReturn(completedFuture(false));
+
+ service.listen(CatalogEvent.TABLE_CREATE, eventListener);
+ service.listen(CatalogEvent.TABLE_DROP, eventListener);
+
+ CompletableFuture<Void> fut = service.createTable(params);
+
+ assertThat(fut, willBe((Object) null));
+
+ verify(eventListener).notify(any(CreateTableEventParameters.class),
ArgumentMatchers.isNull());
+
+ DropTableParams dropTableparams =
DropTableParams.builder().tableName(TABLE_NAME).build();
+
+ fut = service.dropTable(dropTableparams);
+
+ assertThat(fut, willBe((Object) null));
+
+ verify(eventListener).notify(any(DropTableEventParameters.class),
ArgumentMatchers.isNull());
+ verifyNoMoreInteractions(eventListener);
+ }
+
private static CreateTableParams simpleTable(String name) {
return CreateTableParams.builder()
.schemaName("PUBLIC")
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 33c33510ba..cb1ae8e24f 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -21,6 +21,7 @@ apply from: "$rootDir/buildscripts/java-junit5.gradle"
dependencies {
implementation project(':ignite-api')
+ implementation project(':ignite-catalog')
implementation project(':ignite-core')
implementation project(':ignite-configuration-api')
implementation project(':ignite-schema')
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index 800c2c92eb..33d576539b 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -93,7 +93,8 @@ public class IndexManagerTest {
when(schManager.schemaRegistry(anyLong(),
any())).thenReturn(completedFuture(null));
- indexManager = new IndexManager("test", tablesConfig, schManager,
tableManagerMock, mock(ClusterService.class));
+ indexManager = new IndexManager("test", tablesConfig, schManager,
tableManagerMock,
+ mock(ClusterService.class));
indexManager.start();
assertThat(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
index 8924fede26..32ec34b698 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java
@@ -68,6 +68,9 @@ import java.util.stream.IntStream;
import
org.apache.ignite.client.handler.configuration.ClientConnectorConfiguration;
import org.apache.ignite.internal.affinity.Assignment;
import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogServiceImpl;
+import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.DistributedConfigurationUpdater;
import
org.apache.ignite.internal.cluster.management.configuration.ClusterManagementConfiguration;
@@ -584,6 +587,8 @@ public class ItRebalanceDistributedTest {
private final SchemaManager schemaManager;
+ private final CatalogManager catalogManager;
+
private final DistributedConfigurationUpdater
distributedConfigurationUpdater;
private List<IgniteComponent> nodeComponents;
@@ -732,6 +737,8 @@ public class ItRebalanceDistributedTest {
metaStorageManager,
clusterService);
+ catalogManager = new CatalogServiceImpl(new
UpdateLogImpl(metaStorageManager, vaultManager));
+
schemaManager = new SchemaManager(registry, tablesCfg,
metaStorageManager);
TopologyAwareRaftGroupServiceFactory
topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory(
@@ -821,6 +828,7 @@ public class ItRebalanceDistributedTest {
cmgManager,
metaStorageManager,
clusterCfgMgr,
+ catalogManager,
distributionZoneManager,
replicaManager,
txManager,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index f88b8a397f..4a35db44ed 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -59,7 +59,6 @@ import org.apache.ignite.InitParameters;
import org.apache.ignite.configuration.ConfigurationModule;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.baseline.BaselineManager;
-import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogServiceImpl;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
@@ -383,6 +382,8 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
new RaftGroupEventsClientListener()
);
+ var catalogManager = new CatalogServiceImpl(new
UpdateLogImpl(metaStorageMgr, vault));
+
TableManager tableManager = new TableManager(
name,
registry,
@@ -411,10 +412,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
var indexManager = new IndexManager(name, tablesConfiguration,
schemaManager, tableManager, clusterSvc);
- CatalogManager catalogManager = new CatalogServiceImpl(
- new UpdateLogImpl(metaStorageMgr, vault)
- );
-
SqlQueryProcessor qryEngine = new SqlQueryProcessor(
registry,
clusterSvc,
@@ -455,11 +452,11 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
distributedConfigurationUpdater,
clusterCfgMgr,
dataStorageManager,
+ catalogManager,
schemaManager,
distributionZoneManager,
tableManager,
indexManager,
- catalogManager,
qryEngine
);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index b3b5f7a690..3948610af4 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -481,6 +481,8 @@ public class IgniteImpl implements Ignite {
outgoingSnapshotsManager = new
OutgoingSnapshotsManager(clusterSvc.messagingService());
+ catalogManager = new CatalogServiceImpl(new
UpdateLogImpl(metaStorageMgr, vaultMgr));
+
distributedTblMgr = new TableManager(
name,
registry,
@@ -509,10 +511,6 @@ public class IgniteImpl implements Ignite {
indexManager = new IndexManager(name, tablesConfiguration,
schemaManager, distributedTblMgr, clusterSvc);
- catalogManager = new CatalogServiceImpl(
- new UpdateLogImpl(metaStorageMgr, vaultMgr)
- );
-
qryEngine = new SqlQueryProcessor(
registry,
clusterSvc,
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 67eb3cbf5d..a02617c5c0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -46,7 +46,6 @@ import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.index.IndexManager;
@@ -67,7 +66,6 @@ import
org.apache.ignite.internal.sql.engine.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutor;
import org.apache.ignite.internal.sql.engine.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.sql.engine.exec.QueryValidationException;
-import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandlerWrapper;
import org.apache.ignite.internal.sql.engine.message.MessageServiceImpl;
import org.apache.ignite.internal.sql.engine.prepare.PrepareService;
@@ -249,9 +247,13 @@ public class SqlQueryProcessor implements QueryProcessor {
this.prepareSvc = prepareSvc;
- var ddlCommandHandler = CatalogService.useCatalogService()
- ? new DdlCommandHandlerWrapper(distributionZoneManager,
tableManager, indexManager, dataStorageManager, catalogManager)
- : new DdlCommandHandler(distributionZoneManager, tableManager,
indexManager, dataStorageManager);
+ var ddlCommandHandler = new DdlCommandHandlerWrapper(
+ distributionZoneManager,
+ tableManager,
+ indexManager,
+ dataStorageManager,
+ catalogManager
+ );
var executionSrvc = registerService(ExecutionServiceImpl.create(
clusterSrvc.topologyService(),
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 221695ff12..6b0a1c27f2 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -297,7 +297,7 @@ public class DdlCommandHandler {
.handle(handleModificationResult(cmd.ifTableExists(),
TableNotFoundException.class));
}
- private static BiFunction<Object, Throwable, Boolean>
handleModificationResult(boolean ignoreExpectedError, Class<?> expErrCls) {
+ protected static BiFunction<Object, Throwable, Boolean>
handleModificationResult(boolean ignoreExpectedError, Class<?> expErrCls) {
return (val, err) -> {
if (err == null) {
return val instanceof Boolean ? (Boolean) val : Boolean.TRUE;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
index 5ca2457242..21f39ea169 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java
@@ -24,11 +24,15 @@ import
org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.index.IndexManager;
import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DropTableCommand;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.lang.TableAlreadyExistsException;
+import org.apache.ignite.lang.TableNotFoundException;
/**
* Wrapper for DDL command handler passes DDL commands to CatalogService.
+ * TODO: IGNITE-19082 Drop this wrapper when all the versioned schema stuff
will be moved from Configuration to Catalog.
*/
public class DdlCommandHandlerWrapper extends DdlCommandHandler {
@@ -52,11 +56,22 @@ public class DdlCommandHandlerWrapper extends
DdlCommandHandler {
/** Handles ddl commands. */
@Override
public CompletableFuture<Boolean> handle(DdlCommand cmd) {
+ // Handle command in usual way.
+ CompletableFuture<Boolean> ddlCommandFuture = super.handle(cmd);
+
+ // Pass supported commands to the Catalog.
if (cmd instanceof CreateTableCommand) {
- return
catalogManager.createTable(DdlToCatalogCommandConverter.convert((CreateTableCommand)
cmd))
- .thenCompose(res -> super.handle(cmd));
+ return ddlCommandFuture
+ .thenCompose(res ->
catalogManager.createTable(DdlToCatalogCommandConverter.convert((CreateTableCommand)
cmd))
+
.handle(handleModificationResult(((CreateTableCommand) cmd).ifTableExists(),
TableAlreadyExistsException.class))
+ );
+ } else if (cmd instanceof DropTableCommand) {
+ return ddlCommandFuture
+ .thenCompose(res ->
catalogManager.dropTable(DdlToCatalogCommandConverter.convert((DropTableCommand)
cmd))
+
.handle(handleModificationResult(((DropTableCommand) cmd).ifTableExists(),
TableNotFoundException.class))
+ );
}
- return super.handle(cmd);
+ return ddlCommandFuture;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
index 65a19e040e..e0e9f4f520 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlToCatalogCommandConverter.java
@@ -22,9 +22,11 @@ import java.util.stream.Collectors;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.commands.CreateTableParams;
import org.apache.ignite.internal.catalog.commands.DefaultValue;
+import org.apache.ignite.internal.catalog.commands.DropTableParams;
import org.apache.ignite.internal.sql.engine.prepare.ddl.ColumnDefinition;
import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand;
import
org.apache.ignite.internal.sql.engine.prepare.ddl.DefaultValueDefinition;
+import org.apache.ignite.internal.sql.engine.prepare.ddl.DropTableCommand;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
/**
@@ -47,6 +49,13 @@ class DdlToCatalogCommandConverter {
.build();
}
+ static DropTableParams convert(DropTableCommand cmd) {
+ return DropTableParams.builder()
+ .schemaName(cmd.schemaName())
+ .tableName(cmd.tableName())
+ .build();
+ }
+
private static ColumnParams convert(ColumnDefinition def) {
return new ColumnParams(def.name(), TypeUtils.columnType(def.type()),
convert(def.defaultValueDefinition()), def.nullable());
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 9939084c42..69335f7410 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -160,7 +160,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
HybridClock clock;
@Mock
- CatalogManager catalogManager;
+ private VaultManager vaultManager;
/**
* Revision listener holder. It uses for the test configurations:
@@ -204,9 +204,11 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
private DistributionZoneManager distributionZoneManager;
- DataStorageManager dataStorageManager;
+ private DataStorageManager dataStorageManager;
- SchemaManager schemaManager;
+ private SchemaManager schemaManager;
+
+ private CatalogManager catalogManager;
/** Returns current method name. */
private static String getCurrentMethodName() {
@@ -237,6 +239,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
/** Inner initialisation. */
@BeforeEach
void before() throws Exception {
+ mockVault();
mockMetastore();
revisionUpdater = (Function<Long, CompletableFuture<?>> function) -> {
@@ -267,6 +270,10 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
schemaManager.start();
+ catalogManager = mock(CatalogManager.class);
+
when(catalogManager.createTable(any())).thenReturn(completedFuture(null));
+
when(catalogManager.dropTable(any())).thenReturn(completedFuture(null));
+
cmgMgr = mock(ClusterManagementGroupManager.class);
when(cmgMgr.logicalTopology()).thenReturn(completedFuture(logicalTopologySnapshot));
@@ -317,6 +324,11 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
.get(1, TimeUnit.SECONDS);
}
+ private void mockVault() {
+
when(vaultManager.get(any(ByteArray.class))).thenReturn(completedFuture(null));
+ when(vaultManager.put(any(ByteArray.class),
any(byte[].class))).thenReturn(completedFuture(null));
+ }
+
/** Dummy metastore activity mock. */
private void mockMetastore() {
when(msm.prefix(any())).thenReturn(subscriber -> {
@@ -587,11 +599,6 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
}
private TableManager createTableManager() {
- VaultManager vaultManager = mock(VaultManager.class);
-
-
when(vaultManager.get(any(ByteArray.class))).thenReturn(completedFuture(null));
- when(vaultManager.put(any(ByteArray.class),
any(byte[].class))).thenReturn(completedFuture(null));
-
TableManager tableManager = new TableManager(
"",
revisionUpdater,