This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c9d3f77b1 IGNITE-20624 Fix race between getting logical topology and 
mapping fragments (#2710)
7c9d3f77b1 is described below

commit 7c9d3f77b1369433c7c4bc4142bba85fc7edeed5
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Thu Oct 26 10:57:58 2023 +0300

    IGNITE-20624 Fix race between getting logical topology and mapping 
fragments (#2710)
---
 .../internal/sql/engine/SqlQueryProcessor.java     |   2 +-
 .../engine/exec/mapping/MappingServiceImpl.java    |  74 +++++++++--
 .../internal/sql/engine/prepare/MultiStepPlan.java |   2 +-
 .../sql/engine/exec/ExecutionServiceImplTest.java  |   2 +-
 .../exec/mapping/MappingServiceImplTest.java       | 148 +++++++++++++++++++++
 .../sql/engine/framework/TestBuilders.java         |   2 +-
 6 files changed, 212 insertions(+), 18 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 3afc2a48ad..8ff3359a0b 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -310,7 +310,7 @@ public class SqlQueryProcessor implements QueryProcessor {
             }
         };
 
-        var mappingService = new MappingServiceImpl(nodeName, 
executionTargetProvider);
+        var mappingService = new MappingServiceImpl(nodeName, 
executionTargetProvider, taskExecutor);
 
         logicalTopologyService.addEventListener(mappingService);
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index e84f62c928..0c6c567a5e 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -29,9 +29,8 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
@@ -54,24 +53,44 @@ import org.apache.ignite.lang.ErrorGroups.Sql;
 public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventListener {
     private static final int MAPPING_ATTEMPTS = 3;
 
-    private final AtomicReference<Set<String>> nodesSetRef = new 
AtomicReference<>(Set.of());
+    private final LogicalTopologyHolder topologyHolder = new 
LogicalTopologyHolder();
+    private final CompletableFuture<Void> initialTopologyFuture = new 
CompletableFuture<>();
 
     private final String localNodeName;
     private final ExecutionTargetProvider targetProvider;
+    private final Executor taskExecutor;
 
+
+    /**
+     * Constructor.
+     *
+     * @param localNodeName Name of the current Ignite node.
+     * @param targetProvider Execution target provider.
+     * @param taskExecutor Mapper service task executor.
+     */
     public MappingServiceImpl(
             String localNodeName,
-            ExecutionTargetProvider targetProvider
+            ExecutionTargetProvider targetProvider,
+            Executor taskExecutor
     ) {
         this.localNodeName = localNodeName;
         this.targetProvider = targetProvider;
+        this.taskExecutor = taskExecutor;
     }
 
     @Override
     public CompletableFuture<List<MappedFragment>> map(MultiStepPlan 
multiStepPlan) {
+        if (initialTopologyFuture.isDone()) {
+            return map0(multiStepPlan);
+        }
+
+        return initialTopologyFuture.thenComposeAsync(ignore -> 
map0(multiStepPlan), taskExecutor);
+    }
+
+    private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan 
multiStepPlan) {
         List<Fragment> fragments = multiStepPlan.fragments();
 
-        List<String> nodes = List.copyOf(nodesSetRef.get());
+        List<String> nodes = topologyHolder.nodes();
 
         MappingContext context = new MappingContext(localNodeName, nodes);
 
@@ -200,23 +219,17 @@ public class MappingServiceImpl implements 
MappingService, LogicalTopologyEventL
 
     @Override
     public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot 
newTopology) {
-        nodesSetRef.set(deriveNodeNames(newTopology));
+        topologyHolder.update(newTopology);
     }
 
     @Override
     public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot 
newTopology) {
-        nodesSetRef.set(deriveNodeNames(newTopology));
+        topologyHolder.update(newTopology);
     }
 
     @Override
     public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
-        nodesSetRef.set(deriveNodeNames(newTopology));
-    }
-
-    private static Set<String> deriveNodeNames(LogicalTopologySnapshot 
topology) {
-        return topology.nodes().stream()
-                .map(LogicalNode::name)
-                .collect(Collectors.toUnmodifiableSet());
+        topologyHolder.update(newTopology);
     }
 
     private static List<Fragment> replace(
@@ -258,4 +271,37 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
 
         return newFragments;
     }
