This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-19642 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 224716ea5491b9b88d91800917ea3367b656e640 Author: amashenkov <[email protected]> AuthorDate: Tue Jun 6 17:43:08 2023 +0300 Make Catalog use HybridClock instead of system time. --- .../internal/catalog/CatalogServiceImpl.java | 30 ++++-- .../internal/catalog/CatalogServiceSelfTest.java | 115 +++++++++++---------- .../storage/ItRebalanceDistributedTest.java | 2 +- .../runner/app/ItIgniteNodeRestartTest.java | 2 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 2 +- 5 files changed, 84 insertions(+), 67 deletions(-) diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java index 22d5cbe311..b120079277 100644 --- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java +++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.catalog.storage.UpdateEntry; import org.apache.ignite.internal.catalog.storage.UpdateLog; import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler; import org.apache.ignite.internal.catalog.storage.VersionedUpdate; +import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.Producer; @@ -90,6 +91,9 @@ import org.jetbrains.annotations.Nullable; public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParameters> implements CatalogManager { private static final int MAX_RETRY_COUNT = 10; + /** Safe time to wait before new Catalog version activation. */ + private static final int DELAY_DURATION = 0; + /** The logger. */ private static final IgniteLogger LOG = Loggers.forClass(CatalogServiceImpl.class); @@ -103,11 +107,14 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam private final PendingComparableValuesTracker<Integer, Void> versionTracker = new PendingComparableValuesTracker<>(0); + private final HybridClock clock; + /** * Constructor. */ - public CatalogServiceImpl(UpdateLog updateLog) { + public CatalogServiceImpl(UpdateLog updateLog, HybridClock clock) { this.updateLog = updateLog; + this.clock = clock; } /** {@inheritDoc} */ @@ -519,7 +526,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam if (entry instanceof NewTableEntry) { catalog = new Catalog( version, - System.currentTimeMillis(), + activationTimestamp(), catalog.objectIdGenState(), new SchemaDescriptor( schema.id(), @@ -540,7 +547,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam catalog = new Catalog( version, - System.currentTimeMillis(), + activationTimestamp(), catalog.objectIdGenState(), new SchemaDescriptor( schema.id(), @@ -561,7 +568,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam catalog = new Catalog( version, - System.currentTimeMillis(), + activationTimestamp(), catalog.objectIdGenState(), new SchemaDescriptor( schema.id(), @@ -592,7 +599,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam catalog = new Catalog( version, - System.currentTimeMillis(), + activationTimestamp(), catalog.objectIdGenState(), new SchemaDescriptor( schema.id(), @@ -621,7 +628,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam } else if (entry instanceof NewIndexEntry) { catalog = new Catalog( version, - System.currentTimeMillis(), + activationTimestamp(), catalog.objectIdGenState(), new SchemaDescriptor( schema.id(), @@ -641,7 +648,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam catalog = new Catalog( version, - System.currentTimeMillis(), + activationTimestamp(), catalog.objectIdGenState(), new SchemaDescriptor( schema.id(), @@ -659,7 +666,7 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam } else if (entry instanceof ObjectIdGenUpdateEntry) { catalog = new Catalog( version, - System.currentTimeMillis(), + activationTimestamp(), catalog.objectIdGenState() + ((ObjectIdGenUpdateEntry) entry).delta(), new SchemaDescriptor( schema.id(), @@ -688,6 +695,13 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam } } + /** + * Returns catalog activation timestamp. + */ + protected long activationTimestamp() { + return clock.now().addPhysicalTime(DELAY_DURATION).longValue(); + } + @FunctionalInterface interface UpdateProducer { List<UpdateEntry> get(Catalog catalog); diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java index 5ff386c26e..aa34babad1 100644 --- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java +++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java @@ -69,6 +69,8 @@ import org.apache.ignite.internal.catalog.storage.UpdateLog; import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler; import org.apache.ignite.internal.catalog.storage.UpdateLogImpl; import org.apache.ignite.internal.catalog.storage.VersionedUpdate; +import org.apache.ignite.internal.hlc.HybridClock; +import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.manager.EventListener; import org.apache.ignite.internal.metastorage.MetaStorageManager; import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager; @@ -110,6 +112,8 @@ public class CatalogServiceSelfTest { private CatalogServiceImpl service; + private HybridClock clock; + @BeforeEach void setUp() throws NodeStoppingException { vault = new VaultManager(new InMemoryVaultService()); @@ -118,7 +122,14 @@ public class CatalogServiceSelfTest { vault, new SimpleInMemoryKeyValueStorage("test") ); - service = new CatalogServiceImpl(new UpdateLogImpl(metastore, vault)); + clock = new HybridClockImpl(); + service = new CatalogServiceImpl(new UpdateLogImpl(metastore, vault), clock) { + // Immediate activation. + @Override + protected long activationTimestamp() { + return clock.nowLong(); + } + }; vault.start(); metastore.start(); @@ -136,14 +147,14 @@ public class CatalogServiceSelfTest { @Test public void testEmptyCatalog() { - assertNotNull(service.activeSchema(System.currentTimeMillis())); + assertNotNull(service.activeSchema(clock.nowLong())); assertNotNull(service.schema(0)); assertNull(service.schema(1)); assertThrows(IllegalStateException.class, () -> service.activeSchema(-1L)); - assertNull(service.table(0, System.currentTimeMillis())); - assertNull(service.index(0, System.currentTimeMillis())); + assertNull(service.table(0, clock.nowLong())); + assertNull(service.index(0, clock.nowLong())); SchemaDescriptor schema = service.schema(0); assertEquals(SCHEMA_NAME, schema.name()); @@ -194,10 +205,10 @@ public class CatalogServiceSelfTest { assertEquals(0, schema.id()); assertEquals(SCHEMA_NAME, schema.name()); assertEquals(1, schema.version()); - assertSame(schema, service.activeSchema(System.currentTimeMillis())); + assertSame(schema, service.activeSchema(clock.nowLong())); - assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis())); - assertSame(schema.table(TABLE_NAME), service.table(1, System.currentTimeMillis())); + assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, clock.nowLong())); + assertSame(schema.table(TABLE_NAME), service.table(1, clock.nowLong())); // Validate newly created table TableDescriptor table = schema.table(TABLE_NAME); @@ -217,13 +228,13 @@ public class CatalogServiceSelfTest { assertEquals(0, schema.id()); assertEquals(SCHEMA_NAME, schema.name()); assertEquals(2, schema.version()); - assertSame(schema, service.activeSchema(System.currentTimeMillis())); + assertSame(schema, service.activeSchema(clock.nowLong())); - assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, System.currentTimeMillis())); - assertSame(schema.table(TABLE_NAME), service.table(1, System.currentTimeMillis())); + assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, clock.nowLong())); + assertSame(schema.table(TABLE_NAME), service.table(1, clock.nowLong())); - assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, System.currentTimeMillis())); - assertSame(schema.table(TABLE_NAME_2), service.table(2, System.currentTimeMillis())); + assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, clock.nowLong())); + assertSame(schema.table(TABLE_NAME_2), service.table(2, clock.nowLong())); assertNotSame(schema.table(TABLE_NAME), schema.table(TABLE_NAME_2)); @@ -231,17 +242,15 @@ public class CatalogServiceSelfTest { assertThat(service.createTable(simpleTable(TABLE_NAME_2)), willThrowFast(TableAlreadyExistsException.class)); // Validate schema wasn't changed. - assertSame(schema, service.activeSchema(System.currentTimeMillis())); + assertSame(schema, service.activeSchema(clock.nowLong())); } @Test - public void testDropTable() throws InterruptedException { + public void testDropTable() { assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null)); assertThat(service.createTable(simpleTable(TABLE_NAME_2)), willBe((Object) null)); - long beforeDropTimestamp = System.currentTimeMillis(); - - Thread.sleep(5); + long beforeDropTimestamp = clock.nowLong(); DropTableParams dropTableParams = DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build(); @@ -269,24 +278,24 @@ public class CatalogServiceSelfTest { assertEquals(0, schema.id()); assertEquals(SCHEMA_NAME, schema.name()); assertEquals(3, schema.version()); - assertSame(schema, service.activeSchema(System.currentTimeMillis())); + assertSame(schema, service.activeSchema(clock.nowLong())); assertNull(schema.table(TABLE_NAME)); - assertNull(service.table(TABLE_NAME, System.currentTimeMillis())); - assertNull(service.table(1, System.currentTimeMillis())); + assertNull(service.table(TABLE_NAME, clock.nowLong())); + assertNull(service.table(1, clock.nowLong())); - assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, System.currentTimeMillis())); - assertSame(schema.table(TABLE_NAME_2), service.table(2, System.currentTimeMillis())); + assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, clock.nowLong())); + assertSame(schema.table(TABLE_NAME_2), service.table(2, clock.nowLong())); // Try to drop table once again. assertThat(service.dropTable(dropTableParams), willThrowFast(TableNotFoundException.class)); // Validate schema wasn't changed. - assertSame(schema, service.activeSchema(System.currentTimeMillis())); + assertSame(schema, service.activeSchema(clock.nowLong())); } @Test - public void testAddColumn() throws InterruptedException { + public void testAddColumn() { assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null)); AlterTableAddColumnParams params = AlterTableAddColumnParams.builder() @@ -301,9 +310,7 @@ public class CatalogServiceSelfTest { )) .build(); - long beforeAddedTimestamp = System.currentTimeMillis(); - - Thread.sleep(5); + long beforeAddedTimestamp = clock.nowLong(); assertThat(service.addColumn(params), willBe((Object) null)); @@ -315,7 +322,7 @@ public class CatalogServiceSelfTest { assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME)); // Validate actual catalog - schema = service.activeSchema(System.currentTimeMillis()); + schema = service.activeSchema(clock.nowLong()); assertNotNull(schema); assertNotNull(schema.table(TABLE_NAME)); @@ -335,7 +342,7 @@ public class CatalogServiceSelfTest { } @Test - public void testDropColumn() throws InterruptedException { + public void testDropColumn() { assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null)); // Validate dropping column @@ -345,9 +352,7 @@ public class CatalogServiceSelfTest { .columns(Set.of("VAL")) .build(); - long beforeAddedTimestamp = System.currentTimeMillis(); - - Thread.sleep(5); + long beforeAddedTimestamp = clock.nowLong(); assertThat(service.dropColumn(params), willBe((Object) null)); @@ -359,7 +364,7 @@ public class CatalogServiceSelfTest { assertNotNull(schema.table(TABLE_NAME).column("VAL")); // Validate actual catalog - schema = service.activeSchema(System.currentTimeMillis()); + schema = service.activeSchema(clock.nowLong()); assertNotNull(schema); assertNotNull(schema.table(TABLE_NAME)); @@ -368,7 +373,7 @@ public class CatalogServiceSelfTest { @Test public void testCreateDropColumnIfTableNotExists() { - assertNull(service.table(TABLE_NAME, System.currentTimeMillis())); + assertNull(service.table(TABLE_NAME, clock.nowLong())); // Try to add a new column. AlterTableAddColumnParams addColumnParams = AlterTableAddColumnParams.builder() @@ -413,7 +418,7 @@ public class CatalogServiceSelfTest { assertThat(service.dropColumn(params), willThrow(SqlException.class)); // Validate actual catalog - SchemaDescriptor schema = service.activeSchema(System.currentTimeMillis()); + SchemaDescriptor schema = service.activeSchema(clock.nowLong()); assertNotNull(schema); assertNotNull(schema.table(TABLE_NAME)); assertEquals(2, schema.version()); @@ -439,7 +444,7 @@ public class CatalogServiceSelfTest { assertThat(service.addColumn(addColumnParams), willThrow(ColumnAlreadyExistsException.class)); // Validate no column added. - SchemaDescriptor schema = service.activeSchema(System.currentTimeMillis()); + SchemaDescriptor schema = service.activeSchema(clock.nowLong()); assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME)); @@ -456,7 +461,7 @@ public class CatalogServiceSelfTest { assertThat(service.addColumn(addColumnParams), willBe((Object) null)); // Validate both columns added. - schema = service.activeSchema(System.currentTimeMillis()); + schema = service.activeSchema(clock.nowLong()); assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME)); assertNotNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2)); @@ -471,7 +476,7 @@ public class CatalogServiceSelfTest { assertThat(service.dropColumn(dropColumnParams), willBe((Object) null)); // Validate both columns dropped. - schema = service.activeSchema(System.currentTimeMillis()); + schema = service.activeSchema(clock.nowLong()); assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME)); assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME_2)); @@ -486,13 +491,13 @@ public class CatalogServiceSelfTest { assertThat(service.dropColumn(dropColumnParams), willThrow(ColumnNotFoundException.class)); // Validate no column dropped. - schema = service.activeSchema(System.currentTimeMillis()); + schema = service.activeSchema(clock.nowLong()); assertNotNull(schema.table(TABLE_NAME).column("VAL")); } @Test - public void testDropTableWithIndex() throws InterruptedException { + public void testDropTableWithIndex() { CreateHashIndexParams params = CreateHashIndexParams.builder() .indexName(INDEX_NAME) .tableName(TABLE_NAME) @@ -502,9 +507,7 @@ public class CatalogServiceSelfTest { assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null)); assertThat(service.createIndex(params), willBe((Object) null)); - long beforeDropTimestamp = System.currentTimeMillis(); - - Thread.sleep(5); + long beforeDropTimestamp = clock.nowLong(); DropTableParams dropTableParams = DropTableParams.builder().schemaName("PUBLIC").tableName(TABLE_NAME).build(); @@ -532,15 +535,15 @@ public class CatalogServiceSelfTest { assertEquals(0, schema.id()); assertEquals(CatalogService.PUBLIC, schema.name()); assertEquals(3, schema.version()); - assertSame(schema, service.activeSchema(System.currentTimeMillis())); + assertSame(schema, service.activeSchema(clock.nowLong())); assertNull(schema.table(TABLE_NAME)); - assertNull(service.table(TABLE_NAME, System.currentTimeMillis())); - assertNull(service.table(1, System.currentTimeMillis())); + assertNull(service.table(TABLE_NAME, clock.nowLong())); + assertNull(service.table(1, clock.nowLong())); assertNull(schema.index(INDEX_NAME)); - assertNull(service.index(INDEX_NAME, System.currentTimeMillis())); - assertNull(service.index(2, System.currentTimeMillis())); + assertNull(service.index(INDEX_NAME, clock.nowLong())); + assertNull(service.index(2, clock.nowLong())); } @Test @@ -567,9 +570,9 @@ public class CatalogServiceSelfTest { schema = service.schema(2); assertNotNull(schema); - assertNull(service.index(1, System.currentTimeMillis())); - assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, System.currentTimeMillis())); - assertSame(schema.index(INDEX_NAME), service.index(2, System.currentTimeMillis())); + assertNull(service.index(1, clock.nowLong())); + assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, clock.nowLong())); + assertSame(schema.index(INDEX_NAME), service.index(2, clock.nowLong())); // Validate newly created hash index HashIndexDescriptor index = (HashIndexDescriptor) schema.index(INDEX_NAME); @@ -608,9 +611,9 @@ public class CatalogServiceSelfTest { schema = service.schema(2); assertNotNull(schema); - assertNull(service.index(1, System.currentTimeMillis())); - assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, System.currentTimeMillis())); - assertSame(schema.index(INDEX_NAME), service.index(2, System.currentTimeMillis())); + assertNull(service.index(1, clock.nowLong())); + assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, clock.nowLong())); + assertSame(schema.index(INDEX_NAME), service.index(2, clock.nowLong())); // Validate newly created sorted index SortedIndexDescriptor index = (SortedIndexDescriptor) schema.index(INDEX_NAME); @@ -648,7 +651,7 @@ public class CatalogServiceSelfTest { doNothing().when(updateLogMock).registerUpdateHandler(updateHandlerCapture.capture()); - CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock); + CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, clock); service.start(); when(updateLogMock.append(any())).thenAnswer(invocation -> { @@ -679,7 +682,7 @@ public class CatalogServiceSelfTest { public void catalogServiceManagesUpdateLogLifecycle() throws Exception { UpdateLog updateLogMock = Mockito.mock(UpdateLog.class); - CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock); + CatalogServiceImpl service = new CatalogServiceImpl(updateLogMock, clock); service.start(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java index b414e06169..1eb85a25c5 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItRebalanceDistributedTest.java @@ -743,7 +743,7 @@ public class ItRebalanceDistributedTest { metaStorageManager, clusterService); - catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageManager, vaultManager)); + catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageManager, vaultManager), hybridClock); schemaManager = new SchemaManager(registry, tablesCfg, metaStorageManager); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index ab485aa530..b43e47caa5 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -396,7 +396,7 @@ public class ItIgniteNodeRestartTest extends IgniteAbstractTest { new RaftGroupEventsClientListener() ); - var catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vault)); + var catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vault), hybridClock); TableManager tableManager = new TableManager( name, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 9712457a43..fb9646cda7 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -486,7 +486,7 @@ public class IgniteImpl implements Ignite { outgoingSnapshotsManager = new OutgoingSnapshotsManager(clusterSvc.messagingService()); - catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vaultMgr)); + catalogManager = new CatalogServiceImpl(new UpdateLogImpl(metaStorageMgr, vaultMgr), clock); distributedTblMgr = new TableManager( name,
