bruno-roustant commented on code in PR #2548:
URL: https://github.com/apache/solr/pull/2548#discussion_r1663930370


##########
solr/core/src/java/org/apache/solr/update/UpdateLocks.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.util.IOFunction;
+
+/**
+ * Locks associated with updates in connection with the {@link UpdateLog}.
+ *
+ * @lucene.internal
+ */
+public class UpdateLocks {
+  // names are legacy oriented; TODO rename and use EnvUtils
+  private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS =
+      "bucketVersionLockTimeoutMs";
+  public static final int DEFAULT_TIMEOUT =
+      
Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, 
"0"));
+
+  private final long docLockTimeoutMs;
+
+  private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+  private final ConcurrentHashMap<BytesRef, LockAndCondition> idToLock = new 
ConcurrentHashMap<>();
+
+  public UpdateLocks(long docLockTimeoutMs) {
+    this.docLockTimeoutMs = docLockTimeoutMs;
+  }
+
+  public <R> R runWithLock(BytesRef id, IOFunction<Condition, R> function) 
throws IOException {
+    final var startTimeNanos = System.nanoTime();
+
+    lockForUpdate();
+    try {
+
+      LockAndCondition lock;
+      while (true) {
+        lock = idToLock.computeIfAbsent(id, (k) -> new LockAndCondition());
+        synchronized (lock) {
+          if (lock.refCount < 0) {
+            continue; // is being removed; try again
+          }
+          lock.refCount++;
+          break;
+        }
+      }
+      // try-finally ensuring we decrement the refCount
+      try {
+        return runWithLockInternal(id, function, lock, startTimeNanos);
+      } finally {
+        synchronized (lock) {
+          assert lock.refCount > 0; // because we incremented it
+          lock.refCount--;
+          if (lock.refCount == 0) {
+            idToLock.remove(id);
+          }
+        }
+      }
+
+    } finally {
+      unlockForUpdate();
+    }
+  }
+
+  private <R> R runWithLockInternal(
+      BytesRef id, IOFunction<Condition, R> function, LockAndCondition lock, 
long startTimeNanos)
+      throws IOException {
+    // Acquire the lock
+    try {
+      if (docLockTimeoutMs == 0) {
+        lock.lock.lockInterruptibly();
+      } else {
+        long remainingNs =
+            TimeUnit.MILLISECONDS.toNanos(docLockTimeoutMs) - 
(System.nanoTime() - startTimeNanos);
+        boolean timedOut =
+            !lock.lock.tryLock(remainingNs - System.nanoTime(), 
TimeUnit.NANOSECONDS);

Review Comment:
   Typo: System.nanoTime() subtracted a second time.



##########
solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java:
##########
@@ -652,35 +631,35 @@ private long doWaitForDependentUpdates(
       AddUpdateCommand cmd,
       long versionOnUpdate,
       boolean isReplayOrPeersync,
-      VersionBucket bucket,
+      Condition condition,
       TimeOut waitTimeout) {
-    long lastFoundVersion;
-    try {
-      Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
-      lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
+    Long lookedUpVersion = vinfo.lookupVersion(cmd.getIndexedId());
+    long lastFoundVersion = lookedUpVersion == null ? 0L : lookedUpVersion;
 
-      if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
-        if (log.isDebugEnabled()) {
-          log.debug(
-              "Re-ordered inplace update. version={}, prevVersion={}, 
lastVersion={}, replayOrPeerSync={}, id={}",
-              (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()),
-              cmd.prevVersion,
-              lastFoundVersion,
-              isReplayOrPeersync,
-              cmd.getPrintableId());
-        }
+    if (Math.abs(lastFoundVersion) < cmd.prevVersion) {
+      if (log.isDebugEnabled()) {
+        log.debug(
+            "Re-ordered inplace update. version={}, prevVersion={}, 
lastVersion={}, replayOrPeerSync={}, id={}",
+            (cmd.getVersion() == 0 ? versionOnUpdate : cmd.getVersion()),
+            cmd.prevVersion,
+            lastFoundVersion,
+            isReplayOrPeersync,
+            cmd.getPrintableId());
       }
+    }
 
-      while (Math.abs(lastFoundVersion) < cmd.prevVersion && 
!waitTimeout.hasTimedOut()) {
-        long timeLeftInNanos = waitTimeout.timeLeft(TimeUnit.NANOSECONDS);
-        if (timeLeftInNanos > 0) { // 0 means: wait forever until notified, 
but we don't want that.
-          bucket.awaitNanos(timeLeftInNanos);
+    while (Math.abs(lastFoundVersion) < cmd.prevVersion && 
!waitTimeout.hasTimedOut()) {
+      long timeLeftInNanos = waitTimeout.timeLeft(TimeUnit.NANOSECONDS);
+      if (timeLeftInNanos > 0) { // 0 means: wait forever until notified, but 
we don't want that.
+        try {
+          condition.await(timeLeftInNanos, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();

Review Comment:
   This exception should stop the request. I think we should not set the 
interrupt flag, unless it is handled by the callers? It was not set before in 
VersionBucket.



##########
solr/core/src/java/org/apache/solr/update/UpdateLocks.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.util.IOFunction;
+
+/**
+ * Locks associated with updates in connection with the {@link UpdateLog}.
+ *
+ * @lucene.internal
+ */
+public class UpdateLocks {
+  // names are legacy oriented; TODO rename and use EnvUtils
+  private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS =
+      "bucketVersionLockTimeoutMs";
+  public static final int DEFAULT_TIMEOUT =
+      
Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, 
"0"));
+
+  private final long docLockTimeoutMs;
+
+  private final ReadWriteLock lock = new ReentrantReadWriteLock(true);

Review Comment:
   Maybe rename to blockUpdatesLock for clarity of the purpose?



##########
solr/core/src/java/org/apache/solr/update/UpdateLocks.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.update;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.lucene.util.BytesRef;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrException.ErrorCode;
+import org.apache.solr.util.IOFunction;
+
+/**
+ * Locks associated with updates in connection with the {@link UpdateLog}.
+ *
+ * @lucene.internal
+ */
+public class UpdateLocks {
+  // names are legacy oriented; TODO rename and use EnvUtils
+  private static final String SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS =
+      "bucketVersionLockTimeoutMs";
+  public static final int DEFAULT_TIMEOUT =
+      
Integer.parseInt(System.getProperty(SYS_PROP_BUCKET_VERSION_LOCK_TIMEOUT_MS, 
"0"));
+
+  private final long docLockTimeoutMs;
+
+  private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+  private final ConcurrentHashMap<BytesRef, LockAndCondition> idToLock = new 
ConcurrentHashMap<>();
+
+  public UpdateLocks(long docLockTimeoutMs) {
+    this.docLockTimeoutMs = docLockTimeoutMs;
+  }
+
+  public <R> R runWithLock(BytesRef id, IOFunction<Condition, R> function) 
throws IOException {
+    final var startTimeNanos = System.nanoTime();
+
+    lockForUpdate();
+    try {
+
+      LockAndCondition lock;
+      while (true) {
+        lock = idToLock.computeIfAbsent(id, (k) -> new LockAndCondition());
+        synchronized (lock) {
+          if (lock.refCount < 0) {

Review Comment:
   In which conditions could it be < 0?



##########
solr/core/src/test/org/apache/solr/update/processor/DistributedUpdateProcessorTest.java:
##########
@@ -220,4 +185,45 @@ protected boolean tryLock(int lockTimeoutMs) {
       return succeeded;
     }
   }
+
+  private static DistributedUpdateProcessor newDurp(SolrQueryRequest req, long 
lockTimeoutMs) {
+    if (lockTimeoutMs <= 0) {
+      // default
+      return new DistributedUpdateProcessor(req, null, null);
+    }
+    // customize UpdateLocks with the provided timeout.  And simulate docs 
taking longer to index
+    final var sleepMs = 5000;

Review Comment:
   Could it be sleepMs = lockTimeoutMs + 1000 ?



##########
solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java:
##########
@@ -202,6 +202,10 @@ public DistributedUpdateProcessor(
     // this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
   }
 
+  protected UpdateLocks getUpdateLocks() {

Review Comment:
   Maybe @VisibleForTesting ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to