This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-19942 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 331c9b8b22ae8a7d4d141e773c0954d68199fd7b Author: amashenkov <[email protected]> AuthorDate: Tue Jul 4 23:29:33 2023 +0300 Mirror DistributionZone changes from Config to Catalog. --- .../internal/catalog/CatalogManagerImpl.java | 6 +- .../catalog/descriptors/CatalogZoneDescriptor.java | 3 + modules/distribution-zones/build.gradle | 2 + .../distributionzones/DistributionZoneManager.java | 74 ++++++++++++++++++++-- .../BaseDistributionZoneManagerTest.java | 1 + ...ibutionZoneManagerConfigurationChangesTest.java | 1 + .../DistributionZoneManagerTest.java | 1 + .../DistributionZoneMockTest.java | 2 + .../DistributionZonesTestUtil.java | 34 ++++++++++ .../storage/ItRebalanceDistributedTest.java | 1 + ...niteDistributionZoneManagerNodeRestartTest.java | 2 + .../runner/app/ItIgniteNodeRestartTest.java | 17 ++--- .../org/apache/ignite/internal/app/IgniteImpl.java | 21 +++--- .../engine/exec/ddl/DdlCommandHandlerWrapper.java | 34 ---------- .../DdlCommandHandlerExceptionHandlingTest.java | 28 ++++---- 15 files changed, 152 insertions(+), 75 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java index a6b29af3cb..02e9ca4209 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java @@ -150,18 +150,18 @@ public class CatalogManagerImpl extends Producer<CatalogEvent, CatalogEventParam @Override public void start() { - int objectIdGen = 0; + int objectIdGen = 1; // TODO: IGNITE-19082 Move default schema objects initialization to cluster init procedure. CatalogSchemaDescriptor schemaPublic = new CatalogSchemaDescriptor( - objectIdGen++, + 0, DEFAULT_SCHEMA_NAME, new CatalogTableDescriptor[0], new CatalogIndexDescriptor[0] ); CatalogZoneDescriptor defaultZone = new CatalogZoneDescriptor( - objectIdGen++, + 0, DEFAULT_ZONE_NAME, 25, 1, diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java index fd636e2b68..01fd779cb0 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java @@ -76,6 +76,9 @@ public class CatalogZoneDescriptor extends CatalogObjectDescriptor { this.dataNodesAutoAdjustScaleUp = dataNodesAutoAdjustScaleUp; this.dataNodesAutoAdjustScaleDown = dataNodesAutoAdjustScaleDown; this.filter = filter; + + // TODO: IGNITE-19719 Fix it + this.dataStorage = new CatalogDataStorageDescriptor("aipersist", "default"); } /** diff --git a/modules/distribution-zones/build.gradle b/modules/distribution-zones/build.gradle index 54c78c763f..ec90a2420d 100644 --- a/modules/distribution-zones/build.gradle +++ b/modules/distribution-zones/build.gradle @@ -26,6 +26,7 @@ dependencies { annotationProcessor project(":ignite-configuration-annotation-processor") annotationProcessor libs.auto.service + implementation project(':ignite-catalog') implementation project(':ignite-core') implementation project(':ignite-configuration-api') implementation project(':ignite-api') @@ -66,6 +67,7 @@ dependencies { testFixturesImplementation libs.mockito.core testFixturesImplementation libs.mockito.junit testFixturesImplementation libs.hamcrest.core + testFixturesImplementation project(':ignite-catalog') testFixturesImplementation project(':ignite-raft-api') testFixturesImplementation project(':ignite-metastorage') testFixturesImplementation project(':ignite-schema') diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index d61f044560..97863daf36 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.distributionzones; import static java.util.Collections.emptySet; +import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.CompletableFuture.supplyAsync; @@ -92,6 +93,12 @@ import org.apache.ignite.configuration.notifications.ConfigurationListener; import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener; import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent; import org.apache.ignite.configuration.validation.ConfigurationValidationException; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.commands.AlterZoneParams; +import org.apache.ignite.internal.catalog.commands.CreateZoneParams; +import org.apache.ignite.internal.catalog.commands.DropZoneParams; +import org.apache.ignite.internal.catalog.commands.RenameZoneParams; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -185,6 +192,9 @@ public class DistributionZoneManager implements IgniteComponent { /** Tables configuration. */ private final TablesConfiguration tablesConfiguration; + /** Catalog service. */ + private final CatalogManager catalogManager; + /** Meta Storage manager. */ private final MetaStorageManager metaStorageManager; @@ -256,6 +266,7 @@ public class DistributionZoneManager implements IgniteComponent { * * @param zonesConfiguration Distribution zones configuration. * @param tablesConfiguration Tables configuration. + * @param catalogManager Catalog manager. * @param metaStorageManager Meta Storage manager. * @param logicalTopologyService Logical topology service. * @param vaultMgr Vault manager. @@ -264,6 +275,7 @@ public class DistributionZoneManager implements IgniteComponent { public DistributionZoneManager( DistributionZonesConfiguration zonesConfiguration, TablesConfiguration tablesConfiguration, + CatalogManager catalogManager, MetaStorageManager metaStorageManager, LogicalTopologyService logicalTopologyService, VaultManager vaultMgr, @@ -274,6 +286,7 @@ public class DistributionZoneManager implements IgniteComponent { this.metaStorageManager = metaStorageManager; this.logicalTopologyService = logicalTopologyService; this.vaultMgr = vaultMgr; + this.catalogManager = catalogManager; this.topologyWatchListener = createMetastorageTopologyListener(); @@ -386,9 +399,37 @@ public class DistributionZoneManager implements IgniteComponent { } try { - CompletableFuture<Integer> fut = new CompletableFuture<>(); + return catalogManager.createDistributionZone(CreateZoneParams.builder() + .zoneName(distributionZoneCfg.name()) + .partitions(distributionZoneCfg.partitions()) + .filter(distributionZoneCfg.filter()) + .replicas(distributionZoneCfg.replicas()) + .dataNodesAutoAdjust(distributionZoneCfg.dataNodesAutoAdjust()) + .dataNodesAutoAdjustScaleUp(distributionZoneCfg.dataNodesAutoAdjustScaleUp()) + .dataNodesAutoAdjustScaleDown(distributionZoneCfg.dataNodesAutoAdjustScaleDown()) + .build()) + .thenApply(ignore -> catalogManager.zone(distributionZoneCfg.name(), Long.MAX_VALUE)) + .thenCompose(zoneDescriptor -> createZone(zoneDescriptor.id(), distributionZoneCfg)) + .whenComplete((id, ex) -> { + if (ex != null) { + LOG.warn("Failed to create zone.", ex); + } + }); + } finally { + busyLock.leaveBusy(); + } + } - int[] zoneIdContainer = new int[1]; + private CompletableFuture<Integer> createZone( + int intZoneId, + DistributionZoneConfigurationParameters distributionZoneCfg + ) { + if (!busyLock.enterBusy()) { + return failedFuture(new NodeStoppingException()); + } + + try { + CompletableFuture<Integer> fut = new CompletableFuture<>(); zonesConfiguration.change(zonesChange -> zonesChange.changeDistributionZones(zonesListChange -> { try { @@ -427,11 +468,9 @@ public class DistributionZoneManager implements IgniteComponent { zoneChange.changeDataNodesAutoAdjustScaleDown(distributionZoneCfg.dataNodesAutoAdjustScaleDown()); } - int intZoneId = zonesChange.globalIdCounter() + 1; zonesChange.changeGlobalIdCounter(intZoneId); zoneChange.changeZoneId(intZoneId); - zoneIdContainer[0] = intZoneId; }); } catch (ConfigurationNodeAlreadyExistException e) { throw new DistributionZoneAlreadyExistsException(distributionZoneCfg.name(), e); @@ -445,7 +484,7 @@ public class DistributionZoneManager implements IgniteComponent { ConfigurationValidationException.class) ); } else { - fut.complete(zoneIdContainer[0]); + fut.complete(intZoneId); } }); @@ -496,6 +535,23 @@ public class DistributionZoneManager implements IgniteComponent { try { CompletableFuture<Void> fut = new CompletableFuture<>(); + CompletableFuture<Void> catalogFut = catalogManager.alterDistributionZone(AlterZoneParams.builder() + .zoneName(name) + .partitions(distributionZoneCfg.partitions()) + .replicas(distributionZoneCfg.replicas()) + .filter(distributionZoneCfg.filter()) + .dataNodesAutoAdjust(distributionZoneCfg.dataNodesAutoAdjust()) + .dataNodesAutoAdjustScaleUp(distributionZoneCfg.dataNodesAutoAdjustScaleUp()) + .dataNodesAutoAdjustScaleDown(distributionZoneCfg.dataNodesAutoAdjustScaleDown()) + .build()); + + if (!name.equals(distributionZoneCfg.name())) { + catalogFut = catalogFut.thenCompose(ignore -> catalogManager.renameDistributionZone(RenameZoneParams.builder() + .zoneName(name) + .newZoneName(distributionZoneCfg.name()) + .build())); + } + CompletableFuture<Void> change; if (DEFAULT_ZONE_NAME.equals(name)) { @@ -524,7 +580,7 @@ public class DistributionZoneManager implements IgniteComponent { })); } - change.whenComplete((res, e) -> { + allOf(catalogFut, change).whenComplete((res, e) -> { if (e != null) { fut.completeExceptionally( unwrapDistributionZoneException( @@ -603,7 +659,11 @@ public class DistributionZoneManager implements IgniteComponent { } }); - return fut; + CompletableFuture<Void> dropZoneFut = catalogManager.dropDistributionZone(DropZoneParams.builder() + .zoneName(name) + .build()); + + return allOf(dropZoneFut, fut); } finally { busyLock.leaveBusy(); } diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java index 9c69f09930..a9177185e0 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/BaseDistributionZoneManagerTest.java @@ -126,6 +126,7 @@ public class BaseDistributionZoneManagerTest extends BaseIgniteAbstractTest { distributionZoneManager = new DistributionZoneManager( zonesConfiguration, tablesConfiguration, + DistributionZonesTestUtil.mockCatalog(), metaStorageManager, new LogicalTopologyServiceImpl(topology, cmgManager), vaultMgr, diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java index 713ee78a4e..8f58ea7b78 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java @@ -146,6 +146,7 @@ public class DistributionZoneManagerConfigurationChangesTest extends IgniteAbstr distributionZoneManager = new DistributionZoneManager( zonesConfiguration, tablesConfiguration, + DistributionZonesTestUtil.mockCatalog(), metaStorageManager, logicalTopologyService, vaultMgr, diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java index 5bd1324a97..8c1bb0fc2a 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerTest.java @@ -99,6 +99,7 @@ class DistributionZoneManagerTest extends IgniteAbstractTest { distributionZoneManager = new DistributionZoneManager( zonesConfiguration, tablesConfiguration, + DistributionZonesTestUtil.mockCatalog(), null, null, null, diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneMockTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneMockTest.java index 75db4d543f..17ede4c011 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneMockTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneMockTest.java @@ -21,6 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.Mockito.mock; import java.util.concurrent.TimeUnit; +import org.apache.ignite.internal.catalog.CatalogManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; @@ -44,6 +45,7 @@ public class DistributionZoneMockTest { DistributionZoneManager zoneMgr = new DistributionZoneManager( zonesConfiguration, mock(TablesConfiguration.class), + mock(CatalogManager.class), mock(MetaStorageManager.class), mock(LogicalTopologyService.class), mock(VaultManager.class), diff --git a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java index 1cdabe81d1..729f15d143 100644 --- a/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java +++ b/modules/distribution-zones/src/testFixtures/java/org/apache/ignite/internal/distributionzones/DistributionZonesTestUtil.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.distributionzones; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.DEFAULT_FILTER; import static org.apache.ignite.internal.distributionzones.DistributionZoneManager.IMMEDIATE_TIMER_VALUE; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.updateLogicalTopologyAndVersion; @@ -41,6 +42,11 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.Map; import java.util.Objects; @@ -48,9 +54,12 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder; import org.apache.ignite.internal.metastorage.MetaStorageManager; @@ -486,4 +495,29 @@ public class DistributionZonesTestUtil { assertThat(dataNodes, is(expectedValueNames)); } } + + /** + * Creates CatalogService mock. + */ + public static CatalogManager mockCatalog() { + AtomicInteger idGen = new AtomicInteger(0); + + CatalogManager catalogManager = mock(CatalogManager.class); + when(catalogManager.createDistributionZone(any())).thenReturn(completedFuture(null)); + when(catalogManager.alterDistributionZone(any())).thenReturn(completedFuture(null)); + when(catalogManager.renameDistributionZone(any())).thenReturn(completedFuture(null)); + when(catalogManager.dropDistributionZone(any())).thenReturn(completedFuture(null)); + when(catalogManager.zone(anyString(), anyLong())).thenAnswer(invocation -> zoneDescriptorMock(idGen)); + + return catalogManager; + } + + private static CatalogZoneDescriptor zoneDescriptorMock(AtomicInteger idGen) { + int zoneId = idGen.incrementAndGet(); + + CatalogZoneDescriptor descriptor = mock(CatalogZoneDescriptor.class); + when(descriptor.id()).thenReturn(zoneId); + + return descriptor; + } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index b2e4c8ad38..a57cbb8250 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@ -797,6 +797,7 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { distributionZoneManager = new DistributionZoneManager( zonesCfg, tablesCfg, + catalogManager, metaStorageManager, logicalTopologyService, vaultManager, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java index daff409fc2..3b2174a6e0 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/distribution/zones/ItIgniteDistributionZoneManagerNodeRestartTest.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.configuration.validation.ConfigurationValidato import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState; +import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil; import org.apache.ignite.internal.distributionzones.NodeWithAttributes; import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration; import org.apache.ignite.internal.manager.IgniteComponent; @@ -204,6 +205,7 @@ public class ItIgniteDistributionZoneManagerNodeRestartTest extends BaseIgniteRe DistributionZoneManager distributionZoneManager = new DistributionZoneManager( zonesConfiguration, tablesConfiguration, + DistributionZonesTestUtil.mockCatalog(), metaStorageMgr, logicalTopologyService, vault, diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 7a3c00c0d0..4a562affdb 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -52,13 +52,13 @@ import java.util.function.LongFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.apache.calcite.jdbc.CalciteMetaImpl; import org.apache.ignite.Ignite; import org.apache.ignite.IgnitionManager; import org.apache.ignite.InitParameters; import org.apache.ignite.internal.BaseIgniteRestartTest; import org.apache.ignite.internal.app.IgniteImpl; import org.apache.ignite.internal.baseline.BaselineManager; -import org.apache.ignite.internal.catalog.CatalogManagerImpl; import org.apache.ignite.internal.catalog.ClockWaiter; import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; @@ -339,24 +339,25 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { GcConfiguration gcConfig = clusterConfigRegistry.getConfiguration(GcConfiguration.KEY); + var clockWaiter = new ClockWaiter("test", hybridClock); + + var catalogManager = new CalciteMetaImpl( + new UpdateLogImpl(metaStorageMgr), + clockWaiter + ); + SchemaManager schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr); DistributionZoneManager distributionZoneManager = new DistributionZoneManager( zonesConfig, tablesConfig, + catalogManager, metaStorageMgr, logicalTopologyService, vault, name ); - var clockWaiter = new ClockWaiter("test", hybridClock); - - var catalogManager = new CatalogManagerImpl( - new UpdateLogImpl(metaStorageMgr), - clockWaiter - ); - TableManager tableManager = new TableManager( name, registry, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index de5e7e783a..94c63b5d6e 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -498,9 +498,20 @@ public class IgniteImpl implements Ignite { schemaManager = new SchemaManager(registry, tablesConfig, metaStorageMgr); + SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration( + SchemaSynchronizationConfiguration.KEY + ); + + catalogManager = new CatalogManagerImpl( + new UpdateLogImpl(metaStorageMgr), + clockWaiter, + () -> schemaSyncConfig.delayDuration().value() + ); + distributionZoneManager = new DistributionZoneManager( zonesConfiguration, tablesConfig, + catalogManager, metaStorageMgr, logicalTopologyService, vaultMgr, @@ -511,16 +522,6 @@ public class IgniteImpl implements Ignite { outgoingSnapshotsManager = new OutgoingSnapshotsManager(clusterSvc.messagingService()); - SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry.getConfiguration( - SchemaSynchronizationConfiguration.KEY - ); - - catalogManager = new CatalogManagerImpl( - new UpdateLogImpl(metaStorageMgr), - clockWaiter, - () -> schemaSyncConfig.delayDuration().value() - ); - distributedTblMgr = new TableManager( name, registry, diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java index 912c66fe3b..a7a9b7f78e 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerWrapper.java @@ -28,19 +28,13 @@ import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterColumnCommand; import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterTableAddCommand; import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterTableDropCommand; -import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterZoneRenameCommand; -import org.apache.ignite.internal.sql.engine.prepare.ddl.AlterZoneSetCommand; import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateIndexCommand; import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateTableCommand; -import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateZoneCommand; import org.apache.ignite.internal.sql.engine.prepare.ddl.DdlCommand; import org.apache.ignite.internal.sql.engine.prepare.ddl.DropIndexCommand; import org.apache.ignite.internal.sql.engine.prepare.ddl.DropTableCommand; -import org.apache.ignite.internal.sql.engine.prepare.ddl.DropZoneCommand; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.table.distributed.TableManager; -import org.apache.ignite.lang.DistributionZoneAlreadyExistsException; -import org.apache.ignite.lang.DistributionZoneNotFoundException; import org.apache.ignite.lang.IndexAlreadyExistsException; import org.apache.ignite.lang.IndexNotFoundException; import org.apache.ignite.lang.TableAlreadyExistsException; @@ -120,34 +114,6 @@ public class DdlCommandHandlerWrapper extends DdlCommandHandler { .thenCompose(res -> catalogManager.dropIndex(DdlToCatalogCommandConverter.convert((DropIndexCommand) cmd)) .handle(handleModificationResult(((DropIndexCommand) cmd).ifNotExists(), IndexNotFoundException.class)) ); - } else if (cmd instanceof CreateZoneCommand) { - CreateZoneCommand zoneCommand = (CreateZoneCommand) cmd; - - return ddlCommandFuture - .thenCompose(res -> catalogManager.createDistributionZone(DdlToCatalogCommandConverter.convert(zoneCommand)) - .handle(handleModificationResult(zoneCommand.ifNotExists(), DistributionZoneAlreadyExistsException.class)) - ); - } else if (cmd instanceof DropZoneCommand) { - DropZoneCommand zoneCommand = (DropZoneCommand) cmd; - - return ddlCommandFuture - .thenCompose(res -> catalogManager.dropDistributionZone(DdlToCatalogCommandConverter.convert(zoneCommand)) - .handle(handleModificationResult(zoneCommand.ifExists(), DistributionZoneNotFoundException.class)) - ); - } else if (cmd instanceof AlterZoneRenameCommand) { - AlterZoneRenameCommand zoneCommand = (AlterZoneRenameCommand) cmd; - - return ddlCommandFuture - .thenCompose(res -> catalogManager.renameDistributionZone(DdlToCatalogCommandConverter.convert(zoneCommand)) - .handle(handleModificationResult(zoneCommand.ifExists(), DistributionZoneNotFoundException.class)) - ); - } else if (cmd instanceof AlterZoneSetCommand) { - AlterZoneSetCommand zoneCommand = (AlterZoneSetCommand) cmd; - - return ddlCommandFuture - .thenCompose(res -> catalogManager.alterDistributionZone(DdlToCatalogCommandConverter.convert(zoneCommand)) - .handle(handleModificationResult(zoneCommand.ifExists(), DistributionZoneNotFoundException.class)) - ); } return ddlCommandFuture; diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java index 0c42f9fe49..b9e810ecd0 100644 --- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java +++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandlerExceptionHandlingTest.java @@ -17,23 +17,26 @@ package org.apache.ignite.internal.sql.engine.exec.ddl; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.configuration.annotation.ConfigurationType.DISTRIBUTED; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import org.apache.ignite.configuration.NamedConfigurationTree; -import org.apache.ignite.configuration.NamedListView; +import org.apache.ignite.internal.catalog.CatalogManager; +import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor; import org.apache.ignite.internal.configuration.ConfigurationRegistry; import org.apache.ignite.internal.configuration.ConfigurationTreeGenerator; import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage; @@ -42,9 +45,6 @@ import org.apache.ignite.internal.distributionzones.DistributionZoneConfiguratio import org.apache.ignite.internal.distributionzones.DistributionZoneManager; import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration; import org.apache.ignite.internal.index.IndexManager; -import org.apache.ignite.internal.schema.configuration.TableChange; -import org.apache.ignite.internal.schema.configuration.TableConfiguration; -import org.apache.ignite.internal.schema.configuration.TableView; import org.apache.ignite.internal.sql.engine.prepare.ddl.CreateZoneCommand; import org.apache.ignite.internal.sql.engine.prepare.ddl.DropZoneCommand; import org.apache.ignite.internal.storage.DataStorageManager; @@ -106,17 +106,19 @@ public class DdlCommandHandlerExceptionHandlingTest extends IgniteAbstractTest { DistributionZonesConfiguration zonesConfiguration = registry.getConfiguration(DistributionZonesConfiguration.KEY); - NamedConfigurationTree<TableConfiguration, TableView, TableChange> tables = mock(NamedConfigurationTree.class); - - NamedListView<TableView> value = mock(NamedListView.class); - - when(tables.value()).thenReturn(value); - - when(value.namedListKeys()).thenReturn(new ArrayList<>()); + CatalogManager catalogManager = mock(CatalogManager.class); + when(catalogManager.createDistributionZone(any())).thenReturn(completedFuture(null)); + when(catalogManager.alterDistributionZone(any())).thenReturn(completedFuture(null)); + when(catalogManager.renameDistributionZone(any())).thenReturn(completedFuture(null)); + when(catalogManager.dropDistributionZone(any())).thenReturn(completedFuture(null)); + CatalogZoneDescriptor desc = mock(CatalogZoneDescriptor.class); + when(catalogManager.zone(anyString(), anyLong())).thenReturn(desc); + when(desc.id()).thenReturn(42); distributionZoneManager = new DistributionZoneManager( zonesConfiguration, null, + catalogManager, null, null, null,
