This is an automated email from the ASF dual-hosted git repository. ishan pushed a commit to branch jira/SOLR15694 in repository https://gitbox.apache.org/repos/asf/solr.git
commit c69a564c7329524589fe35a43ede31ec292c18e6 Author: Noble Paul <[email protected]> AuthorDate: Wed Nov 10 00:10:09 2021 +1100 Node role impl --- .../java/org/apache/solr/cloud/ZkController.java | 38 +++++-- .../impl/SimpleClusterAbstractionsImpl.java | 27 ++++- .../java/org/apache/solr/core/CoreContainer.java | 2 + .../src/java/org/apache/solr/core/NodeRole.java | 122 +++++++++++++++++++++ .../test/org/apache/solr/core/TestNodeRole.java | 53 +++++++++ 5 files changed, 234 insertions(+), 8 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 75a4e83..8dee09e 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -46,6 +46,9 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Supplier; import com.google.common.base.Strings; @@ -66,6 +69,7 @@ import org.apache.solr.common.cloud.*; import org.apache.solr.common.cloud.Replica.Type; import org.apache.solr.common.params.CollectionParams; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.CoreAdminParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.IOUtils; @@ -74,12 +78,7 @@ import org.apache.solr.common.util.SolrNamedThreadFactory; import org.apache.solr.common.util.StrUtils; import org.apache.solr.common.util.URLUtil; import org.apache.solr.common.util.Utils; -import org.apache.solr.core.CloseHook; -import org.apache.solr.core.CloudConfig; -import org.apache.solr.core.CoreContainer; -import org.apache.solr.core.CoreDescriptor; -import org.apache.solr.core.SolrCore; -import org.apache.solr.core.SolrCoreInitializationException; +import org.apache.solr.core.*; import org.apache.solr.handler.component.HttpShardHandler; import org.apache.solr.logging.MDCLoggingContext; import org.apache.solr.search.SolrIndexSearcher; @@ -105,6 +104,8 @@ import static org.apache.solr.common.cloud.ZkStateReader.ELECTION_NODE_PROP; import static org.apache.solr.common.cloud.ZkStateReader.NODE_NAME_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REJOIN_AT_HEAD_PROP; import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP; +import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDROLE; +import static org.apache.solr.common.params.CoreAdminParams.ACTION; import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE; /** @@ -465,6 +466,7 @@ public class ZkController implements Closeable { }); init(); + persistRoles(); if (distributedClusterStateUpdater.isDistributedStateUpdate()) { this.overseerJobQueue = null; @@ -479,6 +481,28 @@ public class ZkController implements Closeable { assert ObjectReleaseTracker.track(this); } + @SuppressWarnings("unchecked") + private void persistRoles() throws InterruptedException { + try { + zkClient.atomicUpdate(ZkStateReader.ROLES, + bytes -> { + Map<String, Object> map = bytes == null ? null : + (Map<String, Object>) Utils.fromJSON(bytes); + map = cc.nodeRole.modifyRoleData(map, + getNodeName()); + return map == null ? null : Utils.toJSON(map); + }); + if (cc.nodeRole.role() == NodeRole.Type.overseer) { + getOverseerCollectionQueue() + .offer(Utils.toJSON(Utils.makeMap(ACTION, ADDROLE.name(), + "role", NodeRole.Type.overseer, + "node", getNodeName()))); + } + } catch (KeeperException e) { + throw new SolrException(ErrorCode.SERVER_ERROR, "can't update roles"); + } + } + /** * <p>Verifies if /clusterstate.json exists in Zookeepeer, and if it does and is not empty, refuses to start and outputs * a helpful message regarding collection migration.</p> @@ -2198,7 +2222,7 @@ public class ZkController implements Closeable { List<?> nodeList = (List<?>) roles.get("overseer"); if (nodeList == null) return; if (nodeList.contains(getNodeName())) { - ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.ADDROLE.toString().toLowerCase(Locale.ROOT), + ZkNodeProps props = new ZkNodeProps(Overseer.QUEUE_OPERATION, ADDROLE.toString().toLowerCase(Locale.ROOT), "node", getNodeName(), "role", "overseer"); log.info("Going to add role {} ", props); diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java index 48f3f50..1d5bec6 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/SimpleClusterAbstractionsImpl.java @@ -23,12 +23,18 @@ import java.util.stream.Collectors; import com.google.common.collect.Maps; import org.apache.solr.client.solrj.cloud.SolrCloudManager; +import org.apache.solr.client.solrj.cloud.VersionedData; import org.apache.solr.cluster.*; +import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CollectionAdminParams; import org.apache.solr.common.util.Pair; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.NodeRole; +import org.apache.zookeeper.KeeperException; import javax.annotation.Nonnull; @@ -56,7 +62,26 @@ class SimpleClusterAbstractionsImpl { private final ClusterState clusterState; ClusterImpl(SolrCloudManager solrCloudManager) throws IOException { - liveNodes = NodeImpl.getNodes(solrCloudManager.getClusterStateProvider().getLiveNodes()); + Set<String> liveNodes = solrCloudManager.getClusterStateProvider().getLiveNodes(); + try { + if (solrCloudManager.getDistribStateManager().hasData(ZkStateReader.ROLES)) { + VersionedData rolesData = solrCloudManager.getDistribStateManager().getData(ZkStateReader.ROLES); + if (rolesData != null && rolesData.getData().length > 0) { + Map<String, Object> roles = (Map<String, Object>) Utils.fromJSON(rolesData.getData()); + List<String> noReplicasNodes = (List<String>) roles.get(NodeRole.NO_REPLICAS); + if (noReplicasNodes != null && !noReplicasNodes.isEmpty()) { + liveNodes = new HashSet<>(liveNodes); + liveNodes.removeAll(noReplicasNodes); + } + } + } + + } catch (NoSuchElementException nsee) { + //no problem + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to communicate with Zookeeper"); + } + this.liveNodes = NodeImpl.getNodes(liveNodes); clusterState = solrCloudManager.getClusterStateProvider().getClusterState(); } diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index d4d2030..f2c2b78 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -246,6 +246,8 @@ public class CoreContainer { private final ObjectCache objectCache = new ObjectCache(); + public final NodeRole nodeRole = new NodeRole(System.getProperty("node.roles")); + private final ClusterSingletons clusterSingletons = new ClusterSingletons( () -> getZkController() != null && getZkController().getOverseer() != null && diff --git a/solr/core/src/java/org/apache/solr/core/NodeRole.java b/solr/core/src/java/org/apache/solr/core/NodeRole.java new file mode 100644 index 0000000..1de7832 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/core/NodeRole.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core; + +import com.google.common.collect.ImmutableSet; +import java.util.*; +import org.apache.solr.common.StringUtils; +import org.apache.solr.common.util.StrUtils; + +public class NodeRole { + private boolean hasData; + private Type role; + + public NodeRole(String role) { + if (StringUtils.isEmpty(role)) { + hasData = true; + this.role = Type.data; + return; + } + Set<String> roles = new HashSet<>(StrUtils.split(role, ',')); + if (roles.isEmpty()) { + hasData = true; + return; + } + for (String s : roles) { + if (Type.data.name().equals(s)) { + hasData = true; + continue; + } + if (Type.overseer.name().equals(s)) { + this.role = Type.overseer; + } else { + throw new RuntimeException("Unknown type"); + } + } + + } + + public boolean hasData() { + return hasData; + } + + public Type role() { + return role; + } + + @SuppressWarnings("unchecked") + public Map<String, Object> modifyRoleData(Map<String, Object> rolesData, String nodeName) { + if (this.role == Type.data) { + //this is a normal data node, nothing to do + if (rolesData == null) return null;// no role info, return + if (removeStaleRoles(nodeName, rolesData, Collections.emptySet())) { + return rolesData; + } else { + return null; + } + } else { + if (rolesData == null) rolesData = new HashMap<>(); + List<String> nodes = (List<String>) rolesData.computeIfAbsent(role.name(), s -> new ArrayList<String>()); + boolean isModified = false; + if (!nodes.contains(nodeName)) { + nodes.add(nodeName); + isModified = true; + } + List<String> nonReplicaNodes = (List<String>) rolesData.computeIfAbsent(NO_REPLICAS, s -> new ArrayList<String>()); + if (hasData) { + if(nonReplicaNodes.contains(nodeName)) { + nonReplicaNodes.remove(nodeName); + isModified = true; + } + } else { + if(!nonReplicaNodes.contains(nodeName)){ + nonReplicaNodes.add(nodeName); + isModified = true; + } + } + if (isModified || removeStaleRoles(nodeName, rolesData, ImmutableSet.of(role.name(), NO_REPLICAS))) { + return rolesData; + } else { + return null; + } + } + + } + + @SuppressWarnings("unchecked") + private boolean removeStaleRoles(String nodeName, Map<String, Object> rolesData, Set<String> ignore) { + boolean[] isModified = new boolean[]{false}; + rolesData.forEach((s, o) -> { + if (ignore.contains(s)) return; + if (o instanceof List) { + List<String> list = (List<String>) o; + if (list.contains(nodeName)) { + isModified[0] = true; + list.remove(nodeName); + } + } + }); + return isModified[0]; + } + + public enum Type { + data, overseer; + } + + public static final String NO_REPLICAS = "no-replicas"; + +} diff --git a/solr/core/src/test/org/apache/solr/core/TestNodeRole.java b/solr/core/src/test/org/apache/solr/core/TestNodeRole.java new file mode 100644 index 0000000..d5bdba5 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/core/TestNodeRole.java @@ -0,0 +1,53 @@ +package org.apache.solr.core; + +import java.lang.invoke.MethodHandles; +import java.util.Collection; +import java.util.Map; +import org.apache.solr.SolrTestCase; +import org.apache.solr.common.util.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestNodeRole extends SolrTestCase { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + + @SuppressWarnings("rawtypes") + public void testZkData() { + //empty + Map<String,Object> rolesData = null; + //start a node with overseer role + rolesData = new NodeRole("overseer").modifyRoleData(rolesData, "node1"); + assertEquals("node1", _val(rolesData, "overseer[0]")); + assertEquals("node1", _val(rolesData, "no-replicas[0]")); + //now start another node with overseer + rolesData = new NodeRole("overseer").modifyRoleData(rolesData, "node2"); + assertEquals("node2", _val(rolesData, "overseer[1]")); + assertEquals("node2", _val(rolesData, "no-replicas[1]")); + //now start another node with overseer,data + rolesData = new NodeRole("overseer,data").modifyRoleData(rolesData, "node3"); + + assertEquals("node3", _val(rolesData, "overseer[2]")); + assertEquals(2, ((Collection) _val(rolesData, "no-replicas")).size() ); + //now restart node2 with no role + + rolesData = new NodeRole(null).modifyRoleData(rolesData, "node2"); + assertEquals("node3", _val(rolesData, "overseer[1]")); + assertEquals(1, ((Collection) _val(rolesData, "no-replicas")).size() ); + //now restart node1 with the original values + assertNull(new NodeRole("overseer").modifyRoleData(rolesData, "node1")); + + assertEquals("node1", _val(rolesData, "overseer[0]")); + assertEquals("node1", _val(rolesData, "no-replicas[0]")); + //now restart node1 with overseer,data + rolesData = new NodeRole("overseer,data").modifyRoleData(rolesData, "node1"); + assertEquals("node1", _val(rolesData, "overseer[0]")); + assertEquals(0, ((Collection) _val(rolesData, "no-replicas")).size() ); + + log.info("rolesdata : {}", Utils.toJSONString(rolesData)); + } + + private Object _val(Map<String, Object> rolesData, String path) { + return Utils.getObjectByPath(rolesData, false, path); + } +}
