This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-19080 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a746b771aa5d07d7fb75132cc812a659bf2312b4 Author: amashenkov <[email protected]> AuthorDate: Wed Mar 22 12:44:00 2023 +0300 WIP. --- modules/catalog/build.gradle | 1 + .../ignite/internal/catalog/CatalogService.java | 1 - .../internal/catalog/CatalogServiceImpl.java | 158 +++++++++++++++------ .../internal/catalog/commands/CatalogUtils.java | 4 + .../catalog/descriptors/TableDescriptor.java | 15 +- .../internal/catalog/CatalogServiceSelfTest.java | 50 +++++-- 6 files changed, 165 insertions(+), 64 deletions(-) diff --git a/modules/catalog/build.gradle b/modules/catalog/build.gradle index 7d9f945e71..01fa3474ce 100644 --- a/modules/catalog/build.gradle +++ b/modules/catalog/build.gradle @@ -29,6 +29,7 @@ dependencies { implementation libs.jetbrains.annotations testImplementation(testFixtures(project(':ignite-core'))) + testImplementation(testFixtures(project(':ignite-metastorage'))) testImplementation libs.mockito.junit testImplementation libs.mockito.core testImplementation libs.hamcrest.core diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java index 507eefde03..9a7ae7c1dd 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java @@ -32,7 +32,6 @@ import org.apache.ignite.internal.catalog.descriptors.TableDescriptor; * <p>TBD: events */ public interface CatalogService { - String PUBLIC = "PUBLIC"; //TODO: IGNITE-19082 Drop this stuff when all versioned schema stuff will be moved to Catalog. @Deprecated(forRemoval = true) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java index 2d9ce8c768..6599fb60a7 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java @@ -21,12 +21,15 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import java.util.Collection; -import java.util.Map.Entry; +import java.util.Map; import java.util.NavigableMap; -import java.util.Objects; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import org.apache.ignite.internal.catalog.commands.AlterTableAddColumnParams; import org.apache.ignite.internal.catalog.commands.AlterTableDropColumnParams; import org.apache.ignite.internal.catalog.commands.CatalogUtils; @@ -38,10 +41,16 @@ import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor; import org.apache.ignite.internal.catalog.descriptors.TableDescriptor; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.internal.metastorage.Entry; +import org.apache.ignite.internal.metastorage.EntryEvent; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.WatchEvent; import org.apache.ignite.internal.metastorage.WatchListener; +import org.apache.ignite.internal.metastorage.dsl.Conditions; +import org.apache.ignite.internal.metastorage.dsl.Operations; +import org.apache.ignite.internal.metastorage.dsl.Statements; import org.apache.ignite.internal.util.ArrayUtils; +import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.lang.ByteArray; import org.apache.ignite.lang.TableAlreadyExistsException; import org.jetbrains.annotations.Nullable; @@ -51,10 +60,9 @@ import org.jetbrains.annotations.Nullable; * TODO: IGNITE-19081 Introduce catalog events and make CatalogServiceImpl extends Producer. */ public class CatalogServiceImpl implements CatalogService, CatalogManager { - private static final AtomicInteger TABLE_ID_GEN = new AtomicInteger(); - /** The logger. */ private static final IgniteLogger LOG = Loggers.forClass(CatalogServiceImpl.class); + public static final String CATALOG_VER_PREFIX = "catalog.ver."; /** Versioned catalog descriptors. */ private final NavigableMap<Integer, CatalogDescriptor> catalogByVer = new ConcurrentSkipListMap<>(); @@ -66,6 +74,10 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager { private final WatchListener catalogVersionsListener; + private final ExecutorService executorService = ForkJoinPool.commonPool(); + + private final ConcurrentMap<Integer, CompletableFuture<Boolean>> futMap = new ConcurrentHashMap<>(); + /** * Constructor. */ @@ -78,7 +90,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager { @Override public void start() { if (CatalogService.useCatalogService()) { - metaStorageMgr.registerPrefixWatch(ByteArray.fromString("catalog-"), catalogVersionsListener); + metaStorageMgr.registerPrefixWatch(ByteArray.fromString(CATALOG_VER_PREFIX), catalogVersionsListener); } //TODO: IGNITE-19080 restore state. @@ -95,7 +107,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager { /** {@inheritDoc} */ @Override public TableDescriptor table(String tableName, long timestamp) { - return catalogAt(timestamp).schema(CatalogService.PUBLIC).table(tableName); + return catalogAt(timestamp).schema(CatalogUtils.DEFAULT_SCHEMA).table(tableName); } /** {@inheritDoc} */ @@ -125,13 +137,13 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager { return null; } - return catalog.schema(CatalogService.PUBLIC); + return catalog.schema(CatalogUtils.DEFAULT_SCHEMA); } /** {@inheritDoc} */ @Override public @Nullable SchemaDescriptor activeSchema(long timestamp) { - return catalogAt(timestamp).schema(CatalogService.PUBLIC); + return catalogAt(timestamp).schema(CatalogUtils.DEFAULT_SCHEMA); } private CatalogDescriptor catalog(int version) { @@ -139,7 +151,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager { } private CatalogDescriptor catalogAt(long timestamp) { - Entry<Long, CatalogDescriptor> entry = catalogByTs.floorEntry(timestamp); + Map.Entry<Long, CatalogDescriptor> entry = catalogByTs.floorEntry(timestamp); if (entry == null) { throw new IllegalStateException("No valid schema found for given timestamp: " + timestamp); @@ -151,7 +163,7 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager { /** * MetaStorage event listener for catalog metadata updates. */ - private static class CatalogEventListener implements WatchListener { + private class CatalogEventListener implements WatchListener { /** {@inheritDoc} */ @Override public String id() { @@ -161,6 +173,44 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager { /** {@inheritDoc} */ @Override public CompletableFuture<Void> onUpdate(WatchEvent event) { + assert event.single(); + + EntryEvent entryEvent = event.entryEvent(); + + if (entryEvent.newEntry() != null) { + Object obj = ByteUtils.fromBytes(entryEvent.newEntry().value()); + + assert obj instanceof TableDescriptor; + + TableDescriptor tableDesc = (TableDescriptor) obj; + + // TODO: Add catalog version to event. + int ver = catalogByVer.lastKey() + 1; + + CatalogDescriptor catalog = catalogByVer.get(ver - 1); + + SchemaDescriptor schema = catalog.schema(tableDesc.schemaName()); + + CatalogDescriptor newCatalog = new CatalogDescriptor( + ver, + System.currentTimeMillis(), + new SchemaDescriptor( + schema.id(), + schema.name(), + ver, + ArrayUtils.concat(schema.tables(), tableDesc), + schema.indexes() + ) + ); + + registerCatalog(newCatalog); + + CompletableFuture<Boolean> rmv = futMap.remove(ver); + if (rmv != null) { + rmv.complete(true); + } + } + return completedFuture(null); } @@ -173,53 +223,69 @@ public class CatalogServiceImpl implements CatalogService, CatalogManager { /** {@inheritDoc} */ @Override - public CompletableFuture<?> createTable(CreateTableParams params) { - // Creates TableDescriptor and saves it to MetaStorage. + public CompletableFuture<Boolean> createTable(CreateTableParams params) { + // Creates diff from params, then save to metastorage. // Atomically: - // int id = metaStorage.get("lastId") + 1; - // TableDescriptor table = new TableDescriptor(tableId, params) + // int newVer = metaStorage.get("lastVer") + 1; // - // Catalog newCatalog = catalogByVer.get(id -1).copy() - // newCatalog.setId(id).addTable(table); + // validate(params); + // Object diff = createDiff(catalog, params); // - // metaStorage.put("catalog-"+id, newCatalog); - // metaStorage.put("lastId", id); + // metaStorage.put("catalog.ver." + newVer, diff); + // metaStorage.put("lastVer", newVer); - // Dummy implementation. - synchronized (this) { - CatalogDescriptor catalog = catalogByVer.lastEntry().getValue(); + ByteArray LAST_VER_KEY = ByteArray.fromString("catalog.lastVer"); + ByteArray TABLE_ID_KEY = ByteArray.fromString("catalog.tableId"); - //TODO: IGNITE-19081 Add validation. - String schemaName = Objects.requireNonNullElse(params.schemaName(), CatalogService.PUBLIC); + return metaStorageMgr.getAll(Set.of(LAST_VER_KEY, TABLE_ID_KEY)) + .thenCompose(entries -> { + Entry lastVerEntry = entries.get(LAST_VER_KEY); + Entry tableIdEntry = entries.get(TABLE_ID_KEY); - SchemaDescriptor schema = Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " + schemaName); + int lastVer = lastVerEntry.empty() ? 0 : ByteUtils.bytesToInt(lastVerEntry.value()); + int tableId = tableIdEntry.empty() ? 0 : ByteUtils.bytesToInt(tableIdEntry.value()); - if (schema.table(params.tableName()) != null) { - return params.ifTableExists() - ? completedFuture(false) - : failedFuture(new TableAlreadyExistsException(schemaName, params.tableName())); - } + int newVer = lastVer + 1; + int newTableId = tableId + 1; - int newVersion = catalogByVer.lastKey() + 1; + CatalogDescriptor catalog = catalogByVer.get(lastVer); - TableDescriptor table = CatalogUtils.fromParams(TABLE_ID_GEN.incrementAndGet(), params); + assert catalog.table(newTableId) == null; - CatalogDescriptor newCatalog = new CatalogDescriptor( - newVersion, - System.currentTimeMillis(), - new SchemaDescriptor( - schema.id(), - schemaName, - newVersion, - ArrayUtils.concat(schema.tables(), table), - schema.indexes() - ) - ); + TableDescriptor tableDesc = CatalogUtils.fromParams(newTableId, params); - registerCatalog(newCatalog); - } + // params.validate(catalog); ??? + // validate(catalog, table); + + SchemaDescriptor schema = catalog.schema(tableDesc.schemaName()); + + if (schema.table(tableDesc.name()) != null) { + return params.ifTableExists() + ? completedFuture(false) + : failedFuture(new TableAlreadyExistsException(tableDesc.schemaName(), tableDesc.name())); + } + + CompletableFuture<Boolean> opFut = new CompletableFuture<>(); + + if (futMap.putIfAbsent(newVer, opFut) != null) { + return completedFuture(null).thenComposeAsync(ignore -> createTable(params), executorService); + } - return completedFuture(true); + return metaStorageMgr.invoke( + Statements.iif( + Conditions.value(LAST_VER_KEY).eq(lastVerEntry.value()), + Operations.ops( + Operations.put(LAST_VER_KEY, ByteUtils.intToBytes(newVer)), + Operations.put(TABLE_ID_KEY, ByteUtils.intToBytes(newTableId)), + Operations.put(ByteArray.fromString(CATALOG_VER_PREFIX + newVer), ByteUtils.toBytes(tableDesc)) + ).yield(true), + Operations.ops().yield(false) + ) + ).thenComposeAsync( + res -> res.getAsBoolean() ? opFut.thenApply(ignore -> true) : createTable(params), + executorService + ); + }); } /** {@inheritDoc} */ diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java index 1ead944291..eff8c1e1cc 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CatalogUtils.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.catalog.commands; +import java.util.Objects; import java.util.stream.Collectors; import org.apache.ignite.internal.catalog.descriptors.TableColumnDescriptor; import org.apache.ignite.internal.catalog.descriptors.TableDescriptor; @@ -25,6 +26,8 @@ import org.apache.ignite.internal.catalog.descriptors.TableDescriptor; * Catalog utils. */ public class CatalogUtils { + public static final String DEFAULT_SCHEMA = "PUBLIC"; + /** * Converts CreateTable command params to descriptor. * @@ -34,6 +37,7 @@ public class CatalogUtils { */ public static TableDescriptor fromParams(int id, CreateTableParams params) { return new TableDescriptor(id, + Objects.requireNonNullElse(params.schemaName(), DEFAULT_SCHEMA), params.tableName(), params.columns().stream().map(CatalogUtils::fromParams).collect(Collectors.toList()), params.primaryKeyColumns(), diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java index a9bfd56168..719cf6a1a0 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/TableDescriptor.java @@ -38,7 +38,7 @@ public class TableDescriptor extends ObjectDescriptor { private final int zoneId = 0; private final int engineId = 0; - + private final String schemaName; private final TableColumnDescriptor[] columns; private final String[] primaryKeyColumns; private final String[] colocationColumns; @@ -50,20 +50,23 @@ public class TableDescriptor extends ObjectDescriptor { * Constructor. * * @param id Table id. - * @param name Table name. + * @param schemaName Schema name. + * @param tableName Table name. * @param columns Table column descriptors. * @param pkCols Primary key column names. * @param colocationCols Colocation column names. */ public TableDescriptor( int id, - String name, + String schemaName, + String tableName, List<TableColumnDescriptor> columns, List<String> pkCols, @Nullable List<String> colocationCols ) { - super(id, Type.TABLE, name); + super(id, Type.TABLE, tableName); + this.schemaName = Objects.requireNonNull(schemaName); this.columns = Objects.requireNonNull(columns, "No columns defined.").toArray(TableColumnDescriptor[]::new); primaryKeyColumns = Objects.requireNonNull(pkCols, "No primary key columns.").toArray(String[]::new); colocationColumns = colocationCols == null ? primaryKeyColumns : colocationCols.toArray(String[]::new); @@ -80,6 +83,10 @@ public class TableDescriptor extends ObjectDescriptor { assert primaryKeyColumns == colocationColumns || Set.of(primaryKeyColumns).containsAll(List.of(colocationColumns)); } + public String schemaName(){ + return schemaName; + } + public int zoneId() { return zoneId; } diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java index 8953e5737c..2c0495e0e3 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java @@ -23,33 +23,63 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.apache.ignite.internal.catalog.commands.CatalogUtils; import org.apache.ignite.internal.catalog.commands.ColumnParams; import org.apache.ignite.internal.catalog.commands.CreateTableParams; import org.apache.ignite.internal.catalog.commands.DefaultValue; import org.apache.ignite.internal.catalog.descriptors.SchemaDescriptor; import org.apache.ignite.internal.catalog.descriptors.TableDescriptor; import org.apache.ignite.internal.metastorage.MetaStorageManager; +import org.apache.ignite.internal.metastorage.impl.StandaloneMetastorageManager; +import org.apache.ignite.internal.testframework.SystemPropertiesExtension; +import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher; +import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.lang.TableAlreadyExistsException; import org.apache.ignite.sql.ColumnType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; +import org.junit.jupiter.api.extension.ExtendWith; /** * Catalog service self test. */ +@ExtendWith(SystemPropertiesExtension.class) +@WithSystemProperty(key = CatalogService.IGNITE_USE_CATALOG_PROPERTY, value = "true") public class CatalogServiceSelfTest { private static final String TABLE_NAME = "myTable"; + private MetaStorageManager metaStorageManager; + private CatalogServiceImpl service; - @Test - public void testEmptyCatalog() { - CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class)); + @BeforeEach + public void initCatalogService() { + metaStorageManager = StandaloneMetastorageManager.create(); + service = new CatalogServiceImpl(metaStorageManager); + + metaStorageManager.start(); service.start(); + try { + metaStorageManager.deployWatches(); + } catch (NodeStoppingException e) { + fail(e); + } + } + + @AfterEach + public void cleanupResources() throws Exception { + service.stop(); + metaStorageManager.stop(); + } + + @Test + public void testEmptyCatalog() { assertNotNull(service.activeSchema(System.currentTimeMillis())); assertNotNull(service.schema(0)); @@ -60,7 +90,7 @@ public class CatalogServiceSelfTest { assertNull(service.index(0, System.currentTimeMillis())); SchemaDescriptor schema = service.schema(0); - assertEquals(CatalogService.PUBLIC, schema.name()); + assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name()); assertEquals(0, schema.version()); assertEquals(0, schema.tables().length); @@ -69,9 +99,6 @@ public class CatalogServiceSelfTest { @Test public void testCreateTable() { - CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class)); - service.start(); - CreateTableParams params = CreateTableParams.builder() .schemaName("PUBLIC") .tableName(TABLE_NAME) @@ -99,7 +126,7 @@ public class CatalogServiceSelfTest { assertNotNull(schema); assertEquals(0, schema.id()); - assertEquals(CatalogService.PUBLIC, schema.name()); + assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name()); assertSame(schema, service.activeSchema(0L)); assertSame(schema, service.activeSchema(123L)); @@ -112,7 +139,7 @@ public class CatalogServiceSelfTest { assertNotNull(schema); assertEquals(0, schema.id()); - assertEquals(CatalogService.PUBLIC, schema.name()); + assertEquals(CatalogUtils.DEFAULT_SCHEMA, schema.name()); assertSame(schema, service.activeSchema(System.currentTimeMillis())); assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis())); @@ -129,9 +156,6 @@ public class CatalogServiceSelfTest { @Test public void testCreateTableIfExistsFlag() { - CatalogServiceImpl service = new CatalogServiceImpl(Mockito.mock(MetaStorageManager.class)); - service.start(); - CreateTableParams params = CreateTableParams.builder() .tableName("table1") .columns(List.of(
