This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 3be6f10abe IGNITE-18203 Table index dependency deadlock fixed (#1770)
3be6f10abe is described below

commit 3be6f10abe6b975b4f91854214b33b2ebb965cf9
Author: Alexander Lapin <[email protected]>
AuthorDate: Fri Mar 17 13:12:44 2023 +0300

    IGNITE-18203 Table index dependency deadlock fixed (#1770)
---
 .../internal/raft/server/RaftGroupOptions.java     | 22 ++++++
 .../internal/raft/server/impl/JraftServerImpl.java |  2 +
 .../apache/ignite/raft/jraft/core/NodeImpl.java    | 27 +++++---
 .../ignite/raft/jraft/option/NodeOptions.java      | 15 ++++
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |  2 +-
 .../ignite/internal/rebalance/ItRebalanceTest.java |  2 +-
 .../internal/runner/app/ItDataSchemaSyncTest.java  |  2 -
 .../app/ItIgniteInMemoryNodeRestartTest.java       |  1 +
 .../runner/app/ItIgniteNodeRestartTest.java        | 14 ++--
 .../ItRaftCommandLeftInLogUntilRestartTest.java    |  3 +-
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  2 +-
 .../sql/engine/ClusterPerClassIntegrationTest.java |  2 +-
 .../internal/sql/engine/ItAggregatesTest.java      |  2 +-
 .../internal/sql/engine/ItIndexSpoolTest.java      |  2 +-
 .../ignite/internal/sql/engine/ItJoinTest.java     |  6 +-
 .../internal/sql/engine/ItMixedQueriesTest.java    |  2 +-
 .../internal/sql/engine/ItOrToUnionRuleTest.java   |  2 +-
 .../internal/sql/engine/ItSecondaryIndexTest.java  | 10 +--
 .../ignite/internal/table/ItTableScanTest.java     |  2 +-
 .../internal/table/distributed/TableManager.java   | 24 ++++---
 .../replicator/PartitionReplicaListener.java       | 80 ++++++++++++++++++++++
 21 files changed, 176 insertions(+), 48 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index d074ae35e8..dd6d218fde 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.raft.server;
 
+import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Options specific to a Raft group that is being started.
@@ -37,6 +39,10 @@ public class RaftGroupOptions {
     /** Raft meta storage factory. */
     private RaftMetaStorageFactory raftMetaStorageFactory;
 
+    /** Nullable latch that will be completed when storage is ready to process 
user requests. */
+    @Nullable
+    private CountDownLatch storageReadyLatch;
+
     /**
      * Returns default options as defined by classic Raft (so stores are 
persistent).
      *
@@ -126,4 +132,20 @@ public class RaftGroupOptions {
 
         return this;
     }
+
+    /**
+     * Returns the latch that will be completed when storage is ready to 
process user requests.
+     */
+    public CountDownLatch getStorageReadyLatch() {
+        return storageReadyLatch;
+    }
+
+    /**
+     * Adds storage ready latch to options.
+     */
+    public RaftGroupOptions setStorageReadyLatch(CountDownLatch 
storageReadyLatch) {
+        this.storageReadyLatch = storageReadyLatch;
+
+        return this;
+    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index b7ab4aef36..7e38be811f 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -414,6 +414,8 @@ public class JraftServerImpl implements RaftServer {
 
             nodeOptions.setRaftGrpEvtsLsnr(new 
RaftGroupEventsListenerAdapter(nodeId.groupId(), serviceEventInterceptor, 
evLsnr));
 
+            
nodeOptions.setStorageReadyLatch(groupOptions.getStorageReadyLatch());
+
             LogStorageFactory logStorageFactory = 
groupOptions.getLogStorageFactory() == null
                     ? this.logStorageFactory : 
groupOptions.getLogStorageFactory();
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 4899a10ece..3a4e9aebfe 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -1063,12 +1063,19 @@ public class NodeImpl implements Node, 
RaftServerService {
         // Wait committed.
         long commitIdx = logManager.getLastLogIndex();
 
-        if (commitIdx > fsmCaller.getLastAppliedIndex()) {
-            CountDownLatch applyCommitLatch = new CountDownLatch(1);
+        boolean externalAwaitStorageLatch = opts.getStorageReadyLatch() != 
null;
 
-            LastAppliedLogIndexListener lnsr = lastAppliedLogIndex -> {
-                if (lastAppliedLogIndex >= commitIdx) {
-                    applyCommitLatch.countDown();
+        if (commitIdx > fsmCaller.getLastAppliedIndex()) {
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta 
storage and cmg raft log re-application in async manner
+            CountDownLatch applyCommitLatch = externalAwaitStorageLatch ? 
opts.getStorageReadyLatch() : new CountDownLatch(1);
+
+            LastAppliedLogIndexListener lnsr = new 
LastAppliedLogIndexListener() {
+                @Override
+                public void onApplied( long lastAppliedLogIndex) {
+                    if (lastAppliedLogIndex >= commitIdx) {
+                        applyCommitLatch.countDown();
+                        fsmCaller.removeLastAppliedLogIndexListener(this);
+                    }
                 }
             };
 
@@ -1077,14 +1084,18 @@ public class NodeImpl implements Node, 
RaftServerService {
             fsmCaller.onCommitted(commitIdx);
 
             try {
-                applyCommitLatch.await();
-
-                fsmCaller.removeLastAppliedLogIndexListener(lnsr);
+                if (!externalAwaitStorageLatch) {
+                    applyCommitLatch.await();
+                }
             } catch (InterruptedException e) {
                 LOG.error("Fail to apply committed updates.", e);
 
                 return false;
             }
+        } else {
+            if (externalAwaitStorageLatch) {
+                opts.getStorageReadyLatch().countDown();
+            }
         }
 
         if (!this.rpcClientService.init(this.options)) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index a810fa131e..fe7394cec6 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -17,6 +17,7 @@
 package org.apache.ignite.raft.jraft.option;
 
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridClock;
@@ -41,6 +42,7 @@ import org.apache.ignite.raft.jraft.util.Utils;
 import org.apache.ignite.raft.jraft.util.concurrent.FixedThreadsExecutorGroup;
 import org.apache.ignite.raft.jraft.util.timer.Timer;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Node options.
@@ -237,6 +239,10 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     /** A hybrid clock */
     private HybridClock clock = new HybridClockImpl();
 
+    /** Nullable latch that will be completed when storage is ready to process 
user requests. */
+    @Nullable
+    private CountDownLatch storageReadyLatch;
+
     /**
      * Amount of Disruptors that will handle the RAFT server.
      */
@@ -635,6 +641,7 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         
nodeOptions.setRpcInstallSnapshotTimeout(this.getRpcInstallSnapshotTimeout());
         
nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
         nodeOptions.setClock(this.getClock());
+        nodeOptions.setStorageReadyLatch(this.getStorageReadyLatch());
 
         return nodeOptions;
     }
@@ -674,4 +681,12 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     public void setElectionTimeoutStrategy(TimeoutStrategy 
electionTimeoutStrategy) {
         this.electionTimeoutStrategy = electionTimeoutStrategy;
     }
+
+    public CountDownLatch getStorageReadyLatch() {
+        return storageReadyLatch;
+    }
+
+    public void setStorageReadyLatch(CountDownLatch storageReadyLatch) {
+        this.storageReadyLatch = storageReadyLatch;
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 414eb9cdb1..64f7b35fc1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -335,7 +335,7 @@ class ItTableRaftSnapshotsTest extends 
IgniteIntegrationTest {
     }
 
     private void waitForTableToStart() throws InterruptedException {
-        // TODO: IGNITE-18203 - remove this wait because when a table creation 
query is executed, the table must be fully ready.
+        // TODO: IGNITE-18733 - remove this wait because when a table creation 
query is executed, the table must be fully ready.
 
         BooleanSupplier tableStarted = () -> {
             int numberOfStartedRaftNodes = cluster.runningNodes()
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 722d1616e7..8e1f6e25fd 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -217,7 +217,7 @@ public class ItRebalanceTest extends IgniteIntegrationTest {
     }
 
     private void waitForTableToStart() throws InterruptedException {
-        // TODO: IGNITE-18203 - remove this wait because when a table creation 
query is executed, the table must be fully ready.
+        // TODO: IGNITE-18733 - remove this wait because when a table creation 
query is executed, the table must be fully ready.
 
         BooleanSupplier tableStarted = () -> {
             int numberOfStartedRaftNodes = cluster.runningNodes()
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 3771263772..27ab012dc3 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -52,7 +52,6 @@ import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
@@ -232,7 +231,6 @@ public class ItDataSchemaSyncTest extends 
IgniteAbstractTest {
      * Test correctness of schemes recovery after node restart.
      */
     @Test
-    @Disabled("Enable when IGNITE-18203 is fixed")
     public void checkSchemasCorrectlyRestore() throws Exception {
         Ignite ignite1 = clusterNodes.get(1);
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
index 26f9622870..3c87348064 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteInMemoryNodeRestartTest.java
@@ -185,6 +185,7 @@ public class ItIgniteInMemoryNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Restarts an in-memory node that is not a leader of the table's 
partition.
      */
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19044";)
     @Test
     public void inMemoryNodeRestartNotLeader(TestInfo testInfo) throws 
Exception {
         // Start three nodes, the first one is going to be CMG and MetaStorage 
leader.
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 548e113b96..9553a22a15 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -126,6 +126,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 /**
@@ -133,6 +134,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
  */
 @WithSystemProperty(key = CONFIGURATION_CATCH_UP_DIFFERENCE_PROPERTY, value = 
"0")
 @ExtendWith(ConfigurationExtension.class)
+@Timeout(value = 120)
 public class ItIgniteNodeRestartTest extends IgniteAbstractTest {
     /** Default node port. */
     private static final int DEFAULT_NODE_PORT = 3344;
@@ -754,8 +756,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Restarts the node which stores some data.
      */
-    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, 
because indexes are required to apply RAFT commands on restart, "
-            + "but the table have not started yet.")
     @Test
     public void nodeWithDataTest() throws InterruptedException {
         IgniteImpl ignite = startNode(0);
@@ -772,8 +772,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Starts two nodes and checks that the data are storing through restarts. 
Nodes restart in the same order when they started at first.
      */
-    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, 
because indexes are required to apply RAFT commands on restart, "
-            + "but the table have not started yet.")
     @Test
     public void testTwoNodesRestartDirect() throws InterruptedException {
         twoNodesRestart(true);
@@ -782,8 +780,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Starts two nodes and checks that the data are storing through restarts. 
Nodes restart in reverse order when they started at first.
      */
-    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, 
because indexes are required to apply RAFT commands on restart, "
-            + "but the table have not started yet.")
     @Test
     public void testTwoNodesRestartReverse() throws InterruptedException {
         twoNodesRestart(false);
@@ -922,8 +918,6 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
     /**
      * Checks that a cluster is able to restart when some changes were made in 
configuration.
      */
-    @Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, 
because indexes are required to apply RAFT commands on restart, "
-            + "but the table have not started yet.")
     @Test
     public void testRestartDiffConfig() throws InterruptedException {
         List<IgniteImpl> ignites = startNodes(2);
@@ -1105,7 +1099,7 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
      * @param replicas Replica factor.
      */
     private void createTableWithData(List<IgniteImpl> nodes, String name, int 
replicas) throws InterruptedException {
-        createTableWithData(nodes, name, replicas, 10);
+        createTableWithData(nodes, name, replicas, 2);
     }
 
     /**
@@ -1133,7 +1127,7 @@ public class ItIgniteNodeRestartTest extends 
IgniteAbstractTest {
 
     private void waitForIndex(Collection<IgniteImpl> nodes, String indexName) 
throws InterruptedException {
         // FIXME: Wait for the index to be created on all nodes,
-        //  this is a workaround for 
https://issues.apache.org/jira/browse/IGNITE-18203 to avoid missed updates to 
the PK index.
+        //  this is a workaround for 
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to 
the PK index.
 
         Stream<TablesConfiguration> partialTablesConfiguration = 
Stream.empty();
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 775a423dd7..331279f78a 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -56,8 +56,7 @@ import org.junit.jupiter.api.Test;
 /**
  * The class has tests of cluster recovery when no all committed RAFT commands 
applied to the state machine.
  */
-@Disabled("IGNITE-18203 The test goes to deadlock in cluster restart, because 
indexes are required to apply RAFT commands on restart , "
-        + "but the table have not started yet.")
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-19043";)
 public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassIntegrationTest {
 
     private final Object[][] dataSet = {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 6e45c4e2e0..f8b6cb8907 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -362,7 +362,7 @@ public class ItSqlAsynchronousApiTest extends 
ClusterPerClassIntegrationTest {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
         sql("CREATE INDEX TEST_IDX ON TEST(VAL0)");
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex("TEST_IDX");
 
         Matcher<String> planMatcher = containsIndexScan("PUBLIC", "TEST", 
"TEST_IDX");
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
index 01d41615d1..6cab3df26b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ClusterPerClassIntegrationTest.java
@@ -446,7 +446,7 @@ public abstract class ClusterPerClassIntegrationTest 
extends IgniteIntegrationTe
 
     protected static void waitForIndex(String indexName) throws 
InterruptedException {
         // FIXME: Wait for the index to be created on all nodes,
-        //  this is a workaround for 
https://issues.apache.org/jira/browse/IGNITE-18203 to avoid missed updates to 
the index.
+        //  this is a workaround for 
https://issues.apache.org/jira/browse/IGNITE-18733 to avoid missed updates to 
the index.
         assertTrue(waitForCondition(
                 () -> CLUSTER_NODES.stream().map(node -> 
getIndexConfiguration(node, indexName)).allMatch(Objects::nonNull),
                 10_000)
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
index 429f92a7f4..a700b78ca1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItAggregatesTest.java
@@ -55,7 +55,7 @@ public class ItAggregatesTest extends 
ClusterPerClassIntegrationTest {
         sql("CREATE INDEX test_idx ON test(grp0, grp1)");
         sql("CREATE INDEX test_one_col_idx_idx ON test_one_col_idx(col0)");
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex("test_idx");
         waitForIndex("test_one_col_idx_idx");
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
index b24aa43278..ca19319c48 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexSpoolTest.java
@@ -96,7 +96,7 @@ public class ItIndexSpoolTest extends 
ClusterPerClassIntegrationTest {
 
             sql("CREATE INDEX " + name + "_jid_idx ON " + name + "(jid)");
 
-            // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+            // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
             waitForIndex(name + "_PK");
             waitForIndex(name + "_jid_idx");
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
index 817646bbea..f99fb6f803 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItJoinTest.java
@@ -41,7 +41,7 @@ public class ItJoinTest extends 
ClusterPerClassIntegrationTest {
         sql("create index t1_idx on t1 (c3, c2, c1)");
         sql("create index t2_idx on t2 (c3, c2, c1)");
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex("t1_idx");
         waitForIndex("t2_idx");
 
@@ -800,7 +800,7 @@ public class ItJoinTest extends 
ClusterPerClassIntegrationTest {
 
             if (indexScan) {
                 sql("CREATE INDEX t11_idx ON t11(i1)");
-                // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+                // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
                 waitForIndex("t11_idx");
             }
 
@@ -810,7 +810,7 @@ public class ItJoinTest extends 
ClusterPerClassIntegrationTest {
 
             if (indexScan) {
                 sql("CREATE INDEX t22_idx ON t22(i3)");
-                // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+                // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
                 waitForIndex("t22_idx");
             }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
index 74cab4acc8..1ec09e4bcc 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItMixedQueriesTest.java
@@ -295,7 +295,7 @@ public class ItMixedQueriesTest extends 
ClusterPerClassIntegrationTest {
         sql("create index idx_asc on test_tbl (c1)");
         sql("create index idx_desc on test_tbl (c1 desc)");
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex("idx_asc");
         waitForIndex("idx_desc");
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
index 7dbe265507..a80f83325e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
@@ -59,7 +59,7 @@ public class ItOrToUnionRuleTest extends 
ClusterPerClassIntegrationTest {
         sql("CREATE INDEX " + IDX_SUBCATEGORY + " ON products (subcategory)");
         sql("CREATE INDEX " + IDX_SUBCAT_ID + " ON products (subcat_id)");
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex(IDX_CATEGORY);
         waitForIndex(IDX_CAT_ID);
         waitForIndex(IDX_SUBCATEGORY);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
index b27eff0186..24c3502153 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSecondaryIndexTest.java
@@ -83,7 +83,7 @@ public class ItSecondaryIndexTest extends 
ClusterPerClassIntegrationTest {
         sql("CREATE TABLE birthday (id INT PRIMARY KEY, name VARCHAR, birthday 
DATE)");
         sql("CREATE INDEX " + NAME_DATE_IDX + " ON birthday (name, birthday)");
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex(DEPID_IDX);
         waitForIndex(NAME_CITY_IDX);
         waitForIndex(NAME_DEPID_CITY_IDX);
@@ -129,7 +129,7 @@ public class ItSecondaryIndexTest extends 
ClusterPerClassIntegrationTest {
         sql("CREATE TABLE unwrap_pk(f1 VARCHAR, f2 BIGINT, f3 BIGINT, f4 
BIGINT, primary key(f2, f1))");
         sql("CREATE INDEX " + PK_SORTED_IDX + " ON unwrap_pk(f2, f1)");
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex(PK_SORTED_IDX);
 
         insertData("UNWRAP_PK", List.of("F1", "F2", "F3", "F4"), new 
Object[][]{
@@ -145,7 +145,7 @@ public class ItSecondaryIndexTest extends 
ClusterPerClassIntegrationTest {
         sql("CREATE TABLE t1 (id INT PRIMARY KEY, val INT)");
         sql("CREATE INDEX t1_idx on t1(val DESC)");
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex("t1_idx");
 
         insertData("T1", List.of("ID", "VAL"), new Object[][]{
@@ -952,7 +952,7 @@ public class ItSecondaryIndexTest extends 
ClusterPerClassIntegrationTest {
         try {
             sql("CREATE TABLE t(i0 INTEGER PRIMARY KEY, i1 INTEGER, i2 
INTEGER)");
             sql("CREATE INDEX t_idx ON t(i1)");
-            // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+            // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
             waitForIndex("t_idx");
             sql("INSERT INTO t VALUES (1, 0, null), (2, 1, null), (3, 2, 2), 
(4, 3, null), (5, 4, null), (6, null, 5)");
 
@@ -985,7 +985,7 @@ public class ItSecondaryIndexTest extends 
ClusterPerClassIntegrationTest {
         try {
             sql("CREATE TABLE t(i0 INTEGER PRIMARY KEY, i1 INTEGER, i2 
INTEGER)");
             sql("CREATE INDEX t_idx ON t(i1, i2)");
-            // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+            // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
             waitForIndex("t_idx");
             sql("INSERT INTO t VALUES (1, null, 0), (2, 1, null), (3, 2, 2), 
(4, 3, null)");
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 42c6c55a4c..8413980496 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -98,7 +98,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
     public void beforeTest() throws InterruptedException {
         TableImpl table = getOrCreateTable();
 
-        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18203
+        // FIXME: https://issues.apache.org/jira/browse/IGNITE-18733
         waitForIndex(SORTED_IDX);
 
         loadData(table);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 33ddae80fc..8655e5bb94 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -58,6 +58,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -774,19 +775,18 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 CompletableFuture<Void> startGroupFut;
 
+                CountDownLatch storageReadyLatch = new CountDownLatch(1);
+
                 // start new nodes, only if it is table creation, other cases 
will be covered by rebalance logic
                 if (oldPartAssignment.isEmpty() && localMemberAssignment != 
null) {
                     startGroupFut = 
partitionStoragesFut.thenComposeAsync(partitionStorages -> {
-                        MvPartitionStorage mvPartitionStorage = 
partitionStorages.getMvPartitionStorage();
-
-                        boolean hasData = 
mvPartitionStorage.lastAppliedIndex() > 0;
-
                         CompletableFuture<Boolean> fut;
 
                         // If Raft is running in in-memory mode or the PDS has 
been cleared, we need to remove the current node
                         // from the Raft group in order to avoid the double 
vote problem.
                         // <MUTED> See 
https://issues.apache.org/jira/browse/IGNITE-16668 for details.
-                        if (internalTbl.storage().isVolatile() || !hasData) {
+                        // TODO: 
https://issues.apache.org/jira/browse/IGNITE-19046 Restore "|| !hasData"
+                        if (internalTbl.storage().isVolatile()) {
                             fut = queryDataNodesCount(tblId, partId, 
newConfiguration.peers()).thenApply(dataNodesCount -> {
                                 boolean fullPartitionRestart = dataNodesCount 
== 0;
 
@@ -826,7 +826,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                 internalTbl.storage(),
                                                 internalTbl.txStateStorage(),
                                                 partitionKey(internalTbl, 
partId),
-                                                storageUpdateHandler
+                                                storageUpdateHandler,
+                                                storageReadyLatch
                                         );
 
                                         Peer serverPeer = 
newConfiguration.peer(localMemberName);
@@ -909,7 +910,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                     placementDriver,
                                                     storageUpdateHandler,
                                                     this::isLocalPeer,
-                                                    
schemaManager.schemaRegistry(causalityToken, tblId)
+                                                    
schemaManager.schemaRegistry(causalityToken, tblId),
+                                                    storageReadyLatch
                                             )
                                     );
                                 } catch (NodeStoppingException ex) {
@@ -980,7 +982,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             MvTableStorage mvTableStorage,
             TxStateTableStorage txStateTableStorage,
             PartitionKey partitionKey,
-            StorageUpdateHandler storageUpdateHandler
+            StorageUpdateHandler storageUpdateHandler,
+            CountDownLatch storageReadyLatch
     ) {
         RaftGroupOptions raftGroupOptions;
 
@@ -1006,6 +1009,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 incomingSnapshotsExecutor
         ));
 
+        raftGroupOptions.setStorageReadyLatch(storageReadyLatch);
+
         return raftGroupOptions;
     }
 
@@ -2066,7 +2071,8 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 internalTable.storage(),
                                 internalTable.txStateStorage(),
                                 partitionKey(internalTable, partId),
-                                storageUpdateHandler
+                                storageUpdateHandler,
+                                null
                         );
 
                         RaftGroupListener raftGrpLsnr = new PartitionListener(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0b81989af4..357b2ab430 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -43,6 +43,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -204,6 +205,75 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private final CompletableFuture<SchemaRegistry> schemaFut;
 
+    /** Nullable latch that will be completed when storage is ready to process 
user requests. */
+    @Nullable
+    private final CountDownLatch storageReadyLatch;
+
+    /**
+     * The constructor.
+     *
+     * @param mvDataStorage Data storage.
+     * @param raftClient Raft client.
+     * @param txManager Transaction manager.
+     * @param lockManager Lock manager.
+     * @param partId Partition id.
+     * @param tableId Table id.
+     * @param indexesLockers Index lock helper objects.
+     * @param pkIndexStorage Pk index storage.
+     * @param secondaryIndexStorages Secondary index storages.
+     * @param hybridClock Hybrid clock.
+     * @param safeTime Safe time clock.
+     * @param txStateStorage Transaction state storage.
+     * @param placementDriver Placement driver.
+     * @param storageUpdateHandler Handler that processes updates writing them 
to storage.
+     * @param isLocalPeerChecker Function for checking that the given peer is 
local.
+     * @param schemaFut Table schema.
+     * @param storageReadyLatch Nullable latch that will be completed when 
storage is ready to process user requests.
+     */
+    public PartitionReplicaListener(
+            MvPartitionStorage mvDataStorage,
+            RaftGroupService raftClient,
+            TxManager txManager,
+            LockManager lockManager,
+            Executor scanRequestExecutor,
+            int partId,
+            UUID tableId,
+            Supplier<Map<UUID, IndexLocker>> indexesLockers,
+            Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
+            Supplier<Map<UUID, TableSchemaAwareIndexStorage>> 
secondaryIndexStorages,
+            HybridClock hybridClock,
+            PendingComparableValuesTracker<HybridTimestamp> safeTime,
+            TxStateStorage txStateStorage,
+            PlacementDriver placementDriver,
+            StorageUpdateHandler storageUpdateHandler,
+            Function<Peer, Boolean> isLocalPeerChecker,
+            CompletableFuture<SchemaRegistry> schemaFut,
+            CountDownLatch storageReadyLatch
+    ) {
+        this.mvDataStorage = mvDataStorage;
+        this.raftClient = raftClient;
+        this.txManager = txManager;
+        this.lockManager = lockManager;
+        this.scanRequestExecutor = scanRequestExecutor;
+        this.partId = partId;
+        this.tableId = tableId;
+        this.indexesLockers = indexesLockers;
+        this.pkIndexStorage = pkIndexStorage;
+        this.secondaryIndexStorages = secondaryIndexStorages;
+        this.hybridClock = hybridClock;
+        this.safeTime = safeTime;
+        this.txStateStorage = txStateStorage;
+        this.placementDriver = placementDriver;
+        this.isLocalPeerChecker = isLocalPeerChecker;
+        this.storageUpdateHandler = storageUpdateHandler;
+        this.schemaFut = schemaFut;
+        this.storageReadyLatch = storageReadyLatch;
+
+        this.replicationGroupId = new TablePartitionId(tableId, partId);
+
+        cursors = new 
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
+    }
+
     /**
      * The constructor.
      *
@@ -260,6 +330,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         this.isLocalPeerChecker = isLocalPeerChecker;
         this.storageUpdateHandler = storageUpdateHandler;
         this.schemaFut = schemaFut;
+        this.storageReadyLatch = null;
 
         this.replicationGroupId = new TablePartitionId(tableId, partId);
 
@@ -268,6 +339,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     @Override
     public CompletableFuture<?> invoke(ReplicaRequest request) {
+        try {
+            if (storageReadyLatch != null) {
+                storageReadyLatch.await();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IgniteInternalException("Interrupted while awaiting the 
storage initialization.", e);
+        }
+
         if (request instanceof TxStateReplicaRequest) {
             return processTxStateReplicaRequest((TxStateReplicaRequest) 
request);
         }


Reply via email to