This is an automated email from the ASF dual-hosted git repository.
dmollitor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7725d08 HIVE-23602: Use Java Concurrent Package for Operation Handle Set (David Mollitor, reviewed by Peter Vary)
7725d08 is described below
commit 7725d08f926ee38626e9e3ee1639fea58ba6d181
Author: belugabehr <[email protected]>
AuthorDate: Thu Jun 18 18:34:45 2020 -0400
HIVE-23602: Use Java Concurrent Package for Operation Handle Set (David Mollitor, reviewed by Peter Vary)
---
.../hive/service/cli/session/HiveSessionImpl.java | 40 ++++++++--------------
1 file changed, 14 insertions(+), 26 deletions(-)
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 9c7ee54..c25b63f 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
@@ -114,7 +115,7 @@ public class HiveSessionImpl implements HiveSession {
private SessionManager sessionManager;
private OperationManager operationManager;
// Synchronized by locking on itself.
- private final Set<OperationHandle> opHandleSet = new
HashSet<OperationHandle>();
+ private final Set<OperationHandle> opHandleSet =
ConcurrentHashMap.newKeySet();
private boolean isOperationLogEnabled;
private File sessionLogDir;
// TODO: the control flow for this needs to be defined. Hive is supposed to
be thread-local.
@@ -719,15 +720,11 @@ public class HiveSessionImpl implements HiveSession {
}
private void addOpHandle(OperationHandle opHandle) {
- synchronized (opHandleSet) {
- opHandleSet.add(opHandle);
- }
+ opHandleSet.add(opHandle);
}
private void removeOpHandle(OperationHandle opHandle) {
- synchronized (opHandleSet) {
- opHandleSet.remove(opHandle);
- }
+ opHandleSet.remove(opHandle);
}
@Override
@@ -757,14 +754,13 @@ public class HiveSessionImpl implements HiveSession {
try {
acquire(true, false);
// Iterate through the opHandles and close their operations
- List<OperationHandle> ops = null;
- synchronized (opHandleSet) {
- ops = new ArrayList<>(opHandleSet);
- opHandleSet.clear();
- }
- for (OperationHandle opHandle : ops) {
+ List<OperationHandle> closedOps = new ArrayList<>();
+ for (OperationHandle opHandle : opHandleSet) {
operationManager.closeOperation(opHandle);
+ closedOps.add(opHandle);
}
+ opHandleSet.removeAll(closedOps);
+
// Cleanup session log directory.
cleanupSessionLogDir();
HiveHistory hiveHist = sessionState.getHiveHistory();
@@ -849,12 +845,9 @@ public class HiveSessionImpl implements HiveSession {
@Override
public void closeExpiredOperations() {
- OperationHandle[] handles;
- synchronized (opHandleSet) {
- handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
- }
- if (handles.length > 0) {
- List<Operation> operations =
operationManager.removeExpiredOperations(handles);
+ List<OperationHandle> handles = new ArrayList<>(opHandleSet);
+ if (!handles.isEmpty()) {
+ List<Operation> operations =
operationManager.removeExpiredOperations(handles.toArray(new
OperationHandle[0]));
if (!operations.isEmpty()) {
closeTimedOutOperations(operations);
}
@@ -863,10 +856,7 @@ public class HiveSessionImpl implements HiveSession {
@Override
public long getNoOperationTime() {
- boolean noMoreOpHandle = false;
- synchronized (opHandleSet) {
- noMoreOpHandle = opHandleSet.isEmpty();
- }
+ boolean noMoreOpHandle = opHandleSet.isEmpty();
return noMoreOpHandle && !lockedByUser ? System.currentTimeMillis() -
lastAccessTime : 0;
}
@@ -906,9 +896,7 @@ public class HiveSessionImpl implements HiveSession {
acquire(true, false);
try {
operationManager.closeOperation(opHandle);
- synchronized (opHandleSet) {
- opHandleSet.remove(opHandle);
- }
+ opHandleSet.remove(opHandle);
} finally {
release(true, false);
}