This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3be6f10abe IGNITE-18203 Table index dependency deadlock fixed (#1770)
3be6f10abe is described below
commit 3be6f10abe6b975b4f91854214b33b2ebb965cf9
Author: Alexander Lapin <[email protected]>
AuthorDate: Fri Mar 17 13:12:44 2023 +0300
IGNITE-18203 Table index dependency deadlock fixed (#1770)
---
.../internal/raft/server/RaftGroupOptions.java | 22 ++++++
.../internal/raft/server/impl/JraftServerImpl.java | 2 +
.../apache/ignite/raft/jraft/core/NodeImpl.java | 27 +++++---
.../ignite/raft/jraft/option/NodeOptions.java | 15 ++++
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 2 +-
.../ignite/internal/rebalance/ItRebalanceTest.java | 2 +-
.../internal/runner/app/ItDataSchemaSyncTest.java | 2 -
.../app/ItIgniteInMemoryNodeRestartTest.java | 1 +
.../runner/app/ItIgniteNodeRestartTest.java | 14 ++--
.../ItRaftCommandLeftInLogUntilRestartTest.java | 3 +-
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 2 +-
.../sql/engine/ClusterPerClassIntegrationTest.java | 2 +-
.../internal/sql/engine/ItAggregatesTest.java | 2 +-
.../internal/sql/engine/ItIndexSpoolTest.java | 2 +-
.../ignite/internal/sql/engine/ItJoinTest.java | 6 +-
.../internal/sql/engine/ItMixedQueriesTest.java | 2 +-
.../internal/sql/engine/ItOrToUnionRuleTest.java | 2 +-
.../internal/sql/engine/ItSecondaryIndexTest.java | 10 +--
.../ignite/internal/table/ItTableScanTest.java | 2 +-
.../internal/table/distributed/TableManager.java | 24 ++++---
.../replicator/PartitionReplicaListener.java | 80 ++++++++++++++++++++++
21 files changed, 176 insertions(+), 48 deletions(-)
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index d074ae35e8..dd6d218fde 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.raft.server;
+import java.util.concurrent.CountDownLatch;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.jetbrains.annotations.Nullable;
/**
* Options specific to a Raft group that is being started.
@@ -37,6 +39,10 @@ public class RaftGroupOptions {
/** Raft meta storage factory. */
private RaftMetaStorageFactory raftMetaStorageFactory;
+ /** Nullable latch that is counted down once the storage is ready to process
user requests. */
+ @Nullable
+ private CountDownLatch storageReadyLatch;
+
/**
* Returns default options as defined by classic Raft (so stores are
persistent).
*
@@ -126,4 +132,20 @@ public class RaftGroupOptions {
return this;
}
+
+ /**
+ * Returns the latch that is counted down once the storage is ready to
process user requests, or {@code null} if no latch was set.
+ */
+ public CountDownLatch getStorageReadyLatch() {
+ return storageReadyLatch;
+ }
+
+ /**
+ * Sets the storage ready latch and returns these options to allow call chaining.
+ */
+ public RaftGroupOptions setStorageReadyLatch(CountDownLatch
storageReadyLatch) {
+ this.storageReadyLatch = storageReadyLatch;
+
+ return this;
+ }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index b7ab4aef36..7e38be811f 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -414,6 +414,8 @@ public class JraftServerImpl implements RaftServer {
nodeOptions.setRaftGrpEvtsLsnr(new
RaftGroupEventsListenerAdapter(nodeId.groupId(), serviceEventInterceptor,
evLsnr));
+
nodeOptions.setStorageReadyLatch(groupOptions.getStorageReadyLatch());
+
LogStorageFactory logStorageFactory =
groupOptions.getLogStorageFactory() == null
? this.logStorageFactory :
groupOptions.getLogStorageFactory();
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 4899a10ece..3a4e9aebfe 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1063,12 +1063,19 @@ public class NodeImpl implements Node,
RaftServerService {
// Wait committed.
long commitIdx = logManager.getLastLogIndex();
- if (commitIdx > fsmCaller.getLastAppliedIndex()) {
- CountDownLatch applyCommitLatch = new CountDownLatch(1);
+ boolean externalAwaitStorageLatch = opts.getStorageReadyLatch() !=
null;
- LastAppliedLogIndexListener lnsr = lastAppliedLogIndex -> {
- if (lastAppliedLogIndex >= commitIdx) {
- applyCommitLatch.countDown();
+ if (commitIdx > fsmCaller.getLastAppliedIndex()) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta
storage and cmg raft log re-application in async manner
+ CountDownLatch applyCommitLatch = externalAwaitStorageLatch ?
opts.getStorageReadyLatch() : new CountDownLatch(1);
+
+ LastAppliedLogIndexListener lnsr = new
LastAppliedLogIndexListener() {
+ @Override
+ public void onApplied( long lastAppliedLogIndex) {
+ if (lastAppliedLogIndex >= commitIdx) {
+ applyCommitLatch.countDown();
+ fsmCaller.removeLastAppliedLogIndexListener(this);
+ }
}
};
@@ -1077,14 +1084,18 @@ public class NodeImpl implements Node,
RaftServerService {
fsmCaller.onCommitted(commitIdx);
try {
- applyCommitLatch.await();
-
- fsmCaller.removeLastAppliedLogIndexListener(lnsr);
+ if (!externalAwaitStorageLatch) {
+ applyCommitLatch.await();
+ }
} catch (InterruptedException e) {
LOG.error("Fail to apply committed updates.", e);
return false;
}
+ } else {
+ if (externalAwaitStorageLatch) {
+ opts.getStorageReadyLatch().countDown();
+ }
}
if (!this.rpcClientService.init(this.options)) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index a810fa131e..fe7394cec6 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -17,6 +17,7 @@
package org.apache.ignite.raft.jraft.option;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -41,6 +42,7 @@ import org.apache.ignite.raft.jraft.util.Utils;
import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
import org.apache.ignite.raft.jraft.util.timer.Timer;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Node options.
@@ -237,6 +239,10 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
/** A hybrid clock */
private HybridClock clock = new HybridClockImpl();
+ /** Nullable latch that is counted down once the storage is ready to process
user requests. */
+ @Nullable
+ private CountDownLatch storageReadyLatch;
+
/**
* Amount of Disruptors that will handle the RAFT server.
*/
@@ -635,6 +641,7 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
nodeOptions.setRpcInstallSnapshotTimeout(this.getRpcInstallSnapshotTimeout());
nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
nodeOptions.setClock(this.getClock());
+ nodeOptions.setStorageReadyLatch(this.getStorageReadyLatch());
return nodeOptions;
}
@@ -674,4 +681,12 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
public void setElectionTimeoutStrategy(TimeoutStrategy
electionTimeoutStrategy) {
this.electionTimeoutStrategy = electionTimeoutStrategy;
}
+
+ public CountDownLatch getStorageReadyLatch() {
+ return storageReadyLatch;
+ }
+
+ public void setStorageReadyLatch(CountDownLatch storageReadyLatch) {
+ this.storageReadyLatch = storageReadyLatch;
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 414eb9cdb1..64f7b35fc1 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -335,7 +335,7 @@ class ItTableRaftSnapshotsTest extends
IgniteIntegrationTest {
}
private void waitForTableToStart() throws InterruptedException {
- // TODO: IGNITE-18203 - remove this wait because when a table creation
query is executed, the table must be fully ready.
+ // TODO: IGNITE-18733 - remove this wait because when a table creation
query is executed, the table must be fully ready.
BooleanSupplier tableStarted = () -> {
int numberOfStartedRaftNodes = cluster.runningNodes()
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 722d1616e7..8e1f6e25fd 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -217,7 +217,7 @@ public class ItRebalanceTest extends IgniteIntegrationTest {
}
private void waitForTableToStart() throws InterruptedException {
- // TODO: IGNITE-18203 - remove this wait because when a table creation
query is executed, the table must be fully ready.
+ // TODO: IGNITE-18733 - remove this wait because when a table creation
query is executed, the table must be fully ready.
BooleanSupplier tableStarted = () -> {
int numberOfStartedRaftNodes = cluster.runningNodes()
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 3771263772..27ab012dc3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -52,7 +52,6 @@ import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -232,7 +231,6 @@ public class ItDataSchemaSyncTest extends
IgniteAbstractTest {
* Test correctness of schemes recovery after node restart.
*/
@Test
- @Disabled("Enable when IGNITE-18203 is fixed")
public void checkSchemasCorrectlyRestore() throws Exception {
Ignite ignite1 = clusterNodes.get(1);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 26f9622870..3c87348064 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -185,6 +185,7 @@ public class ItIgniteInMemoryNodeRestartTest extends
IgniteAbstractTest {
/**
* Restarts an in-memory node that is not a leader of the table's
partition.
*/
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-19044")
@Test
public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws
Exception {
// Start three nodes, the first one is going to be CMG and MetaStorage
leader.
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 548e113b96..9553a22a15 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -126,6 +126,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
/**
@@ -133,6 +134,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
*/
@WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value =
"0")
@ExtendWith(ConfigurationExtension.class)
+@Timeout(value = 120)
public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
/** Default node port. */
private static final int DEFAULT_NODE_PORT = 3344;
@@ -754,8 +756,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
/**
* Restarts the node which stores some data.
*/
- @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart,
because indexes are required to apply RAFT commands on restart, "
- + "but the table have not started yet.")
@Test
public void nodeWithDataTest() throws InterruptedException {
IgniteImpl ignite = startNode(0);
@@ -772,8 +772,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
/**
* Starts two nodes and checks that the data are storing through restarts.
Nodes restart in the same order when they started at first.
*/
- @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart,
because indexes are required to apply RAFT commands on restart, "
- + "but the table have not started yet.")
@Test
public void testTwoNodesRestartDirect() throws InterruptedException {
twoNodesRestart(true);
@@ -782,8 +780,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
/**
* Starts two nodes and checks that the data are storing through restarts.
Nodes restart in reverse order when they started at first.
*/
- @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart,
because indexes are required to apply RAFT commands on restart, "
- + "but the table have not started yet.")
@Test
public void testTwoNodesRestartReverse() throws InterruptedException {
twoNodesRestart(false);
@@ -922,8 +918,6 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
/**
* Checks that a cluster is able to restart when some changes were made in
configuration.
*/
- @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart,
because indexes are required to apply RAFT commands on restart, "
- + "but the table have not started yet.")
@Test
public void testRestartDiffConfig() throws InterruptedException {
List<IgniteImpl> ignites = startNodes(2);
@@ -1105,7 +1099,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
* @param replicas Replica factor.
*/
private void createTableWithData(List<IgniteImpl> nodes, String name, int
replicas) throws InterruptedException {
- createTableWithData(nodes, name, replicas, 10);
+ createTableWithData(nodes, name, replicas, 2);
}
/**
@@ -1133,7 +1127,7 @@ public class ItIgniteNodeRestartTest extends
IgniteAbstractTest {
private void waitForIndex(Collection<IgniteImpl> nodes, String indexName)
throws InterruptedException {
// FIXME: Wait for the index to be created on all nodes,
- // this is a workaround for
https://issues.apache.org/jira/browse/IGNITE-18203 to avoid missed updates to
the PK index.
+ // this is a workaround for
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to
the PK index.
Stream<TablesConfiguration> partialTablesConfiguration =
Stream.empty();
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 775a423dd7..331279f78a 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -56,8 +56,7 @@ import org.junit.jupiter.api.Test;
/**
 * The class has tests of cluster recovery when not all committed RAFT commands
were applied to the state machine.
*/
-@Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because
indexes are required to apply RAFT commands on restart , "
- + "but the table have not started yet.")
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-19043")
public class ItRaftCommandLeftInLogUntilRestartTest extends
ClusterPerClassIntegrationTest {
private final Object[][] dataSet = {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 6e45c4e2e0..f8b6cb8907 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -362,7 +362,7 @@ public class ItSqlAsynchronousApiTest extends
ClusterPerClassIntegrationTest {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
sql("CREATE INDEX TEST_IDX ON TEST(VAL0)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("TEST_IDX");
Matcher<String> planMatcher = containsIndexScan("PUBLIC", "TEST",
"TEST_IDX");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index 01d41615d1..6cab3df26b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -446,7 +446,7 @@ public abstract class ClusterPerClassIntegrationTest
extends IgniteIntegrationTe
protected static void waitForIndex(String indexName) throws
InterruptedException {
// FIXME: Wait for the index to be created on all nodes,
- // this is a workaround for
https://issues.apache.org/jira/browse/IGNITE-18203 to avoid missed updates to
the index.
+ // this is a workaround for
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to
the index.
assertTrue(waitForCondition(
() -> CLUSTER_NODES.stream().map(node ->
getIndexConfiguration(node, indexName)).allMatch(Objects::nonNull),
10_000)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index 429f92a7f4..a700b78ca1 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -55,7 +55,7 @@ public class ItAggregatesTest extends
ClusterPerClassIntegrationTest {
sql("CREATE INDEX test_idx ON test(grp0, grp1)");
sql("CREATE INDEX test_one_col_idx_idx ON test_one_col_idx(col0)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("test_idx");
waitForIndex("test_one_col_idx_idx");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
index b24aa43278..ca19319c48 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
@@ -96,7 +96,7 @@ public class ItIndexSpoolTest extends
ClusterPerClassIntegrationTest {
sql("CREATE INDEX " + name + "_jid_idx ON " + name + "(jid)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex(name + "_PK");
waitForIndex(name + "_jid_idx");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
index 817646bbea..f99fb6f803 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
@@ -41,7 +41,7 @@ public class ItJoinTest extends
ClusterPerClassIntegrationTest {
sql("create index t1_idx on t1 (c3, c2, c1)");
sql("create index t2_idx on t2 (c3, c2, c1)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("t1_idx");
waitForIndex("t2_idx");
@@ -800,7 +800,7 @@ public class ItJoinTest extends
ClusterPerClassIntegrationTest {
if (indexScan) {
sql("CREATE INDEX t11_idx ON t11(i1)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("t11_idx");
}
@@ -810,7 +810,7 @@ public class ItJoinTest extends
ClusterPerClassIntegrationTest {
if (indexScan) {
sql("CREATE INDEX t22_idx ON t22(i3)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("t22_idx");
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
index 74cab4acc8..1ec09e4bcc 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
@@ -295,7 +295,7 @@ public class ItMixedQueriesTest extends
ClusterPerClassIntegrationTest {
sql("create index idx_asc on test_tbl (c1)");
sql("create index idx_desc on test_tbl (c1 desc)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("idx_asc");
waitForIndex("idx_desc");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
index 7dbe265507..a80f83325e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
@@ -59,7 +59,7 @@ public class ItOrToUnionRuleTest extends
ClusterPerClassIntegrationTest {
sql("CREATE INDEX " + IDX_SUBCATEGORY + " ON products (subcategory)");
sql("CREATE INDEX " + IDX_SUBCAT_ID + " ON products (subcat_id)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex(IDX_CATEGORY);
waitForIndex(IDX_CAT_ID);
waitForIndex(IDX_SUBCATEGORY);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
index b27eff0186..24c3502153 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
@@ -83,7 +83,7 @@ public class ItSecondaryIndexTest extends
ClusterPerClassIntegrationTest {
sql("CREATE TABLE birthday (id INT PRIMARY KEY, name VARCHAR, birthday
DATE)");
sql("CREATE INDEX " + NAME_DATE_IDX + " ON birthday (name, birthday)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex(DEPID_IDX);
waitForIndex(NAME_CITY_IDX);
waitForIndex(NAME_DEPID_CITY_IDX);
@@ -129,7 +129,7 @@ public class ItSecondaryIndexTest extends
ClusterPerClassIntegrationTest {
sql("CREATE TABLE unwrap_pk(f1 VARCHAR, f2 BIGINT, f3 BIGINT, f4
BIGINT, primary key(f2, f1))");
sql("CREATE INDEX " + PK_SORTED_IDX + " ON unwrap_pk(f2, f1)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex(PK_SORTED_IDX);
insertData("UNWRAP_PK", List.of("F1", "F2", "F3", "F4"), new
Object[][]{
@@ -145,7 +145,7 @@ public class ItSecondaryIndexTest extends
ClusterPerClassIntegrationTest {
sql("CREATE TABLE t1 (id INT PRIMARY KEY, val INT)");
sql("CREATE INDEX t1_idx on t1(val DESC)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("t1_idx");
insertData("T1", List.of("ID", "VAL"), new Object[][]{
@@ -952,7 +952,7 @@ public class ItSecondaryIndexTest extends
ClusterPerClassIntegrationTest {
try {
sql("CREATE TABLE t(i0 INTEGER PRIMARY KEY, i1 INTEGER, i2
INTEGER)");
sql("CREATE INDEX t_idx ON t(i1)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("t_idx");
sql("INSERT INTO t VALUES (1, 0, null), (2, 1, null), (3, 2, 2),
(4, 3, null), (5, 4, null), (6, null, 5)");
@@ -985,7 +985,7 @@ public class ItSecondaryIndexTest extends
ClusterPerClassIntegrationTest {
try {
sql("CREATE TABLE t(i0 INTEGER PRIMARY KEY, i1 INTEGER, i2
INTEGER)");
sql("CREATE INDEX t_idx ON t(i1, i2)");
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex("t_idx");
sql("INSERT INTO t VALUES (1, null, 0), (2, 1, null), (3, 2, 2),
(4, 3, null)");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 42c6c55a4c..8413980496 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -98,7 +98,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
public void beforeTest() throws InterruptedException {
TableImpl table = getOrCreateTable();
- // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+ // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
waitForIndex(SORTED_IDX);
loadData(table);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 33ddae80fc..8655e5bb94 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -58,6 +58,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -774,19 +775,18 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
CompletableFuture<Void> startGroupFut;
+ CountDownLatch storageReadyLatch = new CountDownLatch(1);
+
// start new nodes, only if it is table creation, other cases
will be covered by rebalance logic
if (oldPartAssignment.isEmpty() && localMemberAssignment !=
null) {
startGroupFut =
partitionStoragesFut.thenComposeAsync(partitionStorages -> {
- MvPartitionStorage mvPartitionStorage =
partitionStorages.getMvPartitionStorage();
-
- boolean hasData =
mvPartitionStorage.lastAppliedIndex() > 0;
-
CompletableFuture<Boolean> fut;
// If Raft is running in in-memory mode or the PDS has
been cleared, we need to remove the current node
// from the Raft group in order to avoid the double
vote problem.
// <MUTED> See
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
- if (internalTbl.storage().isVolatile() || !hasData) {
+ // TODO:
https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
+ if (internalTbl.storage().isVolatile()) {
fut = queryDataNodesCount(tblId, partId,
newConfiguration.peers()).thenApply(dataNodesCount -> {
boolean fullPartitionRestart = dataNodesCount
== 0;
@@ -826,7 +826,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
internalTbl.storage(),
internalTbl.txStateStorage(),
partitionKey(internalTbl,
partId),
- storageUpdateHandler
+ storageUpdateHandler,
+ storageReadyLatch
);
Peer serverPeer =
newConfiguration.peer(localMemberName);
@@ -909,7 +910,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
placementDriver,
storageUpdateHandler,
this::isLocalPeer,
-
schemaManager.schemaRegistry(causalityToken, tblId)
+
schemaManager.schemaRegistry(causalityToken, tblId),
+ storageReadyLatch
)
);
} catch (NodeStoppingException ex) {
@@ -980,7 +982,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
MvTableStorage mvTableStorage,
TxStateTableStorage txStateTableStorage,
PartitionKey partitionKey,
- StorageUpdateHandler storageUpdateHandler
+ StorageUpdateHandler storageUpdateHandler,
+ CountDownLatch storageReadyLatch
) {
RaftGroupOptions raftGroupOptions;
@@ -1006,6 +1009,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
incomingSnapshotsExecutor
));
+ raftGroupOptions.setStorageReadyLatch(storageReadyLatch);
+
return raftGroupOptions;
}
@@ -2066,7 +2071,8 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
internalTable.storage(),
internalTable.txStateStorage(),
partitionKey(internalTable, partId),
- storageUpdateHandler
+ storageUpdateHandler,
+ null
);
RaftGroupListener raftGrpLsnr = new PartitionListener(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0b81989af4..357b2ab430 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -204,6 +205,75 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final CompletableFuture<SchemaRegistry> schemaFut;
+ /** Nullable latch that is counted down once the storage is ready to process
user requests. */
+ @Nullable
+ private final CountDownLatch storageReadyLatch;
+
+ /**
+ * The constructor.
+ *
+ * @param mvDataStorage Data storage.
+ * @param raftClient Raft client.
+ * @param txManager Transaction manager.
+ * @param lockManager Lock manager.
+ * @param partId Partition id.
+ * @param tableId Table id.
+ * @param indexesLockers Index lock helper objects.
+ * @param pkIndexStorage Pk index storage.
+ * @param secondaryIndexStorages Secondary index storages.
+ * @param hybridClock Hybrid clock.
+ * @param safeTime Safe time clock.
+ * @param txStateStorage Transaction state storage.
+ * @param placementDriver Placement driver.
+ * @param storageUpdateHandler Handler that processes updates writing them
to storage.
+ * @param isLocalPeerChecker Function for checking that the given peer is
local.
+ * @param schemaFut Table schema.
+ * @param storageReadyLatch Nullable latch that is counted down once the
storage is ready to process user requests.
+ */
+ public PartitionReplicaListener(
+ MvPartitionStorage mvDataStorage,
+ RaftGroupService raftClient,
+ TxManager txManager,
+ LockManager lockManager,
+ Executor scanRequestExecutor,
+ int partId,
+ UUID tableId,
+ Supplier<Map<UUID, IndexLocker>> indexesLockers,
+ Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
+ Supplier<Map<UUID, TableSchemaAwareIndexStorage>>
secondaryIndexStorages,
+ HybridClock hybridClock,
+ PendingComparableValuesTracker<HybridTimestamp> safeTime,
+ TxStateStorage txStateStorage,
+ PlacementDriver placementDriver,
+ StorageUpdateHandler storageUpdateHandler,
+ Function<Peer, Boolean> isLocalPeerChecker,
+ CompletableFuture<SchemaRegistry> schemaFut,
+ CountDownLatch storageReadyLatch
+ ) {
+ this.mvDataStorage = mvDataStorage;
+ this.raftClient = raftClient;
+ this.txManager = txManager;
+ this.lockManager = lockManager;
+ this.scanRequestExecutor = scanRequestExecutor;
+ this.partId = partId;
+ this.tableId = tableId;
+ this.indexesLockers = indexesLockers;
+ this.pkIndexStorage = pkIndexStorage;
+ this.secondaryIndexStorages = secondaryIndexStorages;
+ this.hybridClock = hybridClock;
+ this.safeTime = safeTime;
+ this.txStateStorage = txStateStorage;
+ this.placementDriver = placementDriver;
+ this.isLocalPeerChecker = isLocalPeerChecker;
+ this.storageUpdateHandler = storageUpdateHandler;
+ this.schemaFut = schemaFut;
+ this.storageReadyLatch = storageReadyLatch;
+
+ this.replicationGroupId = new TablePartitionId(tableId, partId);
+
+ cursors = new
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
+ }
+
/**
* The constructor.
*
@@ -260,6 +330,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.isLocalPeerChecker = isLocalPeerChecker;
this.storageUpdateHandler = storageUpdateHandler;
this.schemaFut = schemaFut;
+ this.storageReadyLatch = null;
this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -268,6 +339,15 @@ public class PartitionReplicaListener implements
ReplicaListener {
@Override
public CompletableFuture<?> invoke(ReplicaRequest request) {
+ try {
+ if (storageReadyLatch != null) {
+ storageReadyLatch.await();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IgniteInternalException("Interrupted while awaiting the
storage initialization.", e);
+ }
+
if (request instanceof TxStateReplicaRequest) {
return processTxStateReplicaRequest((TxStateReplicaRequest)
request);
}