This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b033af0c34 IGNITE-19732 Cache IDs of configured tables (#2193)
b033af0c34 is described below
commit b033af0c34a62a3ee9e6a43bd0b855daafcf901a
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Jun 16 16:01:27 2023 +0400
IGNITE-19732 Cache IDs of configured tables (#2193)
---
.../configuration/TablesConfigurationSchema.java | 4 +
.../table/distributed/ConfiguredTablesCache.java | 130 +++++++++++++++++++++
.../internal/table/distributed/TableManager.java | 90 +++++++-------
.../distributed/ConfiguredTablesCacheTest.java | 84 +++++++++++++
4 files changed, 266 insertions(+), 42 deletions(-)
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
index cfcc0b7f00..e7e602ebf0 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/TablesConfigurationSchema.java
@@ -43,4 +43,8 @@ public class TablesConfigurationSchema {
@NamedConfigValue
@IndexValidator
public TableIndexConfigurationSchema indexes;
+
+ /** This counter is increased each time a table is created or dropped. */
+ @Value(hasDefault = true)
+ public int tablesGeneration = 0;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java
new file mode 100644
index 0000000000..c1b0a6e2c9
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCache.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.IntFunction;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesView;
+
+/**
+ * Caches IDs of the tables that are currently configured in a way consistent with creation/removal
+ * of tables: that is, if the set of configured table IDs changes externally (in the configuration),
+ * the cache will not return a stale result.
+ */
+// TODO: IGNITE-19226 - remove this
+class ConfiguredTablesCache {
+ private static final int NO_GENERATION = Integer.MIN_VALUE;
+
+ private final TablesConfiguration tablesConfig;
+ private final boolean getMetadataLocallyOnly;
+
+ private int cachedGeneration = NO_GENERATION;
+ private final IntSet configuredTableIds = new IntRBTreeSet();
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final IntFunction<Boolean> isTableConfigured =
configuredTableIds::contains;
+
+ private final IntFunction<List<Integer>> getConfiguredTableIds = unused ->
new ArrayList<>(configuredTableIds);
+
+ ConfiguredTablesCache(TablesConfiguration tablesConfig, boolean
getMetadataLocallyOnly) {
+ this.tablesConfig = tablesConfig;
+ this.getMetadataLocallyOnly = getMetadataLocallyOnly;
+ }
+
+ /**
+ * Returns whether the table is present in the configuration.
+ *
+ * @param tableId ID of the table.
+ * @return Whether the table is present in the configuration.
+ */
+ public boolean isTableConfigured(int tableId) {
+ return getConsistently(tableId, isTableConfigured);
+ }
+
+ /**
+ * Returns all configured table IDs.
+ *
+ * @return All configured table IDs.
+ */
+ public List<Integer> configuredTableIds() {
+ return getConsistently(0, getConfiguredTableIds);
+ }
+
+ private <T> T getConsistently(int intArg, IntFunction<T> getter) {
+ int currentGeneration = getCurrentGeneration();
+
+ lock.readLock().lock();
+
+ try {
+ if (cachedGenerationMatches(currentGeneration)) {
+ return getter.apply(intArg);
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ lock.writeLock().lock();
+
+ try {
+ // Check again.
+ currentGeneration = getCurrentGeneration();
+
+ if (cachedGenerationMatches(currentGeneration)) {
+ return getter.apply(intArg);
+ }
+
+ refillCache();
+
+ return getter.apply(intArg);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private Integer getCurrentGeneration() {
+ return tablesConfigOrDirectProxy().tablesGeneration().value();
+ }
+
+ private TablesConfiguration tablesConfigOrDirectProxy() {
+ return getMetadataLocallyOnly ? tablesConfig :
tablesConfig.directProxy();
+ }
+
+ private boolean cachedGenerationMatches(int currentGeneration) {
+ return cachedGeneration != NO_GENERATION && cachedGeneration ==
currentGeneration;
+ }
+
+ private void refillCache() {
+ TablesView tablesView = tablesConfigOrDirectProxy().value();
+
+ configuredTableIds.clear();
+
+ for (TableView tableView : tablesView.tables()) {
+ configuredTableIds.add(tableView.id());
+ }
+
+ cachedGeneration = tablesView.tablesGeneration();
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 3534b34311..343ec3828b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -83,7 +83,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.ConfigurationProperty;
-import org.apache.ignite.configuration.NamedConfigurationTree;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -132,6 +131,7 @@ import
org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.schema.configuration.TableChange;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.schema.configuration.TablesChange;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
import org.apache.ignite.internal.schema.event.SchemaEvent;
@@ -365,6 +365,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
private final IndexBuilder indexBuilder;
+ private final ConfiguredTablesCache configuredTablesCache;
+
/**
* Creates a new table manager.
*
@@ -486,6 +488,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(),
clock, txManager, vaultManager, mvGc);
indexBuilder = new IndexBuilder(nodeName, cpus);
+
+ configuredTablesCache = new ConfiguredTablesCache(tablesCfg,
getMetadataLocallyOnly);
}
@Override
@@ -1466,7 +1470,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
try {
- changeTablesConfiguration(
+ changeTablesConfigurationOnTableCreate(
name,
zoneId,
tableInitChange,
@@ -1507,35 +1511,39 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* @param tableInitChange Table changer.
* @param tblFut Future representing pending completion of the table creation.
*/
- private void changeTablesConfiguration(
+ private void changeTablesConfigurationOnTableCreate(
String name,
int zoneId,
Consumer<TableChange> tableInitChange,
CompletableFuture<Table> tblFut
) {
- tablesCfg.change(tablesChange ->
tablesChange.changeTables(tablesListChange -> {
- if (tablesListChange.get(name) != null) {
- throw new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME,
name);
- }
+ tablesCfg.change(tablesChange -> {
+ incrementTablesGeneration(tablesChange);
+
+ tablesChange.changeTables(tablesListChange -> {
+ if (tablesListChange.get(name) != null) {
+ throw new TableAlreadyExistsException(DEFAULT_SCHEMA_NAME,
name);
+ }
- tablesListChange.create(name, (tableChange) -> {
- tableInitChange.accept(tableChange);
+ tablesListChange.create(name, (tableChange) -> {
+ tableInitChange.accept(tableChange);
- tableChange.changeZoneId(zoneId);
+ tableChange.changeZoneId(zoneId);
- var extConfCh = ((ExtendedTableChange) tableChange);
+ var extConfCh = ((ExtendedTableChange) tableChange);
- int tableId = tablesChange.globalIdCounter() + 1;
+ int tableId = tablesChange.globalIdCounter() + 1;
- extConfCh.changeId(tableId);
+ extConfCh.changeId(tableId);
- tablesChange.changeGlobalIdCounter(tableId);
+ tablesChange.changeGlobalIdCounter(tableId);
- extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION);
+ extConfCh.changeSchemaId(INITIAL_SCHEMA_VERSION);
- tableCreateFuts.put(extConfCh.id(), tblFut);
+ tableCreateFuts.put(extConfCh.id(), tblFut);
+ });
});
- })).exceptionally(t -> {
+ }).exceptionally(t -> {
Throwable ex = getRootCause(t);
if (ex instanceof TableAlreadyExistsException) {
@@ -1552,6 +1560,10 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
});
}
+ private static void incrementTablesGeneration(TablesChange tablesChange) {
+ tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() +
1);
+ }
+
/**
* Alters a cluster table. If an appropriate table does not exist, a future will be completed with {@link TableNotFoundException}.
*
@@ -1679,21 +1691,25 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}
return tablesCfg
- .change(chg -> chg
- .changeTables(tblChg -> {
- if (tblChg.get(name) == null) {
- throw new
TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
- }
+ .change(chg -> {
+ incrementTablesGeneration(chg);
- tblChg.delete(name);
- })
- .changeIndexes(idxChg -> {
- for (TableIndexView index : idxChg) {
- if (index.tableId() == tbl.tableId()) {
- idxChg.delete(index.name());
+ chg
+ .changeTables(tblChg -> {
+ if (tblChg.get(name) == null) {
+ throw new
TableNotFoundException(DEFAULT_SCHEMA_NAME, name);
}
- }
- }))
+
+ tblChg.delete(name);
+ })
+ .changeIndexes(idxChg -> {
+ for (TableIndexView index : idxChg) {
+ if (index.tableId() == tbl.tableId()) {
+ idxChg.delete(index.name());
+ }
+ }
+ });
+ })
.exceptionally(t -> {
Throwable ex = getRootCause(t);
@@ -1764,9 +1780,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* @return A list of direct table ids.
*/
private List<Integer> directTableIds() {
- return directProxy(tablesCfg.tables()).value().stream()
- .map(TableView::id)
- .collect(toList());
+ return configuredTablesCache.configuredTableIds();
}
/**
@@ -2005,15 +2019,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
* @return True when the table is configured into cluster, false otherwise.
*/
private boolean isTableConfigured(int id) {
- NamedConfigurationTree<TableConfiguration, TableView, TableChange>
tables = directProxy(tablesCfg.tables());
-
- for (TableView tableConfig : tables.value()) {
- if (tableConfig.id() == id) {
- return true;
- }
- }
-
- return false;
+ return configuredTablesCache.isTableConfigured(id);
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java
new file mode 100644
index 0000000000..deda91133c
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/ConfiguredTablesCacheTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ConfigurationExtension.class)
+class ConfiguredTablesCacheTest {
+ @InjectConfiguration
+ private TablesConfiguration tablesConfig;
+
+ private ConfiguredTablesCache cache;
+
+ @BeforeEach
+ void createCache() {
+ cache = new ConfiguredTablesCache(tablesConfig, false);
+ }
+
+ @Test
+ void tableIdCheckIsCoherentWithConfigGenerationUpdate() {
+ createTable(1, "t1");
+
+ assertTrue(cache.isTableConfigured(1));
+ assertFalse(cache.isTableConfigured(2));
+
+ createTable(2, "t2");
+
+ assertTrue(cache.isTableConfigured(2));
+ }
+
+ private void createTable(int tableId, String tableName) {
+ CompletableFuture<Void> future = tablesConfig.change(tablesChange -> {
+ tablesChange.changeTables(ch -> ch.create(tableName, tableChange
-> {
+ tableChange.changeId(tableId);
+ }));
+
+ tablesChange.changeTablesGeneration(tablesChange.tablesGeneration() + 1);
+ });
+
+ assertThat(future, willCompleteSuccessfully());
+ }
+
+ @Test
+ void tableIdsIsCoherentWithConfigGenerationUpdate() {
+ assertThat(cache.configuredTableIds(), is(empty()));
+
+ createTable(1, "t1");
+
+ assertThat(cache.configuredTableIds(), contains(1));
+
+ createTable(2, "t2");
+
+ assertThat(cache.configuredTableIds(), contains(1, 2));
+ }
+}