madrob commented on code in PR #996: URL: https://github.com/apache/solr/pull/996#discussion_r963947450
########## solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java: ########## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.servlet; + +import java.lang.invoke.MethodHandles; +import java.security.Principal; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.solr.api.CoordinatorV2HttpSolrCall; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.api.collections.Assign; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import 
org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.util.RTimerTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CoordinatorHttpSolrCall extends HttpSolrCall { + public static final String SYNTHETIC_COLL_PREFIX = + Assign.SYSTEM_COLL_PREFIX + "COORDINATOR-COLL-"; + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private String collectionName; + private final Factory factory; + + public CoordinatorHttpSolrCall( + Factory factory, + SolrDispatchFilter solrDispatchFilter, + CoreContainer cores, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + super(solrDispatchFilter, cores, request, response, retry); + this.factory = factory; + } + + @Override + protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) { + this.collectionName = collectionName; + SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader); + if (core != null) return core; + if (!path.endsWith("/select")) return null; + return getCore(factory, this, collectionName, isPreferLeader); + } + + public static SolrCore getCore( + Factory factory, HttpSolrCall solrCall, String collectionName, boolean isPreferLeader) { + String sytheticCoreName = factory.collectionVsCoreNameMapping.get(collectionName); + if (sytheticCoreName != null) { + return solrCall.cores.getCore(sytheticCoreName); + } else { + ZkStateReader zkStateReader = solrCall.cores.getZkController().getZkStateReader(); + ClusterState clusterState = zkStateReader.getClusterState(); + DocCollection coll = clusterState.getCollectionOrNull(collectionName, true); + if (coll != null) { + String confName = coll.getConfigName(); + String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + 
confName; + + DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName); + if (syntheticColl == null) { + // no such collection. let's create one + if (log.isInfoEnabled()) { + log.info( + "synthetic collection: {} does not exist, creating.. ", syntheticCollectionName); + } + createColl(syntheticCollectionName, solrCall.cores, confName); + } + SolrCore core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + if (core != null) { + factory.collectionVsCoreNameMapping.put(collectionName, core.getName()); + solrCall.cores.getZkController().getZkStateReader().registerCore(collectionName); + if (log.isDebugEnabled()) { + log.debug("coordinator node, returns synthetic core: {}", core.getName()); + } + } else { + // this node does not have a replica. add one + if (log.isInfoEnabled()) { + log.info( + "this node does not have a replica of the synthetic collection: {} , adding replica ", + syntheticCollectionName); + } + + addReplica(syntheticCollectionName, solrCall.cores); + core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + } + return core; + } + return null; + } + } + + private static void addReplica(String syntheticCollectionName, CoreContainer cores) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + cores + .getCollectionsHandler() + .handleRequestBody( + new LocalSolrQueryRequest( + null, + CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1") + .setCreateNodeSet(cores.getZkController().getNodeName()) + .getParams()), + rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not auto-create collection: " + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + private static void createColl( + String syntheticCollectionName, CoreContainer cores, 
String confName) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + SolrParams params = + CollectionAdminRequest.createCollection(syntheticCollectionName, confName, 1, 1) + .setCreateNodeSet(cores.getZkController().getNodeName()) + .getParams(); + if (log.isInfoEnabled()) { + log.info("sending collection admin command : {}", Utils.toJSONString(params)); + } + cores.getCollectionsHandler().handleRequestBody(new LocalSolrQueryRequest(null, params), rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not create :" + + syntheticCollectionName + + " collection: " + + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + @Override + protected void init() throws Exception { + super.init(); + if (action == SolrDispatchFilter.Action.PROCESS && core != null) { + solrReq = wrappedReq(solrReq, collectionName, this); + } + } + + public static SolrQueryRequest wrappedReq( + SolrQueryRequest delegate, String collectionName, HttpSolrCall httpSolrCall) { + Properties p = new Properties(); + p.put(CoreDescriptor.CORE_COLLECTION, collectionName); + p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString()); + p.put(CoreDescriptor.CORE_SHARD, "_"); + + CloudDescriptor cloudDescriptor = + new CloudDescriptor( + delegate.getCore().getCoreDescriptor(), delegate.getCore().getName(), p); + return new SolrQueryRequest() { + @Override + public SolrParams getParams() { + return delegate.getParams(); + } + + @Override + public void setParams(SolrParams params) { + + delegate.setParams(params); + } + + @Override + public Iterable<ContentStream> getContentStreams() { + return delegate.getContentStreams(); + } + + @Override + public SolrParams getOriginalParams() { + return delegate.getOriginalParams(); + } + + @Override + public Map<Object, Object> getContext() { + 
return delegate.getContext(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public long getStartTime() { + return delegate.getStartTime(); + } + + @Override + public RTimerTree getRequestTimer() { + return delegate.getRequestTimer(); + } + + @Override + public SolrIndexSearcher getSearcher() { + return delegate.getSearcher(); + } + + @Override + public SolrCore getCore() { + return delegate.getCore(); + } + + @Override + public IndexSchema getSchema() { + return delegate.getSchema(); + } + + @Override + public void updateSchemaToLatest() { + delegate.updateSchemaToLatest(); + } + + @Override + public String getParamString() { + return delegate.getParamString(); + } + + @Override + public Map<String, Object> getJSON() { + return delegate.getJSON(); + } + + @Override + public void setJSON(Map<String, Object> json) { + delegate.setJSON(json); + } + + @Override + public Principal getUserPrincipal() { + return delegate.getUserPrincipal(); + } + + @Override + public HttpSolrCall getHttpSolrCall() { + return httpSolrCall; + } + + @Override + public CloudDescriptor getCloudDescriptor() { + return cloudDescriptor; + } + }; + } + + public static class Factory implements SolrDispatchFilter.HttpSolrCallFactory { Review Comment: This needs docs ########## solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java: ########## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.servlet; + +import java.lang.invoke.MethodHandles; +import java.security.Principal; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.solr.api.CoordinatorV2HttpSolrCall; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.api.collections.Assign; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.util.RTimerTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CoordinatorHttpSolrCall extends HttpSolrCall { + public static final String SYNTHETIC_COLL_PREFIX = + Assign.SYSTEM_COLL_PREFIX + "COORDINATOR-COLL-"; + 
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private String collectionName; + private final Factory factory; + + public CoordinatorHttpSolrCall( + Factory factory, + SolrDispatchFilter solrDispatchFilter, + CoreContainer cores, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + super(solrDispatchFilter, cores, request, response, retry); + this.factory = factory; + } + + @Override + protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) { + this.collectionName = collectionName; + SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader); + if (core != null) return core; + if (!path.endsWith("/select")) return null; + return getCore(factory, this, collectionName, isPreferLeader); + } + + public static SolrCore getCore( + Factory factory, HttpSolrCall solrCall, String collectionName, boolean isPreferLeader) { + String sytheticCoreName = factory.collectionVsCoreNameMapping.get(collectionName); + if (sytheticCoreName != null) { + return solrCall.cores.getCore(sytheticCoreName); + } else { + ZkStateReader zkStateReader = solrCall.cores.getZkController().getZkStateReader(); + ClusterState clusterState = zkStateReader.getClusterState(); + DocCollection coll = clusterState.getCollectionOrNull(collectionName, true); + if (coll != null) { + String confName = coll.getConfigName(); + String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName; + + DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName); + if (syntheticColl == null) { + // no such collection. let's create one + if (log.isInfoEnabled()) { + log.info( + "synthetic collection: {} does not exist, creating.. 
", syntheticCollectionName); + } + createColl(syntheticCollectionName, solrCall.cores, confName); + } + SolrCore core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + if (core != null) { + factory.collectionVsCoreNameMapping.put(collectionName, core.getName()); + solrCall.cores.getZkController().getZkStateReader().registerCore(collectionName); + if (log.isDebugEnabled()) { + log.debug("coordinator node, returns synthetic core: {}", core.getName()); + } + } else { + // this node does not have a replica. add one + if (log.isInfoEnabled()) { + log.info( + "this node does not have a replica of the synthetic collection: {} , adding replica ", + syntheticCollectionName); + } + + addReplica(syntheticCollectionName, solrCall.cores); + core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + } + return core; + } + return null; + } + } + + private static void addReplica(String syntheticCollectionName, CoreContainer cores) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + cores + .getCollectionsHandler() + .handleRequestBody( + new LocalSolrQueryRequest( + null, + CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1") + .setCreateNodeSet(cores.getZkController().getNodeName()) + .getParams()), + rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not auto-create collection: " + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + private static void createColl( + String syntheticCollectionName, CoreContainer cores, String confName) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + SolrParams params = + CollectionAdminRequest.createCollection(syntheticCollectionName, confName, 1, 1) + .setCreateNodeSet(cores.getZkController().getNodeName()) + .getParams(); + if 
(log.isInfoEnabled()) { + log.info("sending collection admin command : {}", Utils.toJSONString(params)); + } + cores.getCollectionsHandler().handleRequestBody(new LocalSolrQueryRequest(null, params), rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not create :" + + syntheticCollectionName + + " collection: " + + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + @Override + protected void init() throws Exception { + super.init(); + if (action == SolrDispatchFilter.Action.PROCESS && core != null) { + solrReq = wrappedReq(solrReq, collectionName, this); + } + } + + public static SolrQueryRequest wrappedReq( + SolrQueryRequest delegate, String collectionName, HttpSolrCall httpSolrCall) { + Properties p = new Properties(); + p.put(CoreDescriptor.CORE_COLLECTION, collectionName); + p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString()); + p.put(CoreDescriptor.CORE_SHARD, "_"); + + CloudDescriptor cloudDescriptor = + new CloudDescriptor( + delegate.getCore().getCoreDescriptor(), delegate.getCore().getName(), p); + return new SolrQueryRequest() { Review Comment: Please create a `DelegatingSolrRequest` or `FilterSolrRequest` and put it in its own class so that we can override only the necessary methods here and the code is easier to review/maintain. ########## solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.api; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.servlet.CoordinatorHttpSolrCall; +import org.apache.solr.servlet.SolrDispatchFilter; + +public class CoordinatorV2HttpSolrCall extends V2HttpCall { + private String collectionName; + CoordinatorHttpSolrCall.Factory factory; + + public CoordinatorV2HttpSolrCall( + CoordinatorHttpSolrCall.Factory factory, + SolrDispatchFilter solrDispatchFilter, + CoreContainer cc, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + super(solrDispatchFilter, cc, request, response, retry); + this.factory = factory; + } + + @Override + protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) { + this.collectionName = collectionName; + SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader); + if (core != null) return core; + if (!path.endsWith("/select")) return null; + return CoordinatorHttpSolrCall.getCore(factory, this, collectionName, isPreferLeader); + } + + @Override + protected void init() throws Exception { + super.init(); + if (action == SolrDispatchFilter.Action.PROCESS && core != null) { + solrReq = CoordinatorHttpSolrCall.wrappedReq(solrReq, collectionName, this); + } Review Comment: What does this logic do? 
########## solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java: ########## @@ -394,4 +401,21 @@ private boolean shouldAudit(AuditEvent.EventType eventType) { void replaceRateLimitManager(RateLimitManager rateLimitManager) { coreService.getService().setRateLimitManager(rateLimitManager); } + + /** internal API */ + public interface HttpSolrCallFactory { + default HttpSolrCall createInstance( + SolrDispatchFilter filter, + String path, + CoreContainer cores, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + if (filter.isV2Enabled && (path.startsWith("/____v2/") || path.equals("/____v2"))) { + return new V2HttpCall(filter, cores, request, response, false); + } else { + return new HttpSolrCall(filter, cores, request, response, retry); Review Comment: Why does V2 not retry and V1 does retry? ########## solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java: ########## @@ -1092,6 +1092,21 @@ protected NamedList<Object> sendRequest(SolrRequest<?> request, List<String> inp + inputCollections); } + List<String> preferredNodes = request.getPreferredNodes(); + if (preferredNodes != null && !preferredNodes.isEmpty()) { + String joinedInputCollections = StrUtils.join(inputCollections, ','); + List<String> urlList = new ArrayList<>(preferredNodes.size()); + for (String nodeName : preferredNodes) { + urlList.add( + Utils.getBaseUrlForNodeName(nodeName, urlScheme) + "/" + joinedInputCollections); + } + if (!urlList.isEmpty()) { + LBSolrClient.Req req = new LBSolrClient.Req(request, urlList); + LBSolrClient.Rsp rsp = getLbClient().request(req); + return rsp.getResponse(); Review Comment: I don't like this style of multiple return points from a method, can we combine this with the same duplicated code at the end of the method? 
########## solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java: ########## @@ -137,7 +140,15 @@ public SolrDispatchFilter() {} public void init(FilterConfig config) throws ServletException { try { coreService = CoreContainerProvider.serviceForContext(config.getServletContext()); - + boolean isCoordinator = + NodeRoles.MODE_ON.equals( + coreService + .getService() + .getCoreContainer() + .nodeRoles + .getRoleMode(NodeRoles.Role.COORDINATOR)); + solrCallFactory = + isCoordinator ? new CoordinatorHttpSolrCall.Factory() : new HttpSolrCallFactory() {}; Review Comment: This is odd to be creating a new factory each time, since the object is stateless. Can we create a singleton of each factory and re-use those? ########## solr/solr-ref-guide/modules/deployment-guide/pages/node-roles.adoc: ########## @@ -66,15 +69,37 @@ A node with this role (in mode "on") can host shards and replicas for collection === `overseer` role A node with this role can perform duties of an overseer node (unless mode is `disallowed`). When one or more nodes have the overseer role in `preferred` mode, the overseer leader will be elected from one of these nodes. In case no node is designated as a preferred overseer or no such node is live, the overseer leader will be elected from one of the nodes that have the overseer role in `allowed` mode. If all nodes that are designated with overseer role (allowed or preferred) are down, the cluster will be left without an overseer. +=== `coordinator` role + +A node with this role can act as if it has replicas of all collections in the cluster when a query is performed. 
The workflow is as follows + +If the cluster has collections with very large no:of shards, performing distributed requests in your _`data node`_ will lead to + +* large heap utilization +* frequent GC pauses + +In such cases, a few dedicated nodes can be started with a *`coordinator`* role and queries can be sent to that node and avoid intermittent and unpredictable load in data nodes. The coordinator node is stateless and does not host any data. So, we can create and destroy coordinator nodes without any data loass or down time. Review Comment: Which client needs to be used to send queries to coordinator nodes? Do I need to set a routing property on my request? A shards.preference policy? How do I take advantage of this new node type? Especially if the set of coordinator nodes is dynamic, as suggested below, do I have to look up the live nodes in zookeeper? Some of these questions would be better answered in `solrcloud-distributed-requests.adoc` ########## solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java: ########## @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.solr.servlet; + +import java.lang.invoke.MethodHandles; +import java.security.Principal; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import org.apache.solr.api.CoordinatorV2HttpSolrCall; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.api.collections.Assign; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.util.RTimerTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CoordinatorHttpSolrCall extends HttpSolrCall { + public static final String SYNTHETIC_COLL_PREFIX = + Assign.SYSTEM_COLL_PREFIX + "COORDINATOR-COLL-"; + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private String collectionName; + private final Factory factory; + + public CoordinatorHttpSolrCall( + Factory factory, + SolrDispatchFilter solrDispatchFilter, + CoreContainer cores, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + super(solrDispatchFilter, cores, request, response, 
retry); + this.factory = factory; + } + + @Override + protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) { + this.collectionName = collectionName; + SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader); + if (core != null) return core; + if (!path.endsWith("/select")) return null; + return getCore(factory, this, collectionName, isPreferLeader); + } + + public static SolrCore getCore( + Factory factory, HttpSolrCall solrCall, String collectionName, boolean isPreferLeader) { + String sytheticCoreName = factory.collectionVsCoreNameMapping.get(collectionName); + if (sytheticCoreName != null) { + return solrCall.cores.getCore(sytheticCoreName); + } else { + ZkStateReader zkStateReader = solrCall.cores.getZkController().getZkStateReader(); + ClusterState clusterState = zkStateReader.getClusterState(); + DocCollection coll = clusterState.getCollectionOrNull(collectionName, true); + if (coll != null) { + String confName = coll.getConfigName(); + String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName; + + DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName); + if (syntheticColl == null) { + // no such collection. let's create one + if (log.isInfoEnabled()) { + log.info( + "synthetic collection: {} does not exist, creating.. ", syntheticCollectionName); + } + createColl(syntheticCollectionName, solrCall.cores, confName); + } + SolrCore core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + if (core != null) { + factory.collectionVsCoreNameMapping.put(collectionName, core.getName()); + solrCall.cores.getZkController().getZkStateReader().registerCore(collectionName); + if (log.isDebugEnabled()) { + log.debug("coordinator node, returns synthetic core: {}", core.getName()); + } + } else { + // this node does not have a replica. 
add one + if (log.isInfoEnabled()) { + log.info( + "this node does not have a replica of the synthetic collection: {} , adding replica ", + syntheticCollectionName); + } + + addReplica(syntheticCollectionName, solrCall.cores); + core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + } + return core; + } + return null; + } + } + + private static void addReplica(String syntheticCollectionName, CoreContainer cores) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + cores + .getCollectionsHandler() + .handleRequestBody( + new LocalSolrQueryRequest( + null, + CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1") + .setCreateNodeSet(cores.getZkController().getNodeName()) + .getParams()), + rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not auto-create collection: " + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + private static void createColl( + String syntheticCollectionName, CoreContainer cores, String confName) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + SolrParams params = + CollectionAdminRequest.createCollection(syntheticCollectionName, confName, 1, 1) + .setCreateNodeSet(cores.getZkController().getNodeName()) + .getParams(); + if (log.isInfoEnabled()) { + log.info("sending collection admin command : {}", Utils.toJSONString(params)); + } + cores.getCollectionsHandler().handleRequestBody(new LocalSolrQueryRequest(null, params), rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Could not create :" + + syntheticCollectionName + + " collection: " + + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new 
SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + @Override + protected void init() throws Exception { + super.init(); + if (action == SolrDispatchFilter.Action.PROCESS && core != null) { + solrReq = wrappedReq(solrReq, collectionName, this); + } + } + + public static SolrQueryRequest wrappedReq( + SolrQueryRequest delegate, String collectionName, HttpSolrCall httpSolrCall) { + Properties p = new Properties(); + p.put(CoreDescriptor.CORE_COLLECTION, collectionName); + p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString()); + p.put(CoreDescriptor.CORE_SHARD, "_"); + + CloudDescriptor cloudDescriptor = + new CloudDescriptor( + delegate.getCore().getCoreDescriptor(), delegate.getCore().getName(), p); + return new SolrQueryRequest() { + @Override + public SolrParams getParams() { + return delegate.getParams(); + } + + @Override + public void setParams(SolrParams params) { + + delegate.setParams(params); + } + + @Override + public Iterable<ContentStream> getContentStreams() { + return delegate.getContentStreams(); + } + + @Override + public SolrParams getOriginalParams() { + return delegate.getOriginalParams(); + } + + @Override + public Map<Object, Object> getContext() { + return delegate.getContext(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public long getStartTime() { + return delegate.getStartTime(); + } + + @Override + public RTimerTree getRequestTimer() { + return delegate.getRequestTimer(); + } + + @Override + public SolrIndexSearcher getSearcher() { + return delegate.getSearcher(); + } + + @Override + public SolrCore getCore() { + return delegate.getCore(); + } + + @Override + public IndexSchema getSchema() { + return delegate.getSchema(); + } + + @Override + public void updateSchemaToLatest() { + delegate.updateSchemaToLatest(); + } + + @Override + public String getParamString() { + return delegate.getParamString(); + } + + @Override + public Map<String, Object> getJSON() { + return 
delegate.getJSON(); + } + + @Override + public void setJSON(Map<String, Object> json) { + delegate.setJSON(json); + } + + @Override + public Principal getUserPrincipal() { + return delegate.getUserPrincipal(); + } + + @Override + public HttpSolrCall getHttpSolrCall() { + return httpSolrCall; + } + + @Override + public CloudDescriptor getCloudDescriptor() { + return cloudDescriptor; + } + }; + } + + public static class Factory implements SolrDispatchFilter.HttpSolrCallFactory { + private final Map<String, String> collectionVsCoreNameMapping = new ConcurrentHashMap<>(); + + @Override + public HttpSolrCall createInstance( + SolrDispatchFilter filter, + String path, + CoreContainer cores, + HttpServletRequest request, + HttpServletResponse response, + boolean retry) { + if ((path.startsWith("/____v2/") || path.equals("/____v2"))) { Review Comment: Unlike HttpSolrCallFactory, why does this not need to check for v2 enable? ########## solr/solr-ref-guide/modules/deployment-guide/pages/node-roles.adoc: ########## @@ -66,15 +69,37 @@ A node with this role (in mode "on") can host shards and replicas for collection === `overseer` role A node with this role can perform duties of an overseer node (unless mode is `disallowed`). When one or more nodes have the overseer role in `preferred` mode, the overseer leader will be elected from one of these nodes. In case no node is designated as a preferred overseer or no such node is live, the overseer leader will be elected from one of the nodes that have the overseer role in `allowed` mode. If all nodes that are designated with overseer role (allowed or preferred) are down, the cluster will be left without an overseer. +=== `coordinator` role + +A node with this role can act as if it has replicas of all collections in the cluster when a query is performed. 
The workflow is as follows + +If the cluster has collections with a very large number of shards, performing distributed requests in your _`data node`_ will lead to + +* large heap utilization +* frequent GC pauses + +In such cases, a few dedicated nodes can be started with a *`coordinator`* role and queries can be sent to that node to avoid intermittent and unpredictable load in data nodes. The coordinator node is stateless and does not host any data. So, we can create and destroy coordinator nodes without any data loss or downtime. + +==== The workflow in a `coordinator` node + +1. A request for *`coll-A`* that uses configset *`configset-A`* comes to a coordinator node +2. It checks whether a core that uses the configset *`configset-A`* is present. If yes, that core acts as a replica of *`coll-A`* and performs a distributed request to all shards of *`coll-A`* and sends back a response Review Comment: This implies that if `collection-1` and `collection-2` are both using the same `configset` then a single synthetic core would be able to service requests for both of them. I don't think that is accurate from my reading of the code in `CoordinatorHttpSolrCall.getCore` - we should either be more precise with the docs or improve the code to handle this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
