This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 74f0b7a IGNITE-15718 Issues in implementation of the method TableManager#table(String)
74f0b7a is described below
commit 74f0b7aec6b912c93c56bea604ad1d0845b720cd
Author: Mirza Aliev <[email protected]>
AuthorDate: Wed Dec 8 10:47:10 2021 +0300
IGNITE-15718 Issues in implementation of the method TableManager#table(String)
---
.../internal/runner/app/ItTablesApiTest.java | 48 +++++++-
.../internal/table/distributed/TableManager.java | 129 ++++++++-------------
2 files changed, 96 insertions(+), 81 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 96260ff..2f932c2 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.schema.definition.ColumnDefinition;
import org.apache.ignite.schema.definition.ColumnType;
import org.apache.ignite.schema.definition.index.IndexDefinition;
import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -120,7 +121,7 @@ public class ItTablesApiTest extends IgniteAbstractTest {
* Before each.
*/
@BeforeEach
- void beforeEach(TestInfo testInfo) throws Exception {
+ void beforeEach(TestInfo testInfo) {
String metastorageNodeName = IgniteTestUtils.testNodeName(testInfo, 0);
clusterNodes = IntStream.range(0, nodesBootstrapCfg.size()).mapToObj(value -> {
@@ -210,6 +211,51 @@ public class ItTablesApiTest extends IgniteAbstractTest {
}
/**
+ * Test scenario when we have lagged node, and tables with the same name are deleted and created again.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testGetTableFromLaggedNode() throws Exception {
+ clusterNodes.forEach(ign -> assertNull(ign.tables().table(TABLE_NAME)));
+
+ Ignite ignite0 = clusterNodes.get(0);
+
+ Ignite ignite1 = clusterNodes.get(1);
+
+ Table tbl = createTable(ignite0, SCHEMA, SHORT_TABLE_NAME);
+
+ Tuple tableKey = Tuple.create()
+ .set("key", 123L);
+
+ Tuple value = Tuple.create()
+ .set("valInt", 1234)
+ .set("valStr", "some string row");
+
+ tbl.keyValueView().put(null, tableKey, value);
+
+ assertEquals(value, tbl.keyValueView().get(null, tableKey));
+
+ assertEquals(value, ignite1.tables().table(TABLE_NAME).keyValueView().get(null, tableKey));
+
+ WatchListenerInhibitor ignite1Inhibitor = metastorageEventsInhibitor(ignite1);
+
+ ignite1Inhibitor.startInhibit();
+
+ Tuple otherValue = Tuple.create()
+ .set("valInt", 12345)
+ .set("valStr", "some other string row");
+
+ tbl.keyValueView().put(null, tableKey, otherValue);
+
+ assertEquals(otherValue, tbl.keyValueView().get(null, tableKey));
+
+ ignite1Inhibitor.stopInhibit();
+
+ assertEquals(otherValue, ignite1.tables().table(TABLE_NAME).keyValueView().get(null, tableKey));
+ }
+
+ /**
* Trys to create an index which is already created.
*
* @throws Exception If failed.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index b5bba99..a7b7232 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -709,7 +709,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private CompletableFuture<Table> createTableAsyncInternal(String name, Consumer<TableChange> tableInitChange) {
CompletableFuture<Table> tblFut = new CompletableFuture<>();
- tableAsync(name, true).thenAccept(tbl -> {
+ tableAsync(name).thenAccept(tbl -> {
if (tbl != null) {
tblFut.completeExceptionally(new TableAlreadyExistsException(name));
} else {
@@ -810,7 +810,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private CompletableFuture<Void> alterTableAsyncInternal(String name, Consumer<TableChange> tableChange) {
CompletableFuture<Void> tblFut = new CompletableFuture<>();
- tableAsync(name, true).thenAccept(tbl -> {
+ tableAsync(name).thenAccept(tbl -> {
if (tbl == null) {
tblFut.completeExceptionally(new TableNotFoundException(name));
} else {
@@ -938,7 +938,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private CompletableFuture<Void> dropTableAsyncInternal(String name) {
CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
- tableAsync(name, true).thenAccept(tbl -> {
+ tableAsync(name).thenAccept(tbl -> {
// In case of drop it's an optimization that allows not to fire drop-change-closure if there's no such
// distributed table and the local config has lagged behind.
if (tbl == null) {
@@ -998,16 +998,18 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return Future representing pending completion of the operation.
*/
private CompletableFuture<List<Table>> tablesAsyncInternal() {
- List<String> tableNames = tableNamesConfigured();
- var tableFuts = new CompletableFuture[tableNames.size()];
+ List<IgniteUuid> tableIds = directTableIds();
+
+ var tableFuts = new CompletableFuture[tableIds.size()];
+
var i = 0;
- for (String tblName : tableNames) {
- tableFuts[i++] = tableAsync(tblName, false);
+ for (IgniteUuid tblId : tableIds) {
+ tableFuts[i++] = tableAsyncInternal(tblId, false);
}
return CompletableFuture.allOf(tableFuts).thenApply(unused -> {
- var tables = new ArrayList<Table>(tableNames.size());
+ var tables = new ArrayList<Table>(tableIds.size());
try {
for (var fut : tableFuts) {
@@ -1026,12 +1028,36 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
/**
- * Collects a list of table names from the distributed configuration storage.
+ * Collects a list of direct table ids.
*
- * @return A list of table names.
+ * @return A list of direct table ids.
+ * @see DirectConfigurationProperty
*/
- private List<String> tableNamesConfigured() {
- return directProxy(tablesCfg.tables()).value().namedListKeys();
+ private List<IgniteUuid> directTableIds() {
+ NamedListView<TableView> views = directProxy(tablesCfg.tables()).value();
+
+ List<IgniteUuid> tableUuids = new ArrayList<>();
+
+ for (int i = 0; i < views.size(); i++) {
+ ExtendedTableView extView = (ExtendedTableView) views.get(i);
+
+ tableUuids.add(IgniteUuid.fromString(extView.id()));
+ }
+
+ return tableUuids;
+ }
+
+ /**
+ * Gets direct id of table with {@code tblName}.
+ *
+ * @param tblName Name of the table.
+ * @return Direct id of the table, or {@code null} if the table with the {@code tblName} has not been found.
+ * @see DirectConfigurationProperty
+ */
+ private IgniteUuid directTableId(String tblName) {
+ ExtendedTableView view = (ExtendedTableView) directProxy(tablesCfg.tables()).value().get(tblName);
+
+ return view == null ? null : IgniteUuid.fromString(view.id());
}
/**
@@ -1101,7 +1127,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
throw new IgniteException(new NodeStoppingException());
}
try {
- return tableAsync(name, true);
+ IgniteUuid tableId = directTableId(name);
+
+ if (tableId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return (CompletableFuture) tableAsyncInternal(tableId, false);
} finally {
busyLock.leaveBusy();
}
@@ -1114,75 +1146,22 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
throw new NodeStoppingException();
}
try {
- return tableAsyncInternal(id);
+ return tableAsyncInternal(id, true);
} finally {
busyLock.leaveBusy();
}
}
/**
- * Gets a table if it exists or {@code null} if it was not created or was removed before.
- *
- * @param checkConfiguration True when the method checks a configuration before tries to get a table, false otherwise.
- * @return A table or {@code null} if table does not exist.
- */
- private CompletableFuture<Table> tableAsync(String name, boolean checkConfiguration) {
- if (checkConfiguration && !isTableConfigured(name)) {
- return CompletableFuture.completedFuture(null);
- }
-
- Table tbl = tables.get(name);
-
- if (tbl != null) {
- return CompletableFuture.completedFuture(tbl);
- }
-
- CompletableFuture<Table> getTblFut = new CompletableFuture<>();
-
- EventListener<TableEventParameters> clo = new EventListener<>() {
- @Override
- public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
- String tableName = parameters.tableName();
-
- if (!name.equals(tableName)) {
- return false;
- }
-
- if (e == null) {
- getTblFut.complete(parameters.table());
- } else {
- getTblFut.completeExceptionally(e);
- }
-
- return true;
- }
-
- @Override
- public void remove(@NotNull Throwable e) {
- getTblFut.completeExceptionally(e);
- }
- };
-
- listen(TableEvent.CREATE, clo);
-
- tbl = tables.get(name);
-
- if (tbl != null && getTblFut.complete(tbl) || !isTableConfigured(name) && getTblFut.complete(null)) {
- removeListener(TableEvent.CREATE, clo, null);
- }
-
- return getTblFut;
- }
-
- /**
* Internal method for getting table by id.
*
* @param id Table id.
+ * @param checkConfiguration {@code True} when the method checks a configuration before trying to get a table, {@code false} otherwise.
* @return Future representing pending completion of the operation.
*/
@NotNull
- private CompletableFuture<TableImpl> tableAsyncInternal(IgniteUuid id) {
- if (!isTableConfigured(id)) {
+ private CompletableFuture<TableImpl> tableAsyncInternal(IgniteUuid id, boolean checkConfiguration) {
+ if (checkConfiguration && !isTableConfigured(id)) {
return CompletableFuture.completedFuture(null);
}
@@ -1250,16 +1229,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
/**
- * Checks that the table is configured.
- *
- * @param name Table name.
- * @return True if table configured, false otherwise.
- */
- private boolean isTableConfigured(String name) {
- return tableNamesConfigured().contains(name);
- }
-
- /**
* Waits for future result and return, or unwraps {@link CompletionException} to {@link IgniteException} if failed.
*
* @param future Completable future.