This is an automated email from the ASF dual-hosted git repository. noble pushed a commit to branch jira/SOLR15715_1 in repository https://gitbox.apache.org/repos/asf/solr.git
commit 868088a954ab362efd03bf2a3e233f23eab527bf Author: Noble Paul <[email protected]> AuthorDate: Fri Feb 25 15:32:28 2022 +1100 Cooordinator role take 2 --- .../apache/solr/api/CoordinatorV2HttpSolrCall.java | 49 ++++ .../placement/impl/PlacementRequestImpl.java | 4 +- .../placement/plugins/SimplePlacementFactory.java | 2 +- .../src/java/org/apache/solr/core/NodeRoles.java | 12 + .../apache/solr/handler/RequestHandlerBase.java | 2 +- .../solr/handler/component/HttpShardHandler.java | 2 +- .../solr/servlet/CoordinatorHttpSolrCall.java | 257 +++++++++++++++++++++ .../apache/solr/servlet/SolrDispatchFilter.java | 12 +- .../apache/solr/search/TestCoordinatorRole.java | 86 +++++++ 9 files changed, 418 insertions(+), 8 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java b/solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java new file mode 100644 index 0000000..1b0eba9 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/api/CoordinatorV2HttpSolrCall.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.api; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.servlet.CoordinatorHttpSolrCall; +import org.apache.solr.servlet.SolrDispatchFilter; + +public class CoordinatorV2HttpSolrCall extends V2HttpCall { + private String collectionName; + public CoordinatorV2HttpSolrCall(SolrDispatchFilter solrDispatchFilter, CoreContainer cc, HttpServletRequest request, + HttpServletResponse response, boolean retry) { + super(solrDispatchFilter, cc, request, response, retry); + } + @Override + protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) { + this.collectionName = collectionName; + SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader); + if (core != null) return core; + return CoordinatorHttpSolrCall.getCore(this, collectionName, isPreferLeader); + } + + @Override + protected void init() throws Exception { + super.init(); + if(action == SolrDispatchFilter.Action.PROCESS && core != null) { + solrReq = CoordinatorHttpSolrCall.wrappedReq(solrReq, collectionName, this); + } + } +} \ No newline at end of file diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java index 285ffba..d445c4f 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/impl/PlacementRequestImpl.java @@ -83,11 +83,11 @@ public class PlacementRequestImpl implements PlacementRequest { if (assignRequest.nodes != null) { nodes = SimpleClusterAbstractionsImpl.NodeImpl.getNodes(assignRequest.nodes); - for (Node n: nodes) { + /* for (Node n: nodes) { if (!cluster.getLiveDataNodes().contains(n)) { throw new Assign.AssignmentException("Bad assign request: specified node is a non-data hosting node (" + n.getName() + ") for collection " + solrCollection.getName()); } - } + }*/ if (nodes.isEmpty()) { throw new Assign.AssignmentException("Bad assign request: empty list of nodes for collection " + solrCollection.getName()); } diff --git a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java index 420d3cb..0ef7503 100644 --- a/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java +++ b/solr/core/src/java/org/apache/solr/cluster/placement/plugins/SimplePlacementFactory.java @@ -99,7 +99,7 @@ public class SimplePlacementFactory implements PlacementPluginFactory<PlacementP private Map<Node, ReplicaCount> getNodeVsShardCount(PlacementContext placementContext) { HashMap<Node, ReplicaCount> nodeVsShardCount = new HashMap<>(); - for (Node s : placementContext.getCluster().getLiveDataNodes()) { + for (Node s : placementContext.getCluster().getLiveNodes()) { nodeVsShardCount.computeIfAbsent(s, ReplicaCount::new); } diff --git a/solr/core/src/java/org/apache/solr/core/NodeRoles.java b/solr/core/src/java/org/apache/solr/core/NodeRoles.java index d13b651..429553f 100644 --- a/solr/core/src/java/org/apache/solr/core/NodeRoles.java +++ b/solr/core/src/java/org/apache/solr/core/NodeRoles.java @@ -98,6 +98,18 @@ public class NodeRoles { public String modeWhenRoleIsAbsent() { return MODE_DISALLOWED; } + }, + + COORDINATOR("coordinator") { + @Override + public String modeWhenRoleIsAbsent() { + return MODE_OFF; + } + + @Override + public Set<String> supportedModes() { + return Set.of(MODE_ON, MODE_OFF); + } }; public final String roleName; diff --git a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java index eec7ef1..41d2774 100644 --- a/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java +++ b/solr/core/src/java/org/apache/solr/handler/RequestHandlerBase.java @@ -190,7 +190,7 @@ public abstract class RequestHandlerBase implements requests.inc(); // requests are distributed by default when ZK is in use, unless indicated otherwise boolean distrib = req.getParams().getBool(CommonParams.DISTRIB, - req.getCore() != null ? req.getCore().getCoreContainer().isZooKeeperAware() : false); + req.getCoreContainer() != null ? req.getCoreContainer().isZooKeeperAware() : false); if (req.getParams().getBool(ShardParams.IS_SHARD, false)) { shardPurposes.computeIfAbsent("total", name -> new Counter()).inc(); int purpose = req.getParams().getInt(ShardParams.SHARDS_PURPOSE, 0); diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java index 99d1297..76d4b02 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandler.java @@ -262,7 +262,7 @@ public class HttpShardHandler extends ShardHandler { final String shards = params.get(ShardParams.SHARDS); CoreDescriptor coreDescriptor = req.getCore().getCoreDescriptor(); - CloudDescriptor cloudDescriptor = coreDescriptor.getCloudDescriptor(); + CloudDescriptor cloudDescriptor = req.getCloudDescriptor(); ZkController zkController = req.getCoreContainer().getZkController(); final ReplicaListTransformer replicaListTransformer = httpShardHandlerFactory.getReplicaListTransformer(req); diff --git a/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java new file mode 100644 index 0000000..dce41a2 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/servlet/CoordinatorHttpSolrCall.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.servlet; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import java.lang.invoke.MethodHandles; +import java.security.Principal; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.ContentStream; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.util.RTimerTree; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CoordinatorHttpSolrCall extends HttpSolrCall { + public static final String SYNTHETIC_COLL_PREFIX = "SYNTHETIC-COLL-"; + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private String collectionName; + + public CoordinatorHttpSolrCall(SolrDispatchFilter solrDispatchFilter, CoreContainer cores, HttpServletRequest request, + HttpServletResponse response, boolean retry) { + super(solrDispatchFilter, cores, request, response, retry); + } + + @Override + protected SolrCore getCoreByCollection(String collectionName, boolean isPreferLeader) { + this.collectionName = collectionName; + SolrCore core = super.getCoreByCollection(collectionName, isPreferLeader); + if (core != null) return core; + return getCore(this, collectionName, isPreferLeader); + } + + @SuppressWarnings("unchecked") + public static SolrCore getCore(HttpSolrCall solrCall, String collectionName, boolean isPreferLeader) { + Map<String, String> coreNameMapping= (Map<String, String>) solrCall.cores.getObjectCache() + .computeIfAbsent(CoordinatorHttpSolrCall.class.getName(), + s -> new ConcurrentHashMap<>()); + String sytheticCoreName = coreNameMapping.get(collectionName); + if (sytheticCoreName != null) { + return solrCall.cores.getCore(sytheticCoreName); + } else { + ZkStateReader zkStateReader = solrCall.cores.getZkController().getZkStateReader(); + ClusterState clusterState = zkStateReader.getClusterState(); + DocCollection coll = clusterState.getCollectionOrNull(collectionName, true); + if (coll != null) { + String confName = coll.getConfigName(); + String syntheticCollectionName = SYNTHETIC_COLL_PREFIX + confName; + + DocCollection syntheticColl = clusterState.getCollectionOrNull(syntheticCollectionName); + if (syntheticColl == null) { + // no such collection. let's create one + log.info("synthetic collection: {} does not exist, creating.. ", syntheticCollectionName); + createColl(syntheticCollectionName, solrCall.cores, confName); + } + SolrCore core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + if (core != null) { + coreNameMapping.put(collectionName, core.getName()); + log.info("coordinator NODE , returns synthetic core " + core.getName()); + } else { + //this node does not have a replica. add one + log.info("this node does not have a replica of the synthetic collection: {} , adding replica ", syntheticCollectionName); + + addReplica(syntheticCollectionName, solrCall.cores); + core = solrCall.getCoreByCollection(syntheticCollectionName, isPreferLeader); + } + return core; + + } + return null; + } + + } + private static void addReplica(String syntheticCollectionName, CoreContainer cores) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + cores.getCollectionsHandler().handleRequestBody(new LocalSolrQueryRequest(null, + CollectionAdminRequest.addReplicaToShard(syntheticCollectionName, "shard1") + .setCreateNodeSet(cores.getZkController().getNodeName()) + .getParams() + ), rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not auto-create collection: " + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + + } + + private static void createColl(String syntheticCollectionName, CoreContainer cores, String confName) { + SolrQueryResponse rsp = new SolrQueryResponse(); + try { + SolrParams params = CollectionAdminRequest.createCollection(syntheticCollectionName, confName, 1, 1) + .setCreateNodeSet(cores.getZkController().getNodeName()).getParams(); + log.info("sending collection admin command : {}", Utils.toJSONString(params)); + cores.getCollectionsHandler() + .handleRequestBody(new LocalSolrQueryRequest(null, + params), rsp); + if (rsp.getValues().get("success") == null) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Could not create :" + syntheticCollectionName + " collection: " + Utils.toJSONString(rsp.getValues())); + } + } catch (SolrException e) { + throw e; + + } catch (Exception e) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e); + } + } + + @Override + protected void init() throws Exception { + super.init(); + if(action == SolrDispatchFilter.Action.PROCESS && core != null) { + solrReq = wrappedReq(solrReq, collectionName, this); + } + } + + public static SolrQueryRequest wrappedReq(SolrQueryRequest delegate, String collectionName, HttpSolrCall httpSolrCall) { + Properties p = new Properties(); + p.put(CoreDescriptor.CORE_COLLECTION, collectionName); + p.put(CloudDescriptor.REPLICA_TYPE, Replica.Type.PULL.toString()); + p.put(CoreDescriptor.CORE_SHARD, "_"); + + CloudDescriptor cloudDescriptor = new CloudDescriptor(delegate.getCore().getCoreDescriptor(), + delegate.getCore().getName(), p); + return new SolrQueryRequest() { + @Override + public SolrParams getParams() { + return delegate.getParams(); + } + + @Override + public void setParams(SolrParams params) { + + delegate.setParams(params); + } + + @Override + public Iterable<ContentStream> getContentStreams() { + return delegate.getContentStreams(); + } + + @Override + public SolrParams getOriginalParams() { + return delegate.getOriginalParams(); + } + + @Override + public Map<Object, Object> getContext() { + return delegate.getContext(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public long getStartTime() { + return delegate.getStartTime(); + } + + @Override + public RTimerTree getRequestTimer() { + return delegate.getRequestTimer(); + } + + @Override + public SolrIndexSearcher getSearcher() { + return delegate.getSearcher(); + } + + @Override + public SolrCore getCore() { + return delegate.getCore(); + } + + @Override + public IndexSchema getSchema() { + return delegate.getSchema(); + } + + @Override + public void updateSchemaToLatest() { + delegate.updateSchemaToLatest(); + + } + + @Override + public String getParamString() { + return delegate.getParamString(); + } + + @Override + public Map<String, Object> getJSON() { + return delegate.getJSON(); + } + + @Override + public void setJSON(Map<String, Object> json) { + delegate.setJSON(json); + } + + @Override + public Principal getUserPrincipal() { + return delegate.getUserPrincipal(); + } + + @Override + public HttpSolrCall getHttpSolrCall() { + return httpSolrCall; + } + @Override + public CloudDescriptor getCloudDescriptor() { + return cloudDescriptor; + } + }; + } +} \ No newline at end of file diff --git a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java index 2c9cb69..de85aa2 100644 --- a/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java +++ b/solr/core/src/java/org/apache/solr/servlet/SolrDispatchFilter.java @@ -27,6 +27,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.NodeRoles; import org.apache.solr.core.SolrCore; import org.apache.solr.logging.MDCLoggingContext; import org.apache.solr.logging.MDCSnapshot; @@ -91,6 +92,7 @@ public class SolrDispatchFilter extends BaseSolrFilter implements PathExcluder { private final boolean isV2Enabled = !"true".equals(System.getProperty("disable.v2.api", "false")); + private boolean isCoordinator; public HttpClient getHttpClient() { try { return coreService.getService().getHttpClient(); @@ -130,7 +132,7 @@ public class SolrDispatchFilter extends BaseSolrFilter implements PathExcluder { public void init(FilterConfig config) throws ServletException { try { coreService = CoreContainerProvider.serviceForContext(config.getServletContext()); - + this.isCoordinator = NodeRoles.MODE_ON.equals(coreService.getService().getCoreContainer().nodeRoles.getRoleMode(NodeRoles.Role.COORDINATOR)); if (log.isTraceEnabled()) { log.trace("SolrDispatchFilter.init(): {}", this.getClass().getClassLoader()); } @@ -255,9 +257,13 @@ public class SolrDispatchFilter extends BaseSolrFilter implements PathExcluder { throw new SolrException(ErrorCode.SERVER_ERROR, "Core Container Unavailable"); } if (isV2Enabled && (path.startsWith("/____v2/") || path.equals("/____v2"))) { - return new V2HttpCall(this, cores, request, response, false); + return isCoordinator ? + new CoordinatorHttpSolrCall(this, cores, request, response, false) : + new V2HttpCall(this, cores, request, response, false); } else { - return new HttpSolrCall(this, cores, request, response, retry); + return isCoordinator ? + new CoordinatorHttpSolrCall(this, cores, request, response, retry) : + new HttpSolrCall(this, cores, request, response, retry); } } diff --git a/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java new file mode 100644 index 0000000..55dd37c --- /dev/null +++ b/solr/core/src/test/org/apache/solr/search/TestCoordinatorRole.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.search; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.NavigableObject; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.Utils; +import org.apache.solr.core.NodeRoles; +import org.apache.solr.servlet.CoordinatorHttpSolrCall; +import org.junit.BeforeClass; + +public class TestCoordinatorRole extends SolrCloudTestCase { + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(4) + .addConfig("conf", configset("cloud-minimal")) + .configure(); + } + + public void testSimple() throws Exception { + CloudSolrClient client = cluster.getSolrClient(); + String COLLECTION_NAME = "test_coll"; + String SYNTHETIC_COLLECTION = CoordinatorHttpSolrCall.SYNTHETIC_COLL_PREFIX +"conf"; + CollectionAdminRequest + .createCollection(COLLECTION_NAME, "conf", 2, 2) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(COLLECTION_NAME, 2, 4); + UpdateRequest ur = new UpdateRequest(); + for (int i = 0; i < 10; i++) { + SolrInputDocument doc2 = new SolrInputDocument(); + doc2.addField("id", "" + i); + doc2.addField("fld1_s", "1 value 1 value 1 value 1 value 1 value 1 value 1 value "); + doc2.addField("fld2_s", "2 value 2 value 2 value 2 value 2 value 2 value 2 value 2 value 2 value 2 value "); + doc2.addField("fld3_s", "3 value 3 value 3 value 3 value 3 value 3 value 3 value 3 value 3 value 3 value 3 value 3 value 3 value 3 value "); + doc2.addField("fld4_s", "4 value 4 value 4 value 4 value 4 value 4 value 4 value 4 value 4 value "); + doc2.addField("fld5_s", "5 value 5 value 5 value 5 value 5 value 5 value 5 value 5 value 5 value 5 value 5 value 5 value "); + ur.add(doc2); + } + + ur.commit(client, COLLECTION_NAME); + QueryResponse rsp = client.query(COLLECTION_NAME, new SolrQuery("*:*")); + assertEquals(10, rsp.getResults().getNumFound()); + + System.setProperty(NodeRoles.NODE_ROLES_PROP, "coordinator:on"); + JettySolrRunner coordinatorJetty = null; + try { + coordinatorJetty = cluster.startJettySolrRunner(); + } finally { + System.clearProperty(NodeRoles.NODE_ROLES_PROP); + } + NavigableObject result = (NavigableObject) Utils.executeGET(cluster.getSolrClient().getHttpClient(), + coordinatorJetty.getBaseUrl().toString()+"/"+COLLECTION_NAME +"/select?q=*:*&wt=javabin", Utils.JAVABINCONSUMER); + + assertEquals(10, ((Collection)result._get("response", Collections.emptyList())).size()); + + assertNotNull(cluster.getSolrClient().getClusterStateProvider().getCollection(SYNTHETIC_COLLECTION)); + + } + +} \ No newline at end of file
