This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new fef7a24c2a IGNITE-19812 Do not swallow exceptions during start of a partition (#2240)
fef7a24c2a is described below
commit fef7a24c2a029cac720d2fea3815c2a70a86b72f
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Wed Jul 12 19:19:47 2023 +0400
IGNITE-19812 Do not swallow exceptions during start of a partition (#2240)
---
.../runner/app/ItIgniteNodeRestartTest.java | 16 ++++++++-----
.../internal/table/distributed/TableManager.java | 26 +++++++++++++---------
2 files changed, 26 insertions(+), 16 deletions(-)
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 47f2d5f4f7..7a9b323950 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -28,7 +28,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.nio.file.Files;
@@ -313,6 +312,12 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
Consumer<LongFunction<CompletableFuture<?>>> registry = (c) ->
metaStorageMgr.registerRevisionUpdateListener(c::apply);
+ var baselineManager = new BaselineManager(
+ clusterCfgMgr,
+ metaStorageMgr,
+ clusterSvc
+ );
+
DataStorageModules dataStorageModules = new
DataStorageModules(ServiceLoader.load(DataStorageModule.class));
Path storagePath = getPartitionsStorePath(dir);
@@ -362,7 +367,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
replicaMgr,
lockManager,
replicaService,
- mock(BaselineManager.class),
+ baselineManager,
clusterSvc.topologyService(),
txManager,
dataStorageManager,
@@ -416,6 +421,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
cmgManager,
replicaMgr,
txManager,
+ baselineManager,
metaStorageMgr,
clusterCfgMgr,
dataStorageManager,
@@ -967,7 +973,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* Checks that the table created in cluster of 2 nodes, is recovered on a
node after restart of this node.
*/
@Test
- public void testRecoveryOnOneNode() throws InterruptedException {
+ public void testRecoveryOnOneNode() {
IgniteImpl ignite = startNode(0);
IgniteImpl node = startNode(1);
@@ -1048,7 +1054,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
*/
@Test
@Disabled(value = "https://issues.apache.org/jira/browse/IGNITE-18919")
- public void testMetastorageStop() throws InterruptedException {
+ public void testMetastorageStop() {
int cfgGap = 4;
List<IgniteImpl> nodes = startNodes(3);
@@ -1182,7 +1188,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
* @param name Table name.
* @param replicas Replica factor.
*/
- private void createTableWithData(List<IgniteImpl> nodes, String name, int
replicas) throws InterruptedException {
+ private void createTableWithData(List<IgniteImpl> nodes, String name, int
replicas) {
createTableWithData(nodes, name, replicas, 2);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 9e038c48f0..e7bfde3fd0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -141,6 +141,7 @@ import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.engine.StorageEngine;
@@ -570,9 +571,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
// If node's recovery process is incomplete (no partition
storage), then we consider this node's
// partition storage empty.
if (mvPartition != null) {
- // If applied index of a storage is greater than 0,
- // then there is data.
- contains = mvPartition.lastAppliedIndex() > 0;
+ contains =
mvPartition.closestRowId(RowId.lowestRowId(partitionId)) != null;
}
}
@@ -775,7 +774,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
mvGc.addStorage(replicaGrpId,
partitionUpdateHandlers.gcUpdateHandler);
- CompletableFuture<Void> startGroupFut;
+ CompletableFuture<Boolean> startGroupFut;
// start new nodes, only if it is table creation, other cases
will be covered by rebalance logic
if (localMemberAssignment != null) {
@@ -811,9 +810,9 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
shouldStartGroupFut = completedFuture(true);
}
- startGroupFut =
shouldStartGroupFut.thenAcceptAsync(startGroup -> inBusyLock(busyLock, () -> {
+ startGroupFut =
shouldStartGroupFut.thenApplyAsync(startGroup -> inBusyLock(busyLock, () -> {
if (!startGroup) {
- return;
+ return false;
}
TxStateStorage txStatePartitionStorage =
partitionStorages.getTxStateStorage();
@@ -850,12 +849,14 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
),
groupOptions
);
+
+ return true;
} catch (NodeStoppingException ex) {
throw new CompletionException(ex);
}
}), ioExecutor);
} else {
- startGroupFut = completedFuture(null);
+ startGroupFut = completedFuture(false);
}
startGroupFut
@@ -870,7 +871,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
.thenAcceptAsync(updatedRaftGroupService ->
inBusyLock(busyLock, () -> {
((InternalTableImpl)
internalTbl).updateInternalTableRaftGroupService(partId,
updatedRaftGroupService);
- if (localMemberAssignment == null) {
+ boolean startedRaftNode = startGroupFut.join();
+ if (localMemberAssignment == null ||
!startedRaftNode) {
return;
}
@@ -895,10 +897,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
}), ioExecutor)
.whenComplete((res, ex) -> {
if (ex != null) {
- LOG.warn("Unable to update raft groups on the
node", ex);
- }
+ LOG.warn("Unable to update raft groups on the
node [tableId={}, partitionId={}]", ex, tableId, partId);
- futures[partId].complete(null);
+ futures[partId].completeExceptionally(ex);
+ } else {
+ futures[partId].complete(null);
+ }
});
}