This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6aa6c158d4 IGNITE-19747 Fixed flaky
ItRebalanceDistributedTest.testOnLeaderElectedRebalanceRestart (#2221)
6aa6c158d4 is described below
commit 6aa6c158d40b1551ea549e3c224360b251b34665
Author: Denis Chudov <[email protected]>
AuthorDate: Tue Jun 20 13:54:14 2023 +0300
IGNITE-19747 Fixed flaky
ItRebalanceDistributedTest.testOnLeaderElectedRebalanceRestart (#2221)
---
.../internal/table/distributed/TableManager.java | 28 ++++++++++++----------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index db0cdc8cb2..d30135a0e3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1970,7 +1970,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return completedFuture(null);
}
- TableImpl tbl = tablesByIdVv.latest().get(id);
+ TableImpl tbl = latestTablesById().get(id);
if (tbl != null) {
return completedFuture(tbl);
@@ -1978,37 +1978,39 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CompletableFuture<TableImpl> getTblFut = new CompletableFuture<>();
- CompletionListener<Map<Integer, TableImpl>> tablesListener = (token, tables, th) -> {
+ CompletionListener<Void> tablesListener = (token, v, th) -> {
if (th == null) {
- TableImpl table = tables.get(id);
+ CompletableFuture<Map<Integer, TableImpl>> tablesFut = tablesByIdVv.get(token);
- if (table != null) {
- assignmentsUpdatedVv.get(token).whenComplete((v, e) -> {
- if (e != null) {
- getTblFut.completeExceptionally(e);
- } else {
+ tablesFut.whenComplete((tables, e) -> {
+ if (e != null) {
+ getTblFut.completeExceptionally(e);
+ } else {
+ TableImpl table = tables.get(id);
+
+ if (table != null) {
getTblFut.complete(table);
}
- });
- }
+ }
+ });
} else {
getTblFut.completeExceptionally(th);
}
};
- tablesByIdVv.whenComplete(tablesListener);
+ assignmentsUpdatedVv.whenComplete(tablesListener);
// This check is needed for the case when we have registered tablesListener,
// but tablesByIdVv has already been completed, so listener would be triggered only for the next versioned value update.
tbl = latestTablesById().get(id);
if (tbl != null) {
- tablesByIdVv.removeWhenComplete(tablesListener);
+ assignmentsUpdatedVv.removeWhenComplete(tablesListener);
return completedFuture(tbl);
}
- return getTblFut.whenComplete((unused, throwable) -> tablesByIdVv.removeWhenComplete(tablesListener));
+ return getTblFut.whenComplete((unused, throwable) -> assignmentsUpdatedVv.removeWhenComplete(tablesListener));
}));
}