murblanc commented on a change in pull request #1504:
URL: https://github.com/apache/lucene-solr/pull/1504#discussion_r427921563



##########
File path: 
solr/solrj/src/java/org/apache/solr/client/solrj/cloud/autoscaling/PolicyHelper.java
##########
@@ -429,87 +440,124 @@ private void release(SessionWrapper sessionWrapper) {
      * The session can be used by others while the caller is performing 
operations
      */
     private void returnSession(SessionWrapper sessionWrapper) {
-      TimeSource timeSource = sessionWrapper.session != null ? 
sessionWrapper.session.cloudManager.getTimeSource() : TimeSource.NANO_TIME;
+      boolean present;
       synchronized (lockObj) {
         sessionWrapper.status = Status.EXECUTING;
-        if (log.isDebugEnabled()) {
-          log.debug("returnSession, curr-time {} sessionWrapper.createTime {}, 
this.sessionWrapper.createTime {} "
-              , time(timeSource, MILLISECONDS),
-              sessionWrapper.createTime,
-              this.sessionWrapper.createTime);
-        }
-        if (sessionWrapper.createTime == this.sessionWrapper.createTime) {
-          //this session was used for computing new operations and this can 
now be used for other
-          // computing
-          this.sessionWrapper = sessionWrapper;
+        present = sessionWrapperSet.contains(sessionWrapper);
 
-          //one thread who is waiting for this need to be notified.
-          lockObj.notify();
-        } else {
-          log.debug("create time NOT SAME {} ", 
SessionWrapper.DEFAULT_INSTANCE.createTime);
-          //else just ignore it
-        }
+        // wake up single thread waiting for a session return (ok if not woken 
up, wait is short)
+        lockObj.notify();
       }
 
+      // Logging
+      if (present) {
+        if (log.isDebugEnabled()) {
+          log.debug("returnSession {}", sessionWrapper.getCreateTime());
+        }
+      } else {
+        log.warn("returning unknown session {} ", 
sessionWrapper.getCreateTime());
+      }
     }
 
 
-    public SessionWrapper get(SolrCloudManager cloudManager) throws 
IOException, InterruptedException {
+    public SessionWrapper get(SolrCloudManager cloudManager, boolean 
allowWait) throws IOException, InterruptedException {
       TimeSource timeSource = cloudManager.getTimeSource();
+      long oldestUpdateTimeNs = 
TimeUnit.SECONDS.convert(timeSource.getTimeNs(), TimeUnit.NANOSECONDS) - 
SESSION_EXPIRY;
+      int zkVersion = 
cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion();
+
       synchronized (lockObj) {
-        if (sessionWrapper.status == Status.NULL ||
-            sessionWrapper.zkVersion != 
cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion() ||
-            TimeUnit.SECONDS.convert(timeSource.getTimeNs() - 
sessionWrapper.lastUpdateTime, TimeUnit.NANOSECONDS) > SESSION_EXPIRY) {
-          //no session available or the session is expired
+        // If nothing in the cache can possibly work, create a new session
+        if (!hasNonExpiredSession(zkVersion, oldestUpdateTimeNs)) {
           return createSession(cloudManager);
-        } else {
+        }
+
+        // Try to find a session available right away
+        SessionWrapper sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);
+
+        if (sw != null) {
+          if (log.isDebugEnabled()) {
+            log.debug("reusing session {}", sw.getCreateTime());
+          }
+          return sw;
+        } else if (allowWait) {
+          // No session available, but if we wait a bit, maybe one can become 
available
+          // wait 1 to 10 secs in case a session is returned. Random to spread 
wakeup otherwise sessions not reused
+          long waitForMs = (long) (Math.random() * 9 * 1000 + 1000);
+
+          if (log.isDebugEnabled()) {
+            log.debug("No sessions are available, all busy COMPUTING. starting 
wait of {}ms", waitForMs);
+          }
           long waitStart = time(timeSource, MILLISECONDS);
-          //the session is not expired
-          log.debug("reusing a session {}", this.sessionWrapper.createTime);
-          if (this.sessionWrapper.status == Status.UNUSED || 
this.sessionWrapper.status == Status.EXECUTING) {
-            this.sessionWrapper.status = Status.COMPUTING;
-            return sessionWrapper;
-          } else {
-            //status= COMPUTING it's being used for computing. computing is
-            if (log.isDebugEnabled()) {
-              log.debug("session being used. waiting... current time {} ", 
time(timeSource, MILLISECONDS));
-            }
-            try {
-              lockObj.wait(10 * 1000);//wait for a max of 10 seconds
-            } catch (InterruptedException e) {
-              log.info("interrupted... ");
-            }
+          try {
+            lockObj.wait(waitForMs);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+
+          if (log.isDebugEnabled()) {
+            log.debug("out of waiting. wait of {}ms, actual time elapsed 
{}ms", waitForMs, timeElapsed(timeSource, waitStart, MILLISECONDS));
+          }
+
+          // Try again to find an available session
+          sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);
+
+          if (sw != null) {
             if (log.isDebugEnabled()) {
-              log.debug("out of waiting curr-time:{} time-elapsed {}"
-                  , time(timeSource, MILLISECONDS), timeElapsed(timeSource, 
waitStart, MILLISECONDS));
-            }
-            // now this thread has woken up because it got timed out after 10 
seconds or it is notified after
-            // the session was returned from another COMPUTING operation
-            if (this.sessionWrapper.status == Status.UNUSED || 
this.sessionWrapper.status == Status.EXECUTING) {
-              log.debug("Wait over. reusing the existing session ");
-              this.sessionWrapper.status = Status.COMPUTING;
-              return sessionWrapper;
-            } else {
-              //create a new Session
-              return createSession(cloudManager);
+              log.debug("Wait over. reusing an existing session {}", 
sw.getCreateTime());
             }
+            return sw;
+          } else {
+            return createSession(cloudManager);
           }
+        } else {
+          return createSession(cloudManager);
         }
       }
     }
 
+    /**
+     * Returns an available session from the cache (the best one once cache 
strategies are defined), or null if no session
+     * from the cache is available (i.e. all are still COMPUTING, are too old, 
wrong zk version or the cache is empty).<p>
+     * This method must be called while holding the monitor on {@link 
#lockObj}.<p>
+     * The method updates the session status to computing.
+     */
+    private SessionWrapper getAvailableSession(int zkVersion, long 
oldestUpdateTimeNs) {
+      for (SessionWrapper sw : sessionWrapperSet) {
+        if (sw.status == Status.EXECUTING && sw.getLastUpdateTime() >= 
oldestUpdateTimeNs && sw.zkVersion == zkVersion) {
+          sw.status = Status.COMPUTING;
+          return sw;
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Returns true if there's a session in the cache that could be returned 
(if it was free). This is required to
+     * know if there's any point in waiting or if a new session should better 
be created right away.
+     */
+    private boolean hasNonExpiredSession(int zkVersion, long 
oldestUpdateTimeNs) {
+      for (SessionWrapper sw : sessionWrapperSet) {
+        if (sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == 
zkVersion) {
+          return true;
+        }
+      }
+      return false;
+    }
+
     private SessionWrapper createSession(SolrCloudManager cloudManager) throws 
InterruptedException, IOException {
-      synchronized (lockObj) {
+      if (log.isDebugEnabled()) {

Review comment:
      From a timing perspective, you're right. I was trying to minimize unnecessary
activity done while holding the lock. I'll put the log back inside
the synchronized block, since the logging delay is negligible compared to the
wait for a session to become free.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@lucene.apache.org
For additional commands, e-mail: issues-h...@lucene.apache.org

Reply via email to