This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-19768 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 877b4ed8b842e81a863b9eb1eecd59b2499b554d Author: amashenkov <[email protected]> AuthorDate: Thu Aug 24 18:48:29 2023 +0300 Use sync service instead of configuration. --- .../table/distributed/ConfiguredTablesCache.java | 130 ----------------- .../internal/table/distributed/TableManager.java | 154 ++++++++------------- .../distributed/ConfiguredTablesCacheTest.java | 85 ------------ 3 files changed, 56 insertions(+), 313 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java deleted file mode 100644 index c1b0a6e2c9..0000000000 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.table.distributed; - -import it.unimi.dsi.fastutil.ints.IntRBTreeSet; -import it.unimi.dsi.fastutil.ints.IntSet; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.IntFunction; -import org.apache.ignite.internal.schema.configuration.TableView; -import org.apache.ignite.internal.schema.configuration.TablesConfiguration; -import org.apache.ignite.internal.schema.configuration.TablesView; - -/** - * Caches IDs of the tables that are currently configured in a way consistent with creation/removal - * of tables: that is, if the set of configured table IDs changes externally (in the configuration), - * the cache will not return a stale result. - */ -// TODO: IGNITE-19226 - remove this -class ConfiguredTablesCache { - private static final int NO_GENERATION = Integer.MIN_VALUE; - - private final TablesConfiguration tablesConfig; - private final boolean getMetadataLocallyOnly; - - private int cachedGeneration = NO_GENERATION; - private final IntSet configuredTableIds = new IntRBTreeSet(); - - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - - private final IntFunction<Boolean> isTableConfigured = configuredTableIds::contains; - - private final IntFunction<List<Integer>> getConfiguredTableIds = unused -> new ArrayList<>(configuredTableIds); - - ConfiguredTablesCache(TablesConfiguration tablesConfig, boolean getMetadataLocallyOnly) { - this.tablesConfig = tablesConfig; - this.getMetadataLocallyOnly = getMetadataLocallyOnly; - } - - /** - * Returns whether the table is present in the configuration. - * - * @param tableId ID of the table. - * @return Whether the table is present in the configuration. - */ - public boolean isTableConfigured(int tableId) { - return getConsistently(tableId, isTableConfigured); - } - - /** - * Returns all configured table IDs. - * - * @return All configured table IDs. - */ - public List<Integer> configuredTableIds() { - return getConsistently(0, getConfiguredTableIds); - } - - private <T> T getConsistently(int intArg, IntFunction<T> getter) { - int currentGeneration = getCurrentGeneration(); - - lock.readLock().lock(); - - try { - if (cachedGenerationMatches(currentGeneration)) { - return getter.apply(intArg); - } - } finally { - lock.readLock().unlock(); - } - - lock.writeLock().lock(); - - try { - // Check again. - currentGeneration = getCurrentGeneration(); - - if (cachedGenerationMatches(currentGeneration)) { - return getter.apply(intArg); - } - - refillCache(); - - return getter.apply(intArg); - } finally { - lock.writeLock().unlock(); - } - } - - private Integer getCurrentGeneration() { - return tablesConfigOrDirectProxy().tablesGeneration().value(); - } - - private TablesConfiguration tablesConfigOrDirectProxy() { - return getMetadataLocallyOnly ? tablesConfig : tablesConfig.directProxy(); - } - - private boolean cachedGenerationMatches(int currentGeneration) { - return cachedGeneration != NO_GENERATION && cachedGeneration == currentGeneration; - } - - private void refillCache() { - TablesView tablesView = tablesConfigOrDirectProxy().value(); - - configuredTableIds.clear(); - - for (TableView tableView : tablesView.tables()) { - configuredTableIds.add(tableView.id()); - } - - cachedGeneration = tablesView.tablesGeneration(); - } -} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 370dd0f725..8ccad0fb7f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -23,7 +23,6 @@ import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.CompletableFuture.runAsync; -import static java.util.concurrent.CompletableFuture.supplyAsync; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.getZoneById; @@ -82,7 +81,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.ignite.configuration.ConfigurationChangeException; -import org.apache.ignite.configuration.ConfigurationProperty; import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener; import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent; import org.apache.ignite.internal.affinity.AffinityUtils; @@ -190,7 +188,6 @@ import org.apache.ignite.lang.ByteArray; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.IgniteInternalException; import org.apache.ignite.lang.IgniteStringFormatter; -import org.apache.ignite.lang.IgniteSystemProperties; import org.apache.ignite.lang.NodeStoppingException; import org.apache.ignite.lang.TableAlreadyExistsException; import org.apache.ignite.lang.TableNotFoundException; @@ -223,14 +220,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp private static final int TX_STATE_STORAGE_FLUSH_DELAY = 1000; private static final IntSupplier TX_STATE_STORAGE_FLUSH_DELAY_SUPPLIER = () -> TX_STATE_STORAGE_FLUSH_DELAY; - /** - * If this property is set to {@code true} then an attempt to get the configuration property directly from Meta storage will be skipped, - * and the local property will be returned. - * TODO: IGNITE-16774 This property and overall approach, access configuration directly through Meta storage, - * TODO: will be removed after fix of the issue. - */ - private final boolean getMetadataLocallyOnly = IgniteSystemProperties.getBoolean("IGNITE_GET_METADATA_LOCALLY_ONLY"); - /** Tables configuration. */ private final TablesConfiguration tablesCfg; @@ -379,8 +368,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp private final IndexBuilder indexBuilder; - private final ConfiguredTablesCache configuredTablesCache; - private final Marshaller raftCommandsMarshaller; /** @@ -509,8 +496,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp indexBuilder = new IndexBuilder(nodeName, cpus); - configuredTablesCache = new ConfiguredTablesCache(tablesCfg, getMetadataLocallyOnly); - raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); } @@ -1832,51 +1817,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp * @return Future representing pending completion of the operation. */ private CompletableFuture<List<Table>> tablesAsyncInternal() { - return supplyAsync(() -> inBusyLock(busyLock, this::directTableIds), ioExecutor) - .thenCompose(tableIds -> inBusyLock(busyLock, () -> { - var tableFuts = new CompletableFuture[tableIds.size()]; - - var i = 0; - - for (int tblId : tableIds) { - tableFuts[i++] = tableAsyncInternal(tblId, false); - } - - return allOf(tableFuts).thenApply(unused -> inBusyLock(busyLock, () -> { - var tables = new ArrayList<Table>(tableIds.size()); - - for (var fut : tableFuts) { - var table = fut.join(); - - if (table != null) { - tables.add((Table) table); - } - } - - return tables; - })); - })); + return schemaSyncService.waitForMetadataCompleteness(clock.now()) + .thenApply(ignore -> List.copyOf(latestTablesById().values())); } /** - * Collects a list of direct table ids. + * Return actual table id by given name or {@code null} if table doesn't exist. * - * @return A list of direct table ids. + * @param tableName Table name. + * @return Table id or {@code null} if not found. */ - private List<Integer> directTableIds() { - return configuredTablesCache.configuredTableIds(); - } - - /** - * Gets direct id of table with {@code tblName}. - * - * @param tblName Name of the table. - * @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found. - */ - @Nullable - private Integer directTableId(String tblName) { + private @Nullable Integer tableNameToId(String tableName) { try { - TableConfiguration exTblCfg = directProxy(tablesCfg.tables()).get(tblName); + TableConfiguration exTblCfg = tablesCfg.tables().get(tableName); if (exTblCfg == null) { return null; @@ -1973,7 +1926,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp throw new IgniteException(new NodeStoppingException()); } try { - return tableAsyncInternal(id, true); + return tableAsyncInternal(id); } finally { busyLock.leaveBusy(); } @@ -2012,44 +1965,70 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp /** * Gets a table by name, if it was created before. Doesn't parse canonical name. * - * @param name Table name. + * @param tableName Table name. * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation. */ - public CompletableFuture<TableImpl> tableAsyncInternal(String name) { + public CompletableFuture<TableImpl> tableAsyncInternal(String tableName) { if (!busyLock.enterBusy()) { throw new IgniteException(new NodeStoppingException()); } try { - return supplyAsync(() -> inBusyLock(busyLock, () -> directTableId(name)), ioExecutor) - .thenCompose(tableId -> inBusyLock(busyLock, () -> { + HybridTimestamp now = clock.now(); + + return schemaSyncService.waitForMetadataCompleteness(now) + .thenComposeAsync(ignore -> { + Integer tableId = tableNameToId(tableName); + if (tableId == null) { return completedFuture(null); } - return tableAsyncInternal(tableId, false); - })); + // Table with given name found. Wait for table initialization. + return tableReadyFuture(tableId); + }, ioExecutor); } finally { busyLock.leaveBusy(); } } /** - * Internal method for getting table by id. + * Gets a table by id, if it was created before. + * + * @param tableId Table id. + * @return Future representing pending completion of the {@code TableManager#tableAsyncInternal} operation. + */ + public CompletableFuture<TableImpl> tableAsyncInternal(int tableId) { + HybridTimestamp now = clock.now(); + + return schemaSyncService.waitForMetadataCompleteness(now) + .thenComposeAsync(ignore -> { + // Here we are sure metadata is actual. + if (catalogService.table(tableId, now.longValue()) == null) { + return completedFuture(null); + } + + // But we may need to wait for table initialization. + return tableReadyFuture(tableId); + }, ioExecutor); + } + + /** + * Internal method for getting table by id, without awaiting for actual metadata. * * @param id Table id. - * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false} - * otherwise. * @return Future representing pending completion of the operation. */ - public CompletableFuture<TableImpl> tableAsyncInternal(int id, boolean checkConfiguration) { - CompletableFuture<Boolean> tblCfgFut = checkConfiguration - ? supplyAsync(() -> inBusyLock(busyLock, () -> isTableConfigured(id)), ioExecutor) - : completedFuture(true); + private CompletableFuture<TableImpl> tableReadyFuture(int id) { + if (!busyLock.enterBusy()) { + throw new IgniteException(new NodeStoppingException()); + } - return tblCfgFut.thenCompose(isCfg -> inBusyLock(busyLock, () -> { - if (!isCfg) { - return completedFuture(null); + try { + TableImpl table = tablesByIdVv.latest().get(id); + + if (table != null) { + return completedFuture(table); } TableImpl tbl = latestTablesById().get(id); @@ -2068,10 +2047,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp if (e != null) { getTblFut.completeExceptionally(e); } else { - TableImpl table = tables.get(id); + TableImpl table0 = tables.get(id); - if (table != null) { - getTblFut.complete(table); + if (table0 != null) { + getTblFut.complete(table0); } } }); @@ -2093,17 +2072,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp } return getTblFut.whenComplete((unused, throwable) -> assignmentsUpdatedVv.removeWhenComplete(tablesListener)); - })); - } - - /** - * Checks that the table is configured with specific id. - * - * @param id Table id. - * @return True when the table is configured into cluster, false otherwise. - */ - private boolean isTableConfigured(int id) { - return configuredTablesCache.isTableConfigured(id); + } finally { + busyLock.leaveBusy(); + } } /** @@ -2475,19 +2446,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp return new PartitionMover(busyLock, () -> internalTable.partitionRaftGroupService(partId)); } - /** - * Gets a direct accessor for the configuration distributed property. If the metadata access only locally configured the method will - * return local property accessor. - * - * @param property Distributed configuration property to receive direct access. - * @param <T> Type of the property accessor. - * @return An accessor for distributive property. - * @see #getMetadataLocallyOnly - */ - private <T extends ConfigurationProperty<?>> T directProxy(T property) { - return getMetadataLocallyOnly ? property : (T) property.directProxy(); - } - private static PeersAndLearners configurationFromAssignments(Collection<Assignment> assignments) { var peers = new HashSet<String>(); var learners = new HashSet<String>(); diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java deleted file mode 100644 index 65e8d66f58..0000000000 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.table.distributed; - -import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.util.concurrent.CompletableFuture; -import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; -import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; -import org.apache.ignite.internal.schema.configuration.TablesConfiguration; -import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - -@ExtendWith(ConfigurationExtension.class) -class ConfiguredTablesCacheTest extends BaseIgniteAbstractTest { - @InjectConfiguration - private TablesConfiguration tablesConfig; - - private ConfiguredTablesCache cache; - - @BeforeEach - void createCache() { - cache = new ConfiguredTablesCache(tablesConfig, false); - } - - @Test - void tableIdCheckIsCoherentWithConfigGenerationUpdate() { - createTable(1, "t1"); - - assertTrue(cache.isTableConfigured(1)); - assertFalse(cache.isTableConfigured(2)); - - createTable(2, "t2"); - - assertTrue(cache.isTableConfigured(2)); - } - - private void createTable(int tableId, String tableName) { - CompletableFuture<Void> future = tablesConfig.change(tablesChange -> { - tablesChange.changeTables(ch -> ch.create(tableName, tableChange -> { - tableChange.changeId(tableId); - })); - - tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 1); - }); - - assertThat(future, willCompleteSuccessfully()); - } - - @Test - void tableIdsIsCoherentWithConfigGenerationUpdate() { - assertThat(cache.configuredTableIds(), is(empty())); - - createTable(1, "t1"); - - assertThat(cache.configuredTableIds(), contains(1)); - - createTable(2, "t2"); - - assertThat(cache.configuredTableIds(), contains(1, 2)); - } -}
