madrob commented on code in PR #996:
URL: https://github.com/apache/solr/pull/996#discussion_r963947450


##########
solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.servlet;
+
+import java.lang.invoke.MethodHandles;
+import java.security.Principal;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.solr.api.CoordinatorV2HttpSolrCall;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RTimerTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CoordinatorHttpSolrCall extends HttpSolrCall {
+  public static final String SYNTHETIC_COLL_PREFIX =
+      Assign.SYSTEM_COLL_PREFIX + "COORDINATOR-COLL-";
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private String collectionName;
+  private final Factory factory;
+
+  public CoordinatorHttpSolrCall(
+      Factory factory,
+      SolrDispatchFilter solrDispatchFilter,
+      CoreContainer cores,
+      HttpServletRequest request,
+      HttpServletResponse response,
+      boolean retry) {
+    super(solrDispatchFilter, cores, request, response, retry);
+    this.factory = factory;
+  }
+
+  @Override
+  protected SolrCore getCoreByCollection(String collectionName, boolean 
isPreferLeader) {
+    this.collectionName = collectionName;
+    SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader);
+    if (core != null) return core;
+    if (!path.endsWith("/select")) return null;
+    return getCore(factory, this, collectionName, isPreferLeader);
+  }
+
+  public static SolrCore getCore(
+      Factory factory, HttpSolrCall solrCall, String collectionName, boolean 
isPreferLeader) {
+    String sytheticCoreName = 
factory.collectionVsCoreNameMapping.get(collectionName);
+    if (sytheticCoreName != null) {
+      return solrCall.cores.getCore(sytheticCoreName);
+    } else {
+      ZkStateReader zkStateReader = 
solrCall.cores.getZkController().getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection coll = clusterState.getCollectionOrNull(collectionName, 
true);
+      if (coll != null) {
+        String confName = coll.getConfigName();
+        String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName;
+
+        DocCollection syntheticColl = 
clusterState.getCollectionOrNull(syntheticCollectionName);
+        if (syntheticColl == null) {
+          // no such collection. let's create one
+          if (log.isInfoEnabled()) {
+            log.info(
+                "synthetic collection: {} does not exist, creating.. ", 
syntheticCollectionName);
+          }
+          createColl(syntheticCollectionName, solrCall.cores, confName);
+        }
+        SolrCore core = solrCall.getCoreByCollection(syntheticCollectionName, 
isPreferLeader);
+        if (core != null) {
+          factory.collectionVsCoreNameMapping.put(collectionName, 
core.getName());
+          
solrCall.cores.getZkController().getZkStateReader().registerCore(collectionName);
+          if (log.isDebugEnabled()) {
+            log.debug("coordinator node, returns synthetic core: {}", 
core.getName());
+          }
+        } else {
+          // this node does not have a replica. add one
+          if (log.isInfoEnabled()) {
+            log.info(
+                "this node does not have a replica of the synthetic 
collection: {} , adding replica ",
+                syntheticCollectionName);
+          }
+
+          addReplica(syntheticCollectionName, solrCall.cores);
+          core = solrCall.getCoreByCollection(syntheticCollectionName, 
isPreferLeader);
+        }
+        return core;
+      }
+      return null;
+    }
+  }
+
+  private static void addReplica(String syntheticCollectionName, CoreContainer 
cores) {
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    try {
+      cores
+          .getCollectionsHandler()
+          .handleRequestBody(
+              new LocalSolrQueryRequest(
+                  null,
+                  
CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1")
+                      .setCreateNodeSet(cores.getZkController().getNodeName())
+                      .getParams()),
+              rsp);
+      if (rsp.getValues().get("success") == null) {
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR,
+            "Could not auto-create collection: " + 
Utils.toJSONString(rsp.getValues()));
+      }
+    } catch (SolrException e) {
+      throw e;
+
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private static void createColl(
+      String syntheticCollectionName, CoreContainer cores, String confName) {
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    try {
+      SolrParams params =
+          CollectionAdminRequest.createCollection(syntheticCollectionName, 
confName, 1, 1)
+              .setCreateNodeSet(cores.getZkController().getNodeName())
+              .getParams();
+      if (log.isInfoEnabled()) {
+        log.info("sending collection admin command : {}", 
Utils.toJSONString(params));
+      }
+      cores.getCollectionsHandler().handleRequestBody(new 
LocalSolrQueryRequest(null, params), rsp);
+      if (rsp.getValues().get("success") == null) {
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR,
+            "Could not create :"
+                + syntheticCollectionName
+                + " collection: "
+                + Utils.toJSONString(rsp.getValues()));
+      }
+    } catch (SolrException e) {
+      throw e;
+
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  @Override
+  protected void init() throws Exception {
+    super.init();
+    if (action == SolrDispatchFilter.Action.PROCESS && core != null) {
+      solrReq = wrappedReq(solrReq, collectionName, this);
+    }
+  }
+
+  public static SolrQueryRequest wrappedReq(
+      SolrQueryRequest delegate, String collectionName, HttpSolrCall 
httpSolrCall) {
+    Properties p = new Properties();
+    p.put(CoreDescriptor.CORE_COLLECTION, collectionName);
+    p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString());
+    p.put(CoreDescriptor.CORE_SHARD, "_");
+
+    CloudDescriptor cloudDescriptor =
+        new CloudDescriptor(
+            delegate.getCore().getCoreDescriptor(), 
delegate.getCore().getName(), p);
+    return new SolrQueryRequest() {
+      @Override
+      public SolrParams getParams() {
+        return delegate.getParams();
+      }
+
+      @Override
+      public void setParams(SolrParams params) {
+
+        delegate.setParams(params);
+      }
+
+      @Override
+      public Iterable<ContentStream> getContentStreams() {
+        return delegate.getContentStreams();
+      }
+
+      @Override
+      public SolrParams getOriginalParams() {
+        return delegate.getOriginalParams();
+      }
+
+      @Override
+      public Map<Object, Object> getContext() {
+        return delegate.getContext();
+      }
+
+      @Override
+      public void close() {
+        delegate.close();
+      }
+
+      @Override
+      public long getStartTime() {
+        return delegate.getStartTime();
+      }
+
+      @Override
+      public RTimerTree getRequestTimer() {
+        return delegate.getRequestTimer();
+      }
+
+      @Override
+      public SolrIndexSearcher getSearcher() {
+        return delegate.getSearcher();
+      }
+
+      @Override
+      public SolrCore getCore() {
+        return delegate.getCore();
+      }
+
+      @Override
+      public IndexSchema getSchema() {
+        return delegate.getSchema();
+      }
+
+      @Override
+      public void updateSchemaToLatest() {
+        delegate.updateSchemaToLatest();
+      }
+
+      @Override
+      public String getParamString() {
+        return delegate.getParamString();
+      }
+
+      @Override
+      public Map<String, Object> getJSON() {
+        return delegate.getJSON();
+      }
+
+      @Override
+      public void setJSON(Map<String, Object> json) {
+        delegate.setJSON(json);
+      }
+
+      @Override
+      public Principal getUserPrincipal() {
+        return delegate.getUserPrincipal();
+      }
+
+      @Override
+      public HttpSolrCall getHttpSolrCall() {
+        return httpSolrCall;
+      }
+
+      @Override
+      public CloudDescriptor getCloudDescriptor() {
+        return cloudDescriptor;
+      }
+    };
+  }
+
+  public static class Factory implements 
SolrDispatchFilter.HttpSolrCallFactory {

Review Comment:
   This needs docs



##########
solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.servlet;
+
+import java.lang.invoke.MethodHandles;
+import java.security.Principal;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.solr.api.CoordinatorV2HttpSolrCall;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RTimerTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CoordinatorHttpSolrCall extends HttpSolrCall {
+  public static final String SYNTHETIC_COLL_PREFIX =
+      Assign.SYSTEM_COLL_PREFIX + "COORDINATOR-COLL-";
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private String collectionName;
+  private final Factory factory;
+
+  public CoordinatorHttpSolrCall(
+      Factory factory,
+      SolrDispatchFilter solrDispatchFilter,
+      CoreContainer cores,
+      HttpServletRequest request,
+      HttpServletResponse response,
+      boolean retry) {
+    super(solrDispatchFilter, cores, request, response, retry);
+    this.factory = factory;
+  }
+
+  @Override
+  protected SolrCore getCoreByCollection(String collectionName, boolean 
isPreferLeader) {
+    this.collectionName = collectionName;
+    SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader);
+    if (core != null) return core;
+    if (!path.endsWith("/select")) return null;
+    return getCore(factory, this, collectionName, isPreferLeader);
+  }
+
+  public static SolrCore getCore(
+      Factory factory, HttpSolrCall solrCall, String collectionName, boolean 
isPreferLeader) {
+    String sytheticCoreName = 
factory.collectionVsCoreNameMapping.get(collectionName);
+    if (sytheticCoreName != null) {
+      return solrCall.cores.getCore(sytheticCoreName);
+    } else {
+      ZkStateReader zkStateReader = 
solrCall.cores.getZkController().getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection coll = clusterState.getCollectionOrNull(collectionName, 
true);
+      if (coll != null) {
+        String confName = coll.getConfigName();
+        String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName;
+
+        DocCollection syntheticColl = 
clusterState.getCollectionOrNull(syntheticCollectionName);
+        if (syntheticColl == null) {
+          // no such collection. let's create one
+          if (log.isInfoEnabled()) {
+            log.info(
+                "synthetic collection: {} does not exist, creating.. ", 
syntheticCollectionName);
+          }
+          createColl(syntheticCollectionName, solrCall.cores, confName);
+        }
+        SolrCore core = solrCall.getCoreByCollection(syntheticCollectionName, 
isPreferLeader);
+        if (core != null) {
+          factory.collectionVsCoreNameMapping.put(collectionName, 
core.getName());
+          
solrCall.cores.getZkController().getZkStateReader().registerCore(collectionName);
+          if (log.isDebugEnabled()) {
+            log.debug("coordinator node, returns synthetic core: {}", 
core.getName());
+          }
+        } else {
+          // this node does not have a replica. add one
+          if (log.isInfoEnabled()) {
+            log.info(
+                "this node does not have a replica of the synthetic 
collection: {} , adding replica ",
+                syntheticCollectionName);
+          }
+
+          addReplica(syntheticCollectionName, solrCall.cores);
+          core = solrCall.getCoreByCollection(syntheticCollectionName, 
isPreferLeader);
+        }
+        return core;
+      }
+      return null;
+    }
+  }
+
+  private static void addReplica(String syntheticCollectionName, CoreContainer 
cores) {
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    try {
+      cores
+          .getCollectionsHandler()
+          .handleRequestBody(
+              new LocalSolrQueryRequest(
+                  null,
+                  
CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1")
+                      .setCreateNodeSet(cores.getZkController().getNodeName())
+                      .getParams()),
+              rsp);
+      if (rsp.getValues().get("success") == null) {
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR,
+            "Could not auto-create collection: " + 
Utils.toJSONString(rsp.getValues()));
+      }
+    } catch (SolrException e) {
+      throw e;
+
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private static void createColl(
+      String syntheticCollectionName, CoreContainer cores, String confName) {
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    try {
+      SolrParams params =
+          CollectionAdminRequest.createCollection(syntheticCollectionName, 
confName, 1, 1)
+              .setCreateNodeSet(cores.getZkController().getNodeName())
+              .getParams();
+      if (log.isInfoEnabled()) {
+        log.info("sending collection admin command : {}", 
Utils.toJSONString(params));
+      }
+      cores.getCollectionsHandler().handleRequestBody(new 
LocalSolrQueryRequest(null, params), rsp);
+      if (rsp.getValues().get("success") == null) {
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR,
+            "Could not create :"
+                + syntheticCollectionName
+                + " collection: "
+                + Utils.toJSONString(rsp.getValues()));
+      }
+    } catch (SolrException e) {
+      throw e;
+
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  @Override
+  protected void init() throws Exception {
+    super.init();
+    if (action == SolrDispatchFilter.Action.PROCESS && core != null) {
+      solrReq = wrappedReq(solrReq, collectionName, this);
+    }
+  }
+
+  public static SolrQueryRequest wrappedReq(
+      SolrQueryRequest delegate, String collectionName, HttpSolrCall 
httpSolrCall) {
+    Properties p = new Properties();
+    p.put(CoreDescriptor.CORE_COLLECTION, collectionName);
+    p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString());
+    p.put(CoreDescriptor.CORE_SHARD, "_");
+
+    CloudDescriptor cloudDescriptor =
+        new CloudDescriptor(
+            delegate.getCore().getCoreDescriptor(), 
delegate.getCore().getName(), p);
+    return new SolrQueryRequest() {

Review Comment:
   Please create a `DelegatingSolrRequest` or `FilterSolrRequest` and put it in 
its own class so that we can override only the necessary methods here and the 
code is easier to review/maintain.



##########
solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.api;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.servlet.CoordinatorHttpSolrCall;
+import org.apache.solr.servlet.SolrDispatchFilter;
+
+public class CoordinatorV2HttpSolrCall extends V2HttpCall {
+  private String collectionName;
+  CoordinatorHttpSolrCall.Factory factory;
+
+  public CoordinatorV2HttpSolrCall(
+      CoordinatorHttpSolrCall.Factory factory,
+      SolrDispatchFilter solrDispatchFilter,
+      CoreContainer cc,
+      HttpServletRequest request,
+      HttpServletResponse response,
+      boolean retry) {
+    super(solrDispatchFilter, cc, request, response, retry);
+    this.factory = factory;
+  }
+
+  @Override
+  protected SolrCore getCoreByCollection(String collectionName, boolean 
isPreferLeader) {
+    this.collectionName = collectionName;
+    SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader);
+    if (core != null) return core;
+    if (!path.endsWith("/select")) return null;
+    return CoordinatorHttpSolrCall.getCore(factory, this, collectionName, 
isPreferLeader);
+  }
+
+  @Override
+  protected void init() throws Exception {
+    super.init();
+    if (action == SolrDispatchFilter.Action.PROCESS && core != null) {
+      solrReq = CoordinatorHttpSolrCall.wrappedReq(solrReq, collectionName, 
this);
+    }

Review Comment:
   What does this logic do?



##########
solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java:
##########
@@ -394,4 +401,21 @@ private boolean shouldAudit(AuditEvent.EventType 
eventType) {
   void replaceRateLimitManager(RateLimitManager rateLimitManager) {
     coreService.getService().setRateLimitManager(rateLimitManager);
   }
+
+  /** internal API */
+  public interface HttpSolrCallFactory {
+    default HttpSolrCall createInstance(
+        SolrDispatchFilter filter,
+        String path,
+        CoreContainer cores,
+        HttpServletRequest request,
+        HttpServletResponse response,
+        boolean retry) {
+      if (filter.isV2Enabled && (path.startsWith("/____v2/") || 
path.equals("/____v2"))) {
+        return new V2HttpCall(filter, cores, request, response, false);
+      } else {
+        return new HttpSolrCall(filter, cores, request, response, retry);

Review Comment:
   Why does V2 not retry and V1 does retry?



##########
solr/solrj/src/java/org/apache/solr/client/solrj/impl/CloudSolrClient.java:
##########
@@ -1092,6 +1092,21 @@ protected NamedList<Object> sendRequest(SolrRequest<?> 
request, List<String> inp
                 + inputCollections);
       }
 
+      List<String> preferredNodes = request.getPreferredNodes();
+      if (preferredNodes != null && !preferredNodes.isEmpty()) {
+        String joinedInputCollections = StrUtils.join(inputCollections, ',');
+        List<String> urlList = new ArrayList<>(preferredNodes.size());
+        for (String nodeName : preferredNodes) {
+          urlList.add(
+              Utils.getBaseUrlForNodeName(nodeName, urlScheme) + "/" + 
joinedInputCollections);
+        }
+        if (!urlList.isEmpty()) {
+          LBSolrClient.Req req = new LBSolrClient.Req(request, urlList);
+          LBSolrClient.Rsp rsp = getLbClient().request(req);
+          return rsp.getResponse();

Review Comment:
   I don't like this style of multiple return points from a method; can we 
combine this with the duplicated code at the end of the method?



##########
solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java:
##########
@@ -137,7 +140,15 @@ public SolrDispatchFilter() {}
   public void init(FilterConfig config) throws ServletException {
     try {
       coreService = 
CoreContainerProvider.serviceForContext(config.getServletContext());
-
+      boolean isCoordinator =
+          NodeRoles.MODE_ON.equals(
+              coreService
+                  .getService()
+                  .getCoreContainer()
+                  .nodeRoles
+                  .getRoleMode(NodeRoles.Role.COORDINATOR));
+      solrCallFactory =
+          isCoordinator ? new CoordinatorHttpSolrCall.Factory() : new 
HttpSolrCallFactory() {};

Review Comment:
   It seems odd to create a new factory each time, since the object is 
stateless. Can we create a singleton of each factory and re-use those?



##########
solr/solr-ref-guide/modules/deployment-guide/pages/node-roles.adoc:
##########
@@ -66,15 +69,37 @@ A node with this role (in mode "on") can host shards and 
replicas for collection
 === `overseer` role
 A node with this role can perform duties of an overseer node (unless mode is 
`disallowed`). When one or more nodes have the overseer role in `preferred` 
mode, the overseer leader will be elected from one of these nodes. In case no 
node is designated as a preferred overseer or no such node is live, the 
overseer leader will be elected from one of the nodes that have the overseer 
role in `allowed` mode. If all nodes that are designated with overseer role 
(allowed or preferred) are down, the cluster will be left without an overseer.
 
+=== `coordinator` role
+
+A node with this role can act as if it has replicas of all collections in the 
cluster when a query is performed. The workflow is as follows
+
+If the cluster has collections with a very large number of shards, performing 
distributed requests in your _`data node`_ will lead to
+
+* large heap utilization
+* frequent GC pauses
+
+In such cases, a few dedicated nodes can be started with a *`coordinator`* 
role and queries can be sent to that node and avoid intermittent and 
unpredictable load in data nodes. The coordinator node is stateless and does 
not host any data. So, we can create and destroy coordinator nodes without any 
data loss or downtime.

Review Comment:
   Which client needs to be used to send queries to coordinator nodes? Do I 
need to set a routing property on my request? A shards.preference policy? How 
do I take advantage of this new node type? Especially if the set of coordinator 
nodes is dynamic, as suggested below, do I have to look up the live nodes in 
zookeeper?
   
   Some of these questions would be better answered in 
`solrcloud-distributed-requests.adoc`



##########
solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.servlet;
+
+import java.lang.invoke.MethodHandles;
+import java.security.Principal;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.solr.api.CoordinatorV2HttpSolrCall;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.api.collections.Assign;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.common.util.ContentStream;
+import org.apache.solr.common.util.Utils;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.LocalSolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.search.SolrIndexSearcher;
+import org.apache.solr.util.RTimerTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CoordinatorHttpSolrCall extends HttpSolrCall {
+  public static final String SYNTHETIC_COLL_PREFIX =
+      Assign.SYSTEM_COLL_PREFIX + "COORDINATOR-COLL-";
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private String collectionName;
+  private final Factory factory;
+
+  public CoordinatorHttpSolrCall(
+      Factory factory,
+      SolrDispatchFilter solrDispatchFilter,
+      CoreContainer cores,
+      HttpServletRequest request,
+      HttpServletResponse response,
+      boolean retry) {
+    super(solrDispatchFilter, cores, request, response, retry);
+    this.factory = factory;
+  }
+
+  @Override
+  protected SolrCore getCoreByCollection(String collectionName, boolean 
isPreferLeader) {
+    this.collectionName = collectionName;
+    SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader);
+    if (core != null) return core;
+    if (!path.endsWith("/select")) return null;
+    return getCore(factory, this, collectionName, isPreferLeader);
+  }
+
+  public static SolrCore getCore(
+      Factory factory, HttpSolrCall solrCall, String collectionName, boolean 
isPreferLeader) {
+    String sytheticCoreName = 
factory.collectionVsCoreNameMapping.get(collectionName);
+    if (sytheticCoreName != null) {
+      return solrCall.cores.getCore(sytheticCoreName);
+    } else {
+      ZkStateReader zkStateReader = 
solrCall.cores.getZkController().getZkStateReader();
+      ClusterState clusterState = zkStateReader.getClusterState();
+      DocCollection coll = clusterState.getCollectionOrNull(collectionName, 
true);
+      if (coll != null) {
+        String confName = coll.getConfigName();
+        String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName;
+
+        DocCollection syntheticColl = 
clusterState.getCollectionOrNull(syntheticCollectionName);
+        if (syntheticColl == null) {
+          // no such collection. let's create one
+          if (log.isInfoEnabled()) {
+            log.info(
+                "synthetic collection: {} does not exist, creating.. ", 
syntheticCollectionName);
+          }
+          createColl(syntheticCollectionName, solrCall.cores, confName);
+        }
+        SolrCore core = solrCall.getCoreByCollection(syntheticCollectionName, 
isPreferLeader);
+        if (core != null) {
+          factory.collectionVsCoreNameMapping.put(collectionName, 
core.getName());
+          
solrCall.cores.getZkController().getZkStateReader().registerCore(collectionName);
+          if (log.isDebugEnabled()) {
+            log.debug("coordinator node, returns synthetic core: {}", 
core.getName());
+          }
+        } else {
+          // this node does not have a replica. add one
+          if (log.isInfoEnabled()) {
+            log.info(
+                "this node does not have a replica of the synthetic 
collection: {} , adding replica ",
+                syntheticCollectionName);
+          }
+
+          addReplica(syntheticCollectionName, solrCall.cores);
+          core = solrCall.getCoreByCollection(syntheticCollectionName, 
isPreferLeader);
+        }
+        return core;
+      }
+      return null;
+    }
+  }
+
+  private static void addReplica(String syntheticCollectionName, CoreContainer 
cores) {
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    try {
+      cores
+          .getCollectionsHandler()
+          .handleRequestBody(
+              new LocalSolrQueryRequest(
+                  null,
+                  
CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1")
+                      .setCreateNodeSet(cores.getZkController().getNodeName())
+                      .getParams()),
+              rsp);
+      if (rsp.getValues().get("success") == null) {
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR,
+            "Could not auto-create collection: " + 
Utils.toJSONString(rsp.getValues()));
+      }
+    } catch (SolrException e) {
+      throw e;
+
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  private static void createColl(
+      String syntheticCollectionName, CoreContainer cores, String confName) {
+    SolrQueryResponse rsp = new SolrQueryResponse();
+    try {
+      SolrParams params =
+          CollectionAdminRequest.createCollection(syntheticCollectionName, 
confName, 1, 1)
+              .setCreateNodeSet(cores.getZkController().getNodeName())
+              .getParams();
+      if (log.isInfoEnabled()) {
+        log.info("sending collection admin command : {}", 
Utils.toJSONString(params));
+      }
+      cores.getCollectionsHandler().handleRequestBody(new 
LocalSolrQueryRequest(null, params), rsp);
+      if (rsp.getValues().get("success") == null) {
+        throw new SolrException(
+            SolrException.ErrorCode.SERVER_ERROR,
+            "Could not create :"
+                + syntheticCollectionName
+                + " collection: "
+                + Utils.toJSONString(rsp.getValues()));
+      }
+    } catch (SolrException e) {
+      throw e;
+
+    } catch (Exception e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
+  }
+
+  @Override
+  protected void init() throws Exception {
+    super.init();
+    if (action == SolrDispatchFilter.Action.PROCESS && core != null) {
+      solrReq = wrappedReq(solrReq, collectionName, this);
+    }
+  }
+
+  public static SolrQueryRequest wrappedReq(
+      SolrQueryRequest delegate, String collectionName, HttpSolrCall 
httpSolrCall) {
+    Properties p = new Properties();
+    p.put(CoreDescriptor.CORE_COLLECTION, collectionName);
+    p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString());
+    p.put(CoreDescriptor.CORE_SHARD, "_");
+
+    CloudDescriptor cloudDescriptor =
+        new CloudDescriptor(
+            delegate.getCore().getCoreDescriptor(), 
delegate.getCore().getName(), p);
+    return new SolrQueryRequest() {
+      @Override
+      public SolrParams getParams() {
+        return delegate.getParams();
+      }
+
+      @Override
+      public void setParams(SolrParams params) {
+
+        delegate.setParams(params);
+      }
+
+      @Override
+      public Iterable<ContentStream> getContentStreams() {
+        return delegate.getContentStreams();
+      }
+
+      @Override
+      public SolrParams getOriginalParams() {
+        return delegate.getOriginalParams();
+      }
+
+      @Override
+      public Map<Object, Object> getContext() {
+        return delegate.getContext();
+      }
+
+      @Override
+      public void close() {
+        delegate.close();
+      }
+
+      @Override
+      public long getStartTime() {
+        return delegate.getStartTime();
+      }
+
+      @Override
+      public RTimerTree getRequestTimer() {
+        return delegate.getRequestTimer();
+      }
+
+      @Override
+      public SolrIndexSearcher getSearcher() {
+        return delegate.getSearcher();
+      }
+
+      @Override
+      public SolrCore getCore() {
+        return delegate.getCore();
+      }
+
+      @Override
+      public IndexSchema getSchema() {
+        return delegate.getSchema();
+      }
+
+      @Override
+      public void updateSchemaToLatest() {
+        delegate.updateSchemaToLatest();
+      }
+
+      @Override
+      public String getParamString() {
+        return delegate.getParamString();
+      }
+
+      @Override
+      public Map<String, Object> getJSON() {
+        return delegate.getJSON();
+      }
+
+      @Override
+      public void setJSON(Map<String, Object> json) {
+        delegate.setJSON(json);
+      }
+
+      @Override
+      public Principal getUserPrincipal() {
+        return delegate.getUserPrincipal();
+      }
+
+      @Override
+      public HttpSolrCall getHttpSolrCall() {
+        return httpSolrCall;
+      }
+
+      @Override
+      public CloudDescriptor getCloudDescriptor() {
+        return cloudDescriptor;
+      }
+    };
+  }
+
+  public static class Factory implements 
SolrDispatchFilter.HttpSolrCallFactory {
+    private final Map<String, String> collectionVsCoreNameMapping = new 
ConcurrentHashMap<>();
+
+    @Override
+    public HttpSolrCall createInstance(
+        SolrDispatchFilter filter,
+        String path,
+        CoreContainer cores,
+        HttpServletRequest request,
+        HttpServletResponse response,
+        boolean retry) {
+      if ((path.startsWith("/____v2/") || path.equals("/____v2"))) {

Review Comment:
   Unlike HttpSolrCallFactory, why does this not need to check for v2 enable?



##########
solr/solr-ref-guide/modules/deployment-guide/pages/node-roles.adoc:
##########
@@ -66,15 +69,37 @@ A node with this role (in mode "on") can host shards and 
replicas for collection
 === `overseer` role
 A node with this role can perform duties of an overseer node (unless mode is 
`disallowed`). When one or more nodes have the overseer role in `preferred` 
mode, the overseer leader will be elected from one of these nodes. In case no 
node is designated as a preferred overseer or no such node is live, the 
overseer leader will be elected from one of the nodes that have the overseer 
role in `allowed` mode. If all nodes that are designated with overseer role 
(allowed or preferred) are down, the cluster will be left without an overseer.
 
+=== `coordinator` role
+
+A node with this role can act as if it has replicas of all collections in the 
cluster when a query is performed. The workflow is as follows:
+
+If the cluster has collections with a very large number of shards, performing 
distributed requests on your _`data nodes`_ will lead to
+
+* large heap utilization
+* frequent GC pauses
+
+In such cases, a few dedicated nodes can be started with a *`coordinator`* 
role, and queries can be sent to those nodes to avoid intermittent and 
unpredictable load on the data nodes. A coordinator node is stateless and does 
not host any data. So, we can create and destroy coordinator nodes without any 
data loss or downtime.
+
+==== The work-flow in a `coordinator` node
+
+1. A request for *`coll-A`* that uses configset *`configset-A`* comes to a 
coordinator node
+2. It checks whether a core that uses the configset *`configset-A`* is 
present. If yes, that core acts as a replica of *`coll-A`*, performs a 
distributed request to all shards of *`coll-A`*, and sends back a response

Review Comment:
   This implies that if `collection-1` and `collection-2` are both using the 
same `configset` then a single synthetic core would be able to service requests 
for the both of them. I don't think that is accurate from my reading of the 
code in `CoordinatorHttpSolrCall.getCore` - we should either be more precise 
with the docs or improve the code to handle this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to