This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 15a4383367 IGNITE-22083 Sql. Invalidate fragment mapping cache when
the mapped node has left the cluster (#3637)
15a4383367 is described below
commit 15a4383367bd832729f19e447889f6ce3491a695
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Apr 22 15:57:38 2024 +0300
IGNITE-22083 Sql. Invalidate fragment mapping cache when the mapped node
has left the cluster (#3637)
---
.../engine/exec/mapping/MappingServiceImpl.java | 53 ++++++++++++++++------
.../exec/mapping/MappingServiceImplTest.java | 28 ++++++++++--
2 files changed, 63 insertions(+), 18 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index 13bf05cb35..7c28ac249e 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -30,7 +30,9 @@ import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@@ -152,10 +154,10 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
return val;
});
- return cacheValue.mappedFragments.thenApply(mappedFragments ->
applyPartitionPruning(mappedFragments, parameters));
+ return cacheValue.mappedFragments.thenApply(mappedFragments ->
applyPartitionPruning(mappedFragments.fragments, parameters));
}
- private CompletableFuture<List<MappedFragment>>
mapFragments(MappingContext context, FragmentsTemplate template) {
+ private CompletableFuture<MappedFragments> mapFragments(MappingContext
context, FragmentsTemplate template) {
IdGenerator idGenerator = new IdGenerator(template.nextId);
List<Fragment> fragments = new ArrayList<>(template.fragments);
@@ -242,7 +244,8 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
throw new IgniteInternalException(Sql.MAPPING_ERR,
"Unable to map query: " + ex.getMessage(), ex);
}
- List<MappedFragment> result = new
ArrayList<>(fragmentsToMap.size());
+ List<MappedFragment> mappedFragmentsList = new
ArrayList<>(fragmentsToMap.size());
+ Set<String> targetNodes = new HashSet<>();
for (Fragment fragment : fragmentsToMap) {
FragmentMapping mapping =
mappingByFragmentId.get(fragment.fragmentId());
@@ -264,18 +267,20 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
sourcesByExchangeId.put(exchangeId,
allSourcesByExchangeId.get(exchangeId));
}
- result.add(
- new MappedFragment(
- fragment,
- mapping.groups(),
- sourcesByExchangeId,
- targetGroup,
- null
- )
+ MappedFragment mappedFragment = new MappedFragment(
+ fragment,
+ mapping.groups(),
+ sourcesByExchangeId,
+ targetGroup,
+ null
);
+
+ mappedFragmentsList.add(mappedFragment);
+
+ targetNodes.addAll(mappedFragment.nodes());
}
- return result;
+ return new MappedFragments(mappedFragmentsList,
targetNodes);
});
}
@@ -287,6 +292,15 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
@Override
public void onNodeLeft(LogicalNode leftNode, LogicalTopologySnapshot
newTopology) {
topologyHolder.update(newTopology);
+
+ mappingsCache.removeIfValue(value -> {
+ if (value.mappedFragments.isDone()) {
+ return
value.mappedFragments.join().nodes.contains(leftNode.name());
+ }
+
+ // Invalidate non-completed mappings to reduce the chance of
getting a stale value
+ return true;
+ });
}
@Override
@@ -412,12 +426,23 @@ public class MappingServiceImpl implements
MappingService, LogicalTopologyEventL
}
}
+ /** Wraps a list of mapped fragments together with the names of their target nodes. */
+ private static class MappedFragments {
+ final List<MappedFragment> fragments;
+ final Set<String> nodes;
+
+ MappedFragments(List<MappedFragment> fragments, Set<String> nodes) {
+ this.fragments = fragments;
+ this.nodes = nodes;
+ }
+ }
+
private static class MappingsCacheValue {
private final long topVer;
private final IntSet tableIds;
- private final CompletableFuture<List<MappedFragment>> mappedFragments;
+ private final CompletableFuture<MappedFragments> mappedFragments;
- MappingsCacheValue(long topVer, IntSet tableIds,
CompletableFuture<List<MappedFragment>> mappedFragments) {
+ MappingsCacheValue(long topVer, IntSet tableIds,
CompletableFuture<MappedFragments> mappedFragments) {
this.topVer = topVer;
this.tableIds = tableIds;
this.mappedFragments = mappedFragments;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
index fa89d462f3..4979f14057 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImplTest.java
@@ -194,15 +194,35 @@ public class MappingServiceImplTest extends
BaseIgniteAbstractTest {
assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
assertSame(sysViewMapping,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
- mappingService.onNodeLeft(Mockito.mock(LogicalNode.class),
- new LogicalTopologySnapshot(2, logicalNodes("NODE")));
- // Plan with tables only must not be invalidated.
+ LogicalNode newNode = Mockito.mock(LogicalNode.class);
+ Mockito.when(newNode.name()).thenReturn("NODE2");
+
+ mappingService.onNodeJoined(Mockito.mock(LogicalNode.class),
+ new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1",
"NODE2")));
+
+ // Plan with tables only must not be invalidated on node join.
assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
verifyNoMoreInteractions(targetProvider);
// Plan with system views must be invalidated.
assertNotSame(sysViewMapping,
await(mappingService.map(PLAN_WITH_SYSTEM_VIEW, PARAMS)));
- verify(targetProvider, times(3)).forTable(any(), any());
+
+ mappingService.onNodeLeft(newNode,
+ new LogicalTopologySnapshot(3, logicalNodes("NODE", "NODE1")));
+
+ // Plan with tables that don't include the node that left should not be
invalidated.
+ assertSame(tableOnlyMapping, await(mappingService.map(PLAN, PARAMS)));
+
+ LogicalNode node1 = Mockito.mock(LogicalNode.class);
+ Mockito.when(node1.name()).thenReturn("NODE1");
+
+ mappingService.onNodeLeft(node1,
+ new LogicalTopologySnapshot(3, logicalNodes("NODE")));
+
+ // Plan with tables that include the node that left must be invalidated.
+ assertNotSame(tableOnlyMapping, await(mappingService.map(PLAN,
PARAMS)));
+
+ verify(targetProvider, times(4)).forTable(any(), any());
verify(targetProvider, times(2)).forSystemView(any(), any());
}