This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new de02f025ce ensures sessions are always cleaned up (#3569)
de02f025ce is described below
commit de02f025ce5b25acfb86dbad9e9b96c7115d653f
Author: Keith Turner <[email protected]>
AuthorDate: Mon Jul 10 11:27:46 2023 -0400
ensures sessions are always cleaned up (#3569)
This is a potential fix for #3512. It ensures that when a session's
cleanup method returns false, cleanup will be attempted again later.
---
.../accumulo/tserver/session/ScanSession.java | 4 ++
.../apache/accumulo/tserver/session/Session.java | 6 ++
.../accumulo/tserver/session/SessionManager.java | 84 +++++++++++++---------
3 files changed, 62 insertions(+), 32 deletions(-)
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
index 3217fe1b8f..0fefcc1327 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/ScanSession.java
@@ -189,4 +189,8 @@ public abstract class ScanSession extends Session
implements ScanInfo {
return true;
}
+ @Override
+ public String toString() {
+ return super.toString() + " tableId:" + getTableId();
+ }
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
index b1c14ca6e8..6e49833729 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/Session.java
@@ -49,4 +49,10 @@ public class Session {
public boolean cleanup() {
return true;
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + " " + state + " startTime:" +
startTime + " lastAccessTime:"
+ + lastAccessTime + " client:" + client;
+ }
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index b8d605ebd8..f0f8a5de2a 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -18,15 +18,22 @@
*/
package org.apache.accumulo.tserver.session;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
@@ -42,6 +49,7 @@ import
org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
import org.apache.accumulo.core.tabletserver.thrift.ScanState;
import org.apache.accumulo.core.tabletserver.thrift.ScanType;
import org.apache.accumulo.core.util.MapCounter;
+import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.tserver.scan.ScanRunState;
@@ -61,7 +69,7 @@ public class SessionManager {
private final ConcurrentMap<Long,Session> sessions = new
ConcurrentHashMap<>();
private final long maxIdle;
private final long maxUpdateIdle;
- private final List<Session> idleSessions = new ArrayList<>();
+ private final BlockingQueue<Session> deferredCleanupQueue = new
ArrayBlockingQueue<>(5000);
private final Long expiredSessionMarker = (long) -1;
private final AccumuloConfiguration aconf;
private final ServerContext ctx;
@@ -209,15 +217,39 @@ public class SessionManager {
}
if (doCleanup) {
- session.cleanup();
+ cleanup(session);
}
}
return session;
}
+ private void cleanup(Session session) {
+ if (!session.cleanup()) {
+ var retry = Retry.builder().infiniteRetries().retryAfter(25,
MILLISECONDS)
+ .incrementBy(25, MILLISECONDS).maxWait(5, SECONDS).backOffFactor(1.5)
+ .logInterval(1, MINUTES).createRetry();
+
+ while (!deferredCleanupQueue.offer(session)) {
+ if (session.cleanup()) {
+ break;
+ }
+
+ try {
+ retry.waitForNextAttempt(log, "Unable to cleanup session or defer
cleanup " + session);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ retry.logRetry(log, "Unable to cleanup session or defer cleanup " +
session);
+ }
+
+ retry.logCompletion(log, "Cleaned up session or deferred cleanup " +
session);
+ }
+ }
+
private void sweep(final long maxIdle, final long maxUpdateIdle) {
- List<Session> sessionsToCleanup = new ArrayList<>();
+ List<Session> sessionsToCleanup = new LinkedList<>();
Iterator<Session> iter = sessions.values().iterator();
while (iter.hasNext()) {
Session session = iter.next();
@@ -239,22 +271,14 @@ public class SessionManager {
}
}
- // do clean up outside of lock for TabletServer in a synchronized block
for simplicity vice a
- // synchronized list
+ // do clean up outside of lock for TabletServer
+ deferredCleanupQueue.drainTo(sessionsToCleanup);
- synchronized (idleSessions) {
- sessionsToCleanup.addAll(idleSessions);
- idleSessions.clear();
- }
+ // make a pass through and remove everything that can be cleaned up before
calling the
+ // cleanup(Session) method which may block when it can not clean up a
session.
+ sessionsToCleanup.removeIf(Session::cleanup);
- // perform cleanup for all of the sessions
- for (Session session : sessionsToCleanup) {
- if (!session.cleanup()) {
- synchronized (idleSessions) {
- idleSessions.add(session);
- }
- }
- }
+ sessionsToCleanup.forEach(this::cleanup);
}
public void removeIfNotAccessed(final long sessionId, final long delay) {
@@ -282,7 +306,7 @@ public class SessionManager {
log.info("Closing not accessed session from user=" +
session2.getUser() + ", client="
+ session2.client + ", duration=" + delay + "ms");
sessions.remove(sessionId);
- session2.cleanup();
+ cleanup(session2);
}
}
}
@@ -299,13 +323,11 @@ public class SessionManager {
Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
- synchronized (idleSessions) {
- /**
- * Add sessions so that get the list returned in the active scans call
- */
- for (Session session : idleSessions) {
- copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker,
session));
- }
+ /**
+ * Add sessions so that get the list returned in the active scans call
+ */
+ for (Session session : deferredCleanupQueue) {
+ copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker,
session));
}
List.of(sessions.entrySet(), copiedIdleSessions).forEach(set ->
set.forEach(entry -> {
@@ -341,13 +363,11 @@ public class SessionManager {
final long ct = System.currentTimeMillis();
final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>();
- synchronized (idleSessions) {
- /**
- * Add sessions so that get the list returned in the active scans call
- */
- for (Session session : idleSessions) {
- copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker,
session));
- }
+ /**
+ * Add sessions so that get the list returned in the active scans call
+ */
+ for (Session session : deferredCleanupQueue) {
+ copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker,
session));
}
List.of(sessions.entrySet(), copiedIdleSessions).forEach(s ->
s.forEach(entry -> {