This is an automated email from the ASF dual-hosted git repository.

magibney pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new d195e4c  SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
d195e4c is described below

commit d195e4c2b1d5f57cc33b7f75d91a483d5abda4b2
Author: Michael Gibney <[email protected]>
AuthorDate: Tue Jan 25 13:59:11 2022 -0500

    SOLR-15045: DistributedZkUpdateProcessor should issue commits to local shards and remote shards in parallel (#545)
---
 solr/CHANGES.txt                                   |   3 +-
 .../processor/DistributedZkUpdateProcessor.java    |  24 +++-
 .../conf/solrconfig-parallel-commit.xml            |  52 ++++++++
 .../solr/cloud/ParallelCommitExecutionTest.java    | 139 +++++++++++++++++++++
 4 files changed, 212 insertions(+), 6 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 2a84ba3..fbef2ff 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -22,7 +22,8 @@ Optimizations
 
 Bug Fixes
 ---------------------
-(No changes)
+* SOLR-15045: `DistributedZkUpdateProcessor` now issues commits to local shards and remote shards in parallel,
+  halving the latency of synchronous commits (Michael Gibney)
 
 Other Changes
 ---------------------
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
index c58bca8..ab86400 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java
@@ -195,6 +195,8 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
       // zk
       ModifiableSolrParams params = new 
ModifiableSolrParams(filterParams(req.getParams()));
 
+      // TODO: revisit the need for tracking `issuedDistribCommit` -- see 
below, and SOLR-15045
+      boolean issuedDistribCommit = false;
       List<SolrCmdDistributor.Node> useNodes = null;
       if (req.getParams().get(COMMIT_END_POINT) == null) {
         useNodes = nodes;
@@ -204,11 +206,16 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
           params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
               zkController.getBaseUrl(), req.getCore().getName()));
           cmdDistrib.distribCommit(cmd, useNodes, params);
-          cmdDistrib.blockAndDoRetries();
+          issuedDistribCommit = true;
         }
       }
 
       if (isLeader) {
+        if (issuedDistribCommit) {
+          // defensive copy of params, which was passed into 
distribCommit(...) above; will unconditionally replace
+          // DISTRIB_UPDATE_PARAM, COMMIT_END_POINT, and DISTRIB_FROM if the 
new `params` val will actually be used
+          params = new ModifiableSolrParams(params);
+        }
         params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
 
         params.set(COMMIT_END_POINT, "replicas");
@@ -219,14 +226,21 @@ public class DistributedZkUpdateProcessor extends 
DistributedUpdateProcessor {
           params.set(DISTRIB_FROM, ZkCoreNodeProps.getCoreUrl(
               zkController.getBaseUrl(), req.getCore().getName()));
 
+          // NOTE: distribCommit(...) internally calls `blockAndDoRetries()`, 
flushing any TOLEADER distrib commits
           cmdDistrib.distribCommit(cmd, useNodes, params);
+          issuedDistribCommit = true;
         }
 
         doLocalCommit(cmd);
-
-        if (useNodes != null) {
-          cmdDistrib.blockAndDoRetries();
-        }
+      }
+      if (issuedDistribCommit) {
+        // TODO: according to discussion on SOLR-15045, this call (and all 
tracking of `issuedDistribCommit`) may
+        //  well be superfluous, and can probably simply be removed. It is 
left in place for now, intentionally
+        //  punting on the question of whether this internal 
`blockAndDoRetries()` is necessary. At worst, its
+        //  presence is misleading; but it should be harmless, and allows the 
change fixing SOLR-15045 to be as
+        //  tightly scoped as possible, leaving the behavior of the code 
otherwise functionally equivalent (for
+        //  better or worse!)
+        cmdDistrib.blockAndDoRetries();
       }
     }
   }
diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
new file mode 100644
index 0000000..3e61994
--- /dev/null
+++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-parallel-commit.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+   Test config for ParallelCommitExecutionTest: every /update request runs the
+   "ensure-parallel-commit" chain, whose "check" post-processor holds each commit
+   until all expected commits have reached it.
+  -->
+<config>
+  <luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
+  <xi:include xmlns:xi="http://www.w3.org/2001/XInclude" href="solrconfig.snippet.randomindexconfig.xml"/>
+  <requestHandler name="/select" class="solr.SearchHandler"></requestHandler>
+  <directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}"/>
+  <schemaFactory class="ClassicIndexSchemaFactory"/>
+  
+  <updateHandler class="solr.DirectUpdateHandler2">
+    <updateLog enable="${enable.update.log:true}">
+      <str name="dir">${solr.ulog.dir:}</str>
+    </updateLog>
+
+    <commitWithin>
+      <softCommit>${solr.commitwithin.softcommit:true}</softCommit>
+    </commitWithin>
+
+  </updateHandler>
+
+  <requestHandler name="/update" class="solr.UpdateRequestHandler">
+    <!-- invariants (not defaults), so a request cannot select a different update chain -->
+    <lst name="invariants">
+      <str name="update.chain">ensure-parallel-commit</str>
+    </lst>
+  </requestHandler>
+
+  <updateProcessor class="org.apache.solr.cloud.ParallelCommitExecutionTest$CheckFactory" name="check"/>
+
+  <updateRequestProcessorChain name="ensure-parallel-commit" post-processor="check">
+    <processor class="solr.RunUpdateProcessorFactory"/>
+  </updateRequestProcessorChain>
+</config>
diff --git a/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
new file mode 100644
index 0000000..b92a5c6
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cloud/ParallelCommitExecutionTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cloud;
+
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.lucene.util.TestUtil;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Verifies that a distributed commit reaches every core's update chain concurrently. The
+ * {@link Check} post-processor (wired in by {@code solrconfig-parallel-commit.xml}) makes each
+ * commit wait until all expected commits have entered {@code processCommit(...)}, so commits that
+ * are issued one after another time out instead of completing.
+ */
+public class ParallelCommitExecutionTest extends SolrCloudTestCase {
+
+  private static final String DEBUG_LABEL = MethodHandles.lookup().lookupClass().getName();
+  private static final String COLLECTION_NAME = DEBUG_LABEL + "_collection";
+
+  /** A basic client for operations at the cloud level, default collection will be set */
+  private static CloudSolrClient CLOUD_CLIENT;
+
+  /** Number of commits each test expects to observe: one per core (numShards * repFactor). */
+  private static int expectCount;
+
+  /** Counted down as each core enters {@code Check.processCommit}; re-armed per test. */
+  private static volatile CountDownLatch countdown;
+
+  /** Incremented as each core's commit gets past the latch; reset per test. */
+  private static final AtomicInteger countup = new AtomicInteger();
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // Multiple replicas matter, but the initial parallel commit execution tests only consider
+    // repFactor=1.
+    final int repFactor = 1; // random().nextBoolean() ? 1 : 2;
+    final int numShards = TestUtil.nextInt(random(), 1, 4);
+    final int numNodes = (numShards * repFactor);
+    expectCount = numNodes;
+
+    final String configName = DEBUG_LABEL + "_config-set";
+    final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
+
+    configureCluster(numNodes).addConfig(configName, configDir).configure();
+
+    Map<String, String> collectionProperties = new LinkedHashMap<>();
+    collectionProperties.put("config", "solrconfig-parallel-commit.xml");
+    collectionProperties.put("schema", "schema_latest.xml");
+    CollectionAdminRequest.createCollection(COLLECTION_NAME, configName, numShards, repFactor)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .setProperties(collectionProperties)
+        .process(cluster.getSolrClient());
+
+    CLOUD_CLIENT = cluster.getSolrClient();
+    CLOUD_CLIENT.setDefaultCollection(COLLECTION_NAME);
+    waitForRecoveriesToFinish(CLOUD_CLIENT);
+  }
+
+  @AfterClass
+  private static void afterClass() throws Exception {
+    if (null != CLOUD_CLIENT) {
+      CLOUD_CLIENT.close();
+      CLOUD_CLIENT = null;
+    }
+  }
+
+  /** Arms a fresh latch for {@code expectCount} commits and zeroes {@code countup}. */
+  private static void initSyncVars() {
+    countdown = new CountDownLatch(expectCount);
+    countup.set(0);
+  }
+
+  @Test
+  public void testParallelOk() throws Exception {
+    initSyncVars();
+    CLOUD_CLIENT.commit(true, true); // waitFlush=true, waitSearcher=true
+    assertEquals(0, countdown.getCount());
+    assertEquals(expectCount, countup.get());
+  }
+
+  public static void waitForRecoveriesToFinish(CloudSolrClient client) throws Exception {
+    assert null != client.getDefaultCollection();
+    AbstractDistribZkTestBase.waitForRecoveriesToFinish(client.getDefaultCollection(),
+                                                        client.getZkStateReader(),
+                                                        true, true, 330);
+  }
+
+  /** Factory for {@link Check}, registered as the "check" update processor in the test config. */
+  public static class CheckFactory extends UpdateRequestProcessorFactory {
+    @Override
+    public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
+      return new Check(next);
+    }
+  }
+
+  /**
+   * Passes each commit on via {@code super.processCommit}, then blocks until {@code expectCount}
+   * commits have arrived, failing if they do not all arrive within 5 seconds.
+   */
+  public static class Check extends UpdateRequestProcessor {
+
+    public Check(UpdateRequestProcessor next) {
+      super(next);
+    }
+
+    @Override
+    public void processCommit(CommitUpdateCommand cmd) throws IOException {
+      super.processCommit(cmd);
+      // read the volatile field once so countDown() and await() act on the same latch
+      final CountDownLatch latch = countdown;
+      latch.countDown();
+      try {
+        // NOTE: this ensures that all commits are executed in parallel; no commit can complete
+        // successfully until all commits have entered the `processCommit(...)` method.
+        if (!latch.await(5, TimeUnit.SECONDS)) {
+          throw new RuntimeException(
+              "done waiting: " + latch.getCount() + " expected commit(s) never arrived");
+        }
+        countup.incrementAndGet();
+      } catch (InterruptedException ex) {
+        // restore the interrupt flag before rethrowing as an unchecked exception
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ex);
+      }
+    }
+  }
+}

Reply via email to