This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7c9d3f77b1 IGNITE-20624 Fix race between getting logical topology and
mapping fragments (#2710)
7c9d3f77b1 is described below
commit 7c9d3f77b1369433c7c4bc4142bba85fc7edeed5
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Thu Oct 26 10:57:58 2023 +0300
IGNITE-20624 Fix race between getting logical topology and mapping
fragments (#2710)
---
.../internal/sql/engine/SqlQueryProcessor.java | 2 +-
.../engine/exec/mapping/MappingServiceImpl.java | 74 +++++++++--
.../internal/sql/engine/prepare/MultiStepPlan.java | 2 +-
.../sql/engine/exec/ExecutionServiceImplTest.java | 2 +-
.../exec/mapping/MappingServiceImplTest.java | 148 +++++++++++++++++++++
.../sql/engine/framework/TestBuilders.java | 2 +-
6 files changed, 212 insertions(+), 18 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 3afc2a48ad..8ff3359a0b 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -310,7 +310,7 @@ public class SqlQueryProcessor implements QueryProcessor {
}
};
- var mappingService = new MappingServiceImpl(nodeName,
executionTargetProvider);
+ var mappingService = new MappingServiceImpl(nodeName,
executionTargetProvider, taskExecutor);
logicalTopologyService.addEventListener(mappingService);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index e84f62c928..0c6c567a5e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -29,9 +29,8 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
@@ -54,24 +53,44 @@ import org.apache.ignite.lang.ErrorGroups.Sql;
public class MappingServiceImpl implements MappingService,
LogicalTopologyEventListener {
private static final int MAPPING_ATTEMPTS = 3;
- private final AtomicReference<Set<String>> nodesSetRef = new
AtomicReference<>(Set.of());
+ private final LogicalTopologyHolder topologyHolder = new
LogicalTopologyHolder();
+ private final CompletableFuture<Void> initialTopologyFuture = new
CompletableFuture<>();
private final String localNodeName;
private final ExecutionTargetProvider targetProvider;
+ private final Executor taskExecutor;
+
+ /**
+ * Constructor.
+ *
+ * @param localNodeName Name of the current Ignite node.
+ * @param targetProvider Execution target provider.
+ * @param taskExecutor Mapper service task executor.
+ */
public MappingServiceImpl(
String localNodeName,
- ExecutionTargetProvider targetProvider
+ ExecutionTargetProvider targetProvider,
+ Executor taskExecutor
) {
this.localNodeName = localNodeName;
this.targetProvider = targetProvider;
+ this.taskExecutor = taskExecutor;
}
@Override
public CompletableFuture<List<MappedFragment>> map(MultiStepPlan
multiStepPlan) {
+ if (initialTopologyFuture.isDone()) {
+ return map0(multiStepPlan);
+ }
+
+ return initialTopologyFuture.thenComposeAsync(ignore ->
map0(multiStepPlan), taskExecutor);
+ }
+
+ private CompletableFuture<List<MappedFragment>> map0(MultiStepPlan
multiStepPlan) {
List<Fragment> fragments = multiStepPlan.fragments();
- List<String> nodes = List.copyOf(nodesSetRef.get());
+ List<String> nodes = topologyHolder.nodes();
MappingContext context = new MappingContext(localNodeName, nodes);
@@ -200,23 +219,17 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
@Override
public void onNodeJoined(LogicalNode joinedNode, LogicalTopologySnapshot
newTopology) {
- nodesSetRef.set(deriveNodeNames(newTopology));
+ topologyHolder.update(newTopology);
}
@Override
public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot
newTopology) {
- nodesSetRef.set(deriveNodeNames(newTopology));
+ topologyHolder.update(newTopology);
}
@Override
public void onTopologyLeap(LogicalTopologySnapshot newTopology) {
- nodesSetRef.set(deriveNodeNames(newTopology));
- }
-
- private static Set<String> deriveNodeNames(LogicalTopologySnapshot
topology) {
- return topology.nodes().stream()
- .map(LogicalNode::name)
- .collect(Collectors.toUnmodifiableSet());
+ topologyHolder.update(newTopology);
}
private static List<Fragment> replace(
@@ -258,4 +271,37 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
return newFragments;
}
+
+ /**
+ * Holder for topology snapshots that guarantees monotonically increasing
versions.
+ */
+ class LogicalTopologyHolder {
+ private volatile List<String> nodes = List.of();
+ private long ver = Long.MIN_VALUE;
+
+ void update(LogicalTopologySnapshot topologySnapshot) {
+ synchronized (this) {
+ if (ver < topologySnapshot.version()) {
+ nodes = deriveNodeNames(topologySnapshot);
+ ver = topologySnapshot.version();
+ }
+
+ if (initialTopologyFuture.isDone() ||
!nodes.contains(localNodeName)) {
+ return;
+ }
+ }
+
+ initialTopologyFuture.complete(null);
+ }
+
+ List<String> nodes() {
+ return nodes;
+ }
+
+ private List<String> deriveNodeNames(LogicalTopologySnapshot topology)
{
+ return topology.nodes().stream()
+ .map(LogicalNode::name)
+ .collect(Collectors.toUnmodifiableList());
+ }
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
index 5765a50b97..7cca94a716 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/MultiStepPlan.java
@@ -34,7 +34,7 @@ public class MultiStepPlan implements QueryPlan {
protected final List<Fragment> fragments;
/** Constructor. */
- MultiStepPlan(SqlQueryType type, List<Fragment> fragments,
ResultSetMetadata meta) {
+ public MultiStepPlan(SqlQueryType type, List<Fragment> fragments,
ResultSetMetadata meta) {
this.type = type;
this.fragments = fragments;
this.meta = meta;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
index 741f6d8578..9860a7d4bb 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImplTest.java
@@ -755,7 +755,7 @@ public class ExecutionServiceImplTest extends
BaseIgniteAbstractTest {
}
};
- var mappingService = new MappingServiceImpl(nodeName, targetProvider);
+ var mappingService = new MappingServiceImpl(nodeName, targetProvider,
taskExecutor);
List<LogicalNode> logicalNodes = nodeNames.stream()
.map(name -> new LogicalNode(name, name,
NetworkAddress.from("127.0.0.1:10000")))
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
new file mode 100644
index 0000000000..eb002020e2
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.mapping;
+
+import static org.apache.ignite.internal.sql.engine.SqlQueryType.QUERY;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.sql.api.ResultSetMetadataImpl;
+import org.apache.ignite.internal.sql.engine.prepare.MultiStepPlan;
+import org.apache.ignite.internal.sql.engine.schema.IgniteSystemView;
+import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.network.NetworkAddress;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Test class to verify {@link MappingServiceImpl}.
+ */
+public class MappingServiceImplTest extends BaseIgniteAbstractTest {
+ @Test
+ public void serviceInitializationTest() {
+ String localNodeName = "NODE0";
+
+ MappingServiceImpl mappingService =
createMappingService(localNodeName, List.of(localNodeName));
+ mappingService.onNodeJoined(Mockito.mock(LogicalNode.class), new
LogicalTopologySnapshot(1, logicalNodes(localNodeName)));
+
+ CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
+ .map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of())));
+
+ assertThat(mappingFuture, willSucceedFast());
+ }
+
+ @Test
+ public void lateServiceInitializationOnTopologyLeap() {
+ String localNodeName = "NODE";
+ List<String> nodeNames = List.of("NODE1");
+
+ MappingServiceImpl mappingService =
createMappingService(localNodeName, nodeNames);
+
+ CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
+ .map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of())));
+
+ // Mapping should wait for service initialization.
+ assertFalse(mappingFuture.isDone());
+
+ // Join another node affect nothing.
+ mappingService.onTopologyLeap(new LogicalTopologySnapshot(1,
logicalNodes("NODE1", "NODE2")));
+ assertThat(mappingFuture, willThrowFast(TimeoutException.class));
+
+ // Joining local node completes initialization.
+ mappingService.onTopologyLeap(new LogicalTopologySnapshot(2,
logicalNodes("NODE", "NODE1", "NODE2")));
+
+ assertThat(mappingFuture, willSucceedFast());
+ assertThat(mappingService.map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of()))), willSucceedFast());
+ }
+
+ @Test
+ public void lateServiceInitializationOnNodeJoin() {
+ String localNodeName = "NODE";
+ List<String> nodeNames = List.of("NODE1");
+
+ MappingServiceImpl mappingService =
createMappingService(localNodeName, nodeNames);
+
+ CompletableFuture<List<MappedFragment>> mappingFuture = mappingService
+ .map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of())));
+
+ // Mapping should wait for service initialization.
+ assertFalse(mappingFuture.isDone());
+
+ // Join another node affect nothing.
+ mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+ new LogicalTopologySnapshot(1, logicalNodes("NODE1",
"NODE2")));
+
+ assertThat(mappingFuture, willThrowFast(TimeoutException.class));
+
+ // Joining local node completes initialization.
+ mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+ new LogicalTopologySnapshot(2, logicalNodes("NODE", "NODE1",
"NODE2")));
+
+ assertThat(mappingFuture, willSucceedFast());
+ assertThat(mappingService.map(new MultiStepPlan(QUERY, List.of(), new
ResultSetMetadataImpl(List.of()))), willSucceedFast());
+ }
+
+ private static List<LogicalNode> logicalNodes(String... nodeNames) {
+ return Arrays.stream(nodeNames)
+ .map(name -> new LogicalNode(name, name,
NetworkAddress.from("127.0.0.1:10000")))
+ .collect(Collectors.toList());
+ }
+
+ private static MappingServiceImpl createMappingService(String
localNodeName, List<String> nodeNames) {
+ var targetProvider = new ExecutionTargetProvider() {
+ @Override
+ public CompletableFuture<ExecutionTarget>
forTable(ExecutionTargetFactory factory, IgniteTable table) {
+ return
CompletableFuture.completedFuture(factory.allOf(nodeNames));
+ }
+
+ @Override
+ public CompletableFuture<ExecutionTarget>
forSystemView(ExecutionTargetFactory factory, IgniteSystemView view) {
+ return CompletableFuture.failedFuture(new AssertionError("Not
supported"));
+ }
+ };
+
+ return new MappingServiceImpl(localNodeName, targetProvider,
Runnable::run);
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
index 03c8e032cc..33652960f6 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/framework/TestBuilders.java
@@ -388,7 +388,7 @@ public class TestBuilders {
Map<String, TestNode> nodes = nodeNames.stream()
.map(name -> {
- var mappingService = new MappingServiceImpl(name,
targetProvider);
+ var mappingService = new MappingServiceImpl(name,
targetProvider, Runnable::run);
mappingService.onTopologyLeap(new
LogicalTopologySnapshot(1L, logicalNodes));