+
+    /**
+     * Holder for topology snapshots that guarantees monotonically increasing 
versions.
+     */
+    class LogicalTopologyHolder {
+        private volatile List<String> nodes = List.of();
+        private long ver = Long.MIN_VALUE;
+
+        void update(LogicalTopologySnapshot topologySnapshot) {
+            synchronized (this) {
+                if (ver < topologySnapshot.version()) {
+                    nodes = deriveNodeNames(topologySnapshot);
+                    ver = topologySnapshot.version();
+                }
+
+                if (initialTopologyFuture.isDone() || 
!nodes.contains(localNodeName)) {
+                    return;
+                }
+            }
+
+            initialTopologyFuture.complete(null);
+        }
+
+        List<String> nodes() {
+            return nodes;
+        }
+
+        private List<String> deriveNodeNames(LogicalTopologySnapshot topology) 
{
+            return topology.nodes().stream()
+                    .map(LogicalNode::name)
+                    .collect(Collectors.toUnmodifiableList());
+        }
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
index 5765a50b97..7cca94a716 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
@@ -34,7 +34,7 @@ public class MultiStepPlan implements QueryPlan {
     protected final List<Fragment> fragments;
 
     /** Constructor. */
-    MultiStepPlan(SqlQueryType type, List<Fragment> fragments, 
ResultSetMetadata meta) {
+    public MultiStepPlan(SqlQueryType type, List<Fragment> fragments, 
ResultSetMetadata meta) {
         this.type = type;
         this.fragments = fragments;
         this.meta = meta;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 741f6d8578..9860a7d4bb 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -755,7 +755,7 @@ public class ExecutionServiceImplTest extends 
BaseIgniteAbstractTest {
             }
         };
 
-        var mappingService = new MappingServiceImpl(nodeName, targetProvider);
+        var mappingService = new MappingServiceImpl(nodeName, targetProvider, 
taskExecutor);
 
         List<LogicalNode> logicalNodes = nodeNames.stream()
                 .map(name -> new LogicalNode(name, name, 
NetworkAddress.from("127.0.0.1:10000")))
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
new file mode 100644
index 0000000000..eb002020e2
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.apache.ignite.internal.sql.engine.SqlQueryType.QUERY;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test class to verify {@link MappingServiceImpl}.
+ */
+public class MappingServiceImplTest extends BaseIgniteAbstractTest {
+    @Test
+    public void serviceInitializationTest() {
+        String localNodeName = "NODE0";
+
+        MappingServiceImpl mappingService = 
createMappingService(localNodeName, List.of(localNodeName));
+        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class), new 
LogicalTopologySnapshot(1, logicalNodes(localNodeName)));
+
+        CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
+                .map(new MultiStepPlan(QUERY, List.of(), new 
ResultSetMetadataImpl(List.of())));
+
+        assertThat(mappingFuture, willSucceedFast());
+    }
+
+    @Test
+    public void lateServiceInitializationOnTopologyLeap() {
+        String localNodeName = "NODE";
+        List<String> nodeNames = List.of("NODE1");
+
+        MappingServiceImpl mappingService = 
createMappingService(localNodeName, nodeNames);
+
+        CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
+                .map(new MultiStepPlan(QUERY, List.of(), new 
ResultSetMetadataImpl(List.of())));
+
+        // Mapping should wait for service initialization.
+        assertFalse(mappingFuture.isDone());
+
+        // Join another node affect nothing.
+        mappingService.onTopologyLeap(new LogicalTopologySnapshot(1, 
logicalNodes("NODE1", "NODE2")));
+        assertThat(mappingFuture, willThrowFast(TimeoutException.class));
+
+        // Joining local node completes initialization.
+        mappingService.onTopologyLeap(new LogicalTopologySnapshot(2, 
logicalNodes("NODE", "NODE1", "NODE2")));
+
+        assertThat(mappingFuture, willSucceedFast());
+        assertThat(mappingService.map(new MultiStepPlan(QUERY, List.of(), new 
ResultSetMetadataImpl(List.of()))), willSucceedFast());
+    }
+
+    @Test
+    public void lateServiceInitializationOnNodeJoin() {
+        String localNodeName = "NODE";
+        List<String> nodeNames = List.of("NODE1");
+
+        MappingServiceImpl mappingService = 
createMappingService(localNodeName, nodeNames);
+
+        CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
+                .map(new MultiStepPlan(QUERY, List.of(), new 
ResultSetMetadataImpl(List.of())));
+
+        // Mapping should wait for service initialization.
+        assertFalse(mappingFuture.isDone());
+
+        // Join another node affect nothing.
+        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+                new LogicalTopologySnapshot(1, logicalNodes("NODE1", 
"NODE2")));
+
+        assertThat(mappingFuture, willThrowFast(TimeoutException.class));
+
+        // Joining local node completes initialization.
+        mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+                new LogicalTopologySnapshot(2, logicalNodes("NODE", "NODE1", 
"NODE2")));
+
+        assertThat(mappingFuture, willSucceedFast());
+        assertThat(mappingService.map(new MultiStepPlan(QUERY, List.of(), new 
ResultSetMetadataImpl(List.of()))), willSucceedFast());
+    }
+
+    private static List<LogicalNode> logicalNodes(String... nodeNames) {
+        return Arrays.stream(nodeNames)
+                .map(name -> new LogicalNode(name, name, 
NetworkAddress.from("127.0.0.1:10000")))
+                .collect(Collectors.toList());
+    }
+
+    private static MappingServiceImpl createMappingService(String 
localNodeName, List<String> nodeNames) {
+        var targetProvider = new ExecutionTargetProvider() {
+            @Override
+            public CompletableFuture<ExecutionTarget> 
forTable(ExecutionTargetFactory factory, IgniteTable table) {
+                return 
CompletableFuture.completedFuture(factory.allOf(nodeNames));
+            }
+
+            @Override
+            public CompletableFuture<ExecutionTarget> 
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+                return CompletableFuture.failedFuture(new AssertionError("Not 
supported"));
+            }
+        };
+
+        return new MappingServiceImpl(localNodeName, targetProvider, 
Runnable::run);
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 03c8e032cc..33652960f6 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -388,7 +388,7 @@ public class TestBuilders {
 
             Map<String, TestNode> nodes = nodeNames.stream()
                     .map(name -> {
-                        var mappingService = new MappingServiceImpl(name, 
targetProvider);
+                        var mappingService = new MappingServiceImpl(name, 
targetProvider, Runnable::run);
 
                         mappingService.onTopologyLeap(new 
LogicalTopologySnapshot(1L, logicalNodes));
 

Reply via email to