This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6efbcae38d6 HIVE-26564: Separate query live operation log and historical operation log (#3621)
6efbcae38d6 is described below

commit 6efbcae38d6ef201eab6c5a4e425ac771b9cec12
Author: yigress <[email protected]>
AuthorDate: Thu Oct 6 15:57:04 2022 -0700

    HIVE-26564: Separate query live operation log and historical operation log (#3621)
---
 .../java/org/apache/hadoop/hive/ql/QueryInfo.java  |   8 +-
 .../hive/service/cli/operation/Operation.java      |  11 +-
 .../service/cli/operation/OperationLogManager.java | 230 ++++++---------------
 .../hive/service/cli/operation/SQLOperation.java   |   3 +-
 .../hive/service/cli/session/SessionManager.java   |   2 +-
 .../cli/operation/TestOperationLogManager.java     |  49 ++---
 .../TestQueryLifeTimeHooksWithSQLOperation.java    |   5 +
 .../cli/operation/TestSQLOperationMetrics.java     |   5 +
 8 files changed, 107 insertions(+), 206 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryInfo.java 
b/ql/src/java/org/apache/hadoop/hive/ql/QueryInfo.java
index 376037a241d..ee2bc77f8ff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryInfo.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryInfo.java
@@ -25,6 +25,7 @@ public class QueryInfo {
   private final String userName;
   private final String executionEngine;
   private final long beginTime;
+  private final String sessionId;
   private final String operationId;
   private Long runtime;  // tracks only running portion of the query.
 
@@ -34,11 +35,12 @@ public class QueryInfo {
 
   private String operationLogLocation;
 
-  public QueryInfo(String state, String userName, String executionEngine, 
String operationId) {
+  public QueryInfo(String state, String userName, String executionEngine, 
String sessionId, String operationId) {
     this.state = state;
     this.userName = userName;
     this.executionEngine = executionEngine;
     this.beginTime = System.currentTimeMillis();
+    this.sessionId = sessionId;
     this.operationId = operationId;
   }
 
@@ -86,6 +88,10 @@ public class QueryInfo {
     this.state = state;
   }
 
+  public String getSessionId() {
+    return sessionId;
+  }
+
   public String getOperationId() {
     return operationId;
   }
diff --git 
a/service/src/java/org/apache/hive/service/cli/operation/Operation.java 
b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 91dfa955f3b..b48809e8f7b 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -226,7 +226,8 @@ public abstract class Operation {
 
   protected void createOperationLog() {
     if (parentSession.isOperationLogEnabled()) {
-      operationLog = OperationLogManager.createOperationLog(this, queryState);
+      File operationLogFile = new 
File(parentSession.getOperationLogSessionDir(), queryState.getQueryId());
+      operationLog = new OperationLog(opHandle.toString(), operationLogFile, 
parentSession.getHiveConf());
       isOperationLogEnabled = true;
     }
   }
@@ -287,8 +288,10 @@ public abstract class Operation {
   private static class OperationLogCleaner implements Runnable {
     public static final Logger LOG = 
LoggerFactory.getLogger(OperationLogCleaner.class.getName());
     private OperationLog operationLog;
+    private Operation operation;
 
-    public OperationLogCleaner(OperationLog operationLog) {
+    public OperationLogCleaner(Operation operation, OperationLog operationLog) 
{
+      this.operation = operation;
       this.operationLog = operationLog;
     }
 
@@ -297,6 +300,7 @@ public abstract class Operation {
       if (operationLog != null) {
         LOG.info("Closing operation log {}", operationLog);
         operationLog.close();
+        OperationLogManager.closeOperation(operation);
       }
     }
   }
@@ -314,12 +318,13 @@ public abstract class Operation {
       } else {
         if (operationLogCleanupDelayMs > 0) {
           ScheduledExecutorService scheduledExecutorService = 
Executors.newScheduledThreadPool(1);
-          scheduledExecutorService.schedule(new 
OperationLogCleaner(operationLog), operationLogCleanupDelayMs,
+          scheduledExecutorService.schedule(new OperationLogCleaner(this, 
operationLog), operationLogCleanupDelayMs,
             TimeUnit.MILLISECONDS);
           scheduledExecutorService.shutdown();
         } else {
           log.info("Closing operation log {} without delay", operationLog);
           operationLog.close();
+          OperationLogManager.closeOperation(this);
         }
       }
     }
diff --git 
a/service/src/java/org/apache/hive/service/cli/operation/OperationLogManager.java
 
b/service/src/java/org/apache/hive/service/cli/operation/OperationLogManager.java
index 114f247c4f0..04543581316 100644
--- 
a/service/src/java/org/apache/hive/service/cli/operation/OperationLogManager.java
+++ 
b/service/src/java/org/apache/hive/service/cli/operation/OperationLogManager.java
@@ -22,19 +22,16 @@ import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileFilter;
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.stream.Collectors;
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,11 +39,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryInfo;
-import org.apache.hadoop.hive.ql.QueryState;
-import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.session.HiveSession;
 import org.apache.hive.service.cli.session.HiveSessionImpl;
 import org.apache.hive.service.cli.session.SessionManager;
 
@@ -81,16 +74,14 @@ public class OperationLogManager {
   private static long maxBytesToFetch;
 
   private final HiveConf hiveConf;
-  private final SessionManager sessionManager;
   private final OperationManager operationManager;
   private OperationLogDirCleaner cleaner;
   private String historicParentLogDir;
   private String serverInstance;
 
-  public OperationLogManager(SessionManager sessionManager, HiveConf hiveConf) 
{
-    this.operationManager = sessionManager.getOperationManager();
+  public OperationLogManager(OperationManager operationManager, HiveConf 
hiveConf) {
+    this.operationManager = operationManager;
     this.hiveConf = hiveConf;
-    this.sessionManager = sessionManager;
     if (HiveConf.getBoolVar(hiveConf, 
HiveConf.ConfVars.HIVE_SERVER2_HISTORIC_OPERATION_LOG_ENABLED)
         && 
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)
         && hiveConf.isWebUiQueryInfoCacheEnabled()) {
@@ -143,168 +134,39 @@ public class OperationLogManager {
     historicLogRootDir = logRootDir;
   }
 
-  public static OperationLog createOperationLog(Operation operation, 
QueryState queryState) {
-    HiveSession session = operation.getParentSession();
-    File parentFile = session.getOperationLogSessionDir();
-    boolean isHistoricLogEnabled = historicLogRootDir != null;
-    if (isHistoricLogEnabled && operation instanceof SQLOperation) {
-      String sessionId = 
session.getSessionHandle().getHandleIdentifier().toString();
-      parentFile = new File(historicLogRootDir + "/" + sessionId);
-      if (!parentFile.exists()) {
-        if (!parentFile.mkdirs()) {
-          LOG.warn("Unable to create the historic operation log session dir: " 
+ parentFile +
-              ", fall back to the original operation log session dir.");
-          parentFile = session.getOperationLogSessionDir();
-          isHistoricLogEnabled = false;
-        }
-      } else if (!parentFile.isDirectory()) {
-        LOG.warn("The historic operation log session dir: " + parentFile + " 
is exist, but it's not a directory, " +
-            "fall back to the original operation log session dir.");
-        parentFile = session.getOperationLogSessionDir();
-        isHistoricLogEnabled = false;
-      }
-    }
-
-    OperationHandle opHandle = operation.getHandle();
-    File operationLogFile = new File(parentFile, queryState.getQueryId());
-    OperationLog operationLog;
-    HiveConf.setBoolVar(queryState.getConf(),
-        HiveConf.ConfVars.HIVE_SERVER2_HISTORIC_OPERATION_LOG_ENABLED, 
isHistoricLogEnabled);
-    if (isHistoricLogEnabled) {
-      // dynamically setting the log location to route the operation log
-      HiveConf.setVar(queryState.getConf(),
-          HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION, 
historicLogRootDir);
-      if (HiveConf.getBoolVar(queryState.getConf(), 
HiveConf.ConfVars.HIVE_IN_TEST)) {
-        HiveConf.setBoolVar(queryState.getConf(), 
HiveConf.ConfVars.HIVE_TESTING_REMOVE_LOGS, false);
-      }
-      LOG.info("The operation log location changes from {} to {}.", new 
File(session.getOperationLogSessionDir(),
-          queryState.getQueryId()), operationLogFile);
-    }
-    operationLog = new OperationLog(opHandle.toString(), operationLogFile, 
queryState.getConf());
-    return operationLog;
-  }
-
-  private Set<String> getLiveSessions() {
-    Collection<HiveSession> hiveSessions = sessionManager.getSessions();
-    Set<String> liveSessions = new HashSet<>();
-    for (HiveSession session : hiveSessions) {
-      
liveSessions.add(session.getSessionHandle().getHandleIdentifier().toString());
+  // Delete historical query logs that are not in use by Web UI.
+  public void deleteHistoricQueryLogs() {
+    if (historicLogRootDir == null) {
+      return;
     }
-    return liveSessions;
-  }
-
-  private Set<String> getHistoricSessions() {
-    assert historicLogRootDir != null;
     File logDir = new File(historicLogRootDir);
-    Set<String> results = new HashSet<>();
-    if (logDir.exists() && logDir.isDirectory()) {
-      File[] subFiles = logDir.listFiles();
-      if (subFiles != null) {
-        for (File f : subFiles) {
-          results.add(f.getName());
-        }
-      }
-    }
-    return results;
-  }
-
-
-  @VisibleForTesting
-  public List<File> getExpiredOperationLogFiles() {
-    if (historicLogRootDir == null) {
-      return Collections.emptyList();
+    if (!logDir.exists() || !logDir.isDirectory()) {
+      return;
     }
-
-    List<File> results = new ArrayList<>();
-    Collection<File> files = FileUtils.listFiles(new File(historicLogRootDir)
-        , null, true);
-    Set<String> queryIds = operationManager.getAllCachedQueryIds();
-    for (File logFile : files) {
-      if (queryIds.contains(logFile.getName())) {
-        continue;
-      }
-      // if the query info is not cached,
-      // add the corresponding historic operation log file into the results.
-      results.add(logFile);
+    File[] subDirs = logDir.listFiles();
+    if (subDirs == null || subDirs.length==0) {
+      return;
     }
-    return results;
-  }
 
-  @VisibleForTesting
-  public List<File> getExpiredSessionLogDirs() {
-    if (historicLogRootDir == null) {
-      return Collections.emptyList();
-    }
-    List<File> results = new ArrayList<>();
-    // go through the original log root dir and historic log root dir for dead 
sessions
-    Set<String> liveSessions = getLiveSessions();
-    Set<String> historicSessions = getHistoricSessions();
-    historicSessions.removeAll(liveSessions);
-    Set<String> queryIds = operationManager.getAllCachedQueryIds();
-    // add the historic log session dir into the results if the session is 
dead and
-    // no historic operation log under the dir
-    for (String sessionId : historicSessions) {
-      File sessionLogDir = new File(historicLogRootDir, sessionId);
-      if (sessionLogDir.exists()) {
-        File[] logFiles = sessionLogDir.listFiles();
-        if (logFiles == null || logFiles.length == 0) {
-          results.add(sessionLogDir);
-        } else {
-          boolean found = false;
-          for (File logFile : logFiles) {
-            if (queryIds.contains(logFile.getName())) {
-              found = true;
-              break;
+    Set<String> sessionIds = 
operationManager.getHistoricalQueryInfos().stream()
+        .map(QueryInfo::getSessionId).collect(Collectors.toSet());
+    Set<String> queryIds = operationManager.getHistoricalQueryInfos().stream()
+        .map(queryInfo -> 
queryInfo.getQueryDisplay().getQueryId()).collect(Collectors.toSet());
+
+    for (File dir : subDirs) {
+      if (dir.isDirectory()) {
+        if (sessionIds.contains(dir.getName())) {
+          for (File f : dir.listFiles()) {
+            if (!queryIds.contains(f.getName()) ) {
+              LOG.debug("delete file not in hist: " + f.getName());
+              FileUtils.deleteQuietly(f);
             }
           }
-          if (!found) {
-            results.add(sessionLogDir);
-          }
+        } else {
+          FileUtils.deleteQuietly(dir);
         }
       }
     }
-    return results;
-  }
-
-  private List<String> getFileNames(List<File> fileList) {
-    List<String> results = new ArrayList<>();
-    for (File file : fileList) {
-      results.add(file.getName());
-    }
-    return results;
-  }
-
-  @VisibleForTesting
-  public void removeExpiredOperationLogAndDir() {
-    if (historicLogRootDir == null) {
-      return;
-    }
-    // remove the expired operation logs firstly
-    List<File> operationLogFiles = getExpiredOperationLogFiles();
-    if (operationLogFiles.isEmpty()) {
-      LOG.info("No expired operation logs found under the dir: {}", 
historicLogRootDir);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Trying to delete the expired operation logs: {} ", 
getFileNames(operationLogFiles));
-      }
-      for (File logFile : operationLogFiles) {
-        FileUtils.deleteQuietly(logFile);
-      }
-      LOG.info("Deleted {} expired operation logs", operationLogFiles.size());
-    }
-    // remove the historic operation log session dirs
-    List<File> sessionLogDirs = getExpiredSessionLogDirs();
-    if (sessionLogDirs.isEmpty()) {
-      LOG.info("No expired operation log session dir under the dir: {}", 
historicLogRootDir);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Trying to delete the expired operation log session dirs: {} 
", getFileNames(sessionLogDirs));
-      }
-      for (File logDir : sessionLogDirs) {
-        FileUtils.deleteQuietly(logDir);
-      }
-      LOG.info("Deleted {} expired operation log session dirs", 
sessionLogDirs.size());
-    }
   }
 
   // delete the older historic log root dirs on restart
@@ -346,7 +208,7 @@ public class OperationLogManager {
       sleepFor(interval);
       while (!shutdown) {
         try {
-          removeExpiredOperationLogAndDir();
+          deleteHistoricQueryLogs();
           sleepFor(interval);
         } catch (Exception e) {
           LOG.warn("OperationLogDir cleaner caught exception: " + 
e.getMessage(), e);
@@ -389,13 +251,41 @@ public class OperationLogManager {
     return logLocation.startsWith(historicLogRootDir);
   }
 
+  public static void closeOperation(Operation operation) {
+    String queryId = operation.getQueryId();
+    File originOpLogFile = new 
File(operation.parentSession.getOperationLogSessionDir(), queryId);
+    if (!originOpLogFile.exists()) {
+      return;
+    }
+    HiveConf hiveConf = operation.queryState.getConf();
+    if 
(hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_HISTORIC_OPERATION_LOG_ENABLED)
+        && operation instanceof SQLOperation) {
+      String sessionHandle = 
operation.getParentSession().getSessionHandle().getHandleIdentifier().toString();
+      String histOpLogFileLocation = new StringBuilder(historicLogRootDir)
+          .append("/").append(sessionHandle)
+          .append("/").append(queryId).toString();
+      try {
+        FileUtils.moveFile(originOpLogFile, new File(histOpLogFileLocation));
+        QueryInfo queryInfo = ((SQLOperation) operation).getQueryInfo();
+        queryInfo.setOperationLogLocation(histOpLogFileLocation);
+        LOG.info("The operation log location changes from {} to {}.", 
originOpLogFile, histOpLogFileLocation);
+      } catch (IOException e) {
+        LOG.error("Failed to move operation log location from {} to {}: {}.",
+            originOpLogFile, histOpLogFileLocation, e.getMessage());
+      }
+    } else {
+      FileUtils.deleteQuietly(originOpLogFile);
+    }
+    LOG.debug(queryId + ": closeOperation");
+  }
+
   public static String getOperationLog(QueryInfo queryInfo) {
     String logLocation = queryInfo.getOperationLogLocation();
     StringBuilder builder = new StringBuilder();
-    if (!isHistoricOperationLogEnabled(logLocation)) {
-      if (logLocation == null) {
-        return "Operation log is disabled, please set 
hive.server2.logging.operation.enabled = true to enable it";
-      }
+    if (logLocation == null) {
+      return "Operation log is disabled, please set 
hive.server2.logging.operation.enabled = true to enable it";
+    }
+    if (historicLogRootDir == null) {
       builder.append("Operation Log - will be deleted after query completes, ")
           .append("set hive.server2.historic.operation.log.enabled = true ")
           .append("and hive.server2.webui.max.historic.queries > 0 to disable 
it")
diff --git 
a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java 
b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 9b7b49999db..04076b29a4a 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -131,7 +131,8 @@ public class SQLOperation extends ExecuteStatementOperation 
{
     setupSessionIO(parentSession.getSessionState());
 
     queryInfo = new QueryInfo(getState().toString(), 
getParentSession().getUserName(),
-            getExecutionEngine(), 
getHandle().getHandleIdentifier().toString());
+        getExecutionEngine(), 
getParentSession().getSessionHandle().getHandleIdentifier().toString(),
+        getHandle().getHandleIdentifier().toString());
 
     final Metrics metrics = MetricsFactory.getInstance();
     this.submittedQryScp =
diff --git 
a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java 
b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
index 4b81398854b..694768ae7da 100644
--- a/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
+++ b/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
@@ -285,7 +285,7 @@ public class SessionManager extends CompositeService {
         LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " 
+
             operationLogRootDir.getAbsolutePath(), e);
       }
-      logManager = Optional.of(new OperationLogManager(this, hiveConf));
+      logManager = Optional.of(new OperationLogManager(operationManager, 
hiveConf));
     }
   }
 
diff --git 
a/service/src/test/org/apache/hive/service/cli/operation/TestOperationLogManager.java
 
b/service/src/test/org/apache/hive/service/cli/operation/TestOperationLogManager.java
index 9cfcac05c35..ddb65b997fd 100644
--- 
a/service/src/test/org/apache/hive/service/cli/operation/TestOperationLogManager.java
+++ 
b/service/src/test/org/apache/hive/service/cli/operation/TestOperationLogManager.java
@@ -23,7 +23,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -61,6 +60,7 @@ public class TestOperationLogManager {
     HiveConf.setIntVar(hiveConf, 
HiveConf.ConfVars.HIVE_SERVER2_WEBUI_MAX_HISTORIC_QUERIES, 1);
     HiveConf.setIntVar(hiveConf, HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT, 
8080);
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_IN_TEST, true);
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_TESTING_REMOVE_LOGS, 
false);
     HiveConf.setVar(hiveConf, 
HiveConf.ConfVars.HIVE_SERVER2_HISTORIC_OPERATION_LOG_FETCH_MAXBYTES, "128B");
     HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, 
false);
     HiveConf.setVar(hiveConf, 
HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION,
@@ -90,26 +90,25 @@ public class TestOperationLogManager {
     Operation operation1 = 
sessionManager.getOperationManager().getOperation(opHandle1);
 
     String logLocation = operation1.getOperationLog().toString();
-    // as the historic log is enabled, the log dir of the operation1 should be 
under the historic dir
-    
assertTrue(logLocation.startsWith(OperationLogManager.getHistoricLogDir()));
+
+    assertEquals(logLocation, 
((SQLOperation)operation1).getQueryInfo().getOperationLogLocation());
 
     File operationLogFile = new File(operation1.getOperationLog().toString());
     assertTrue(operationLogFile.exists());
+
     client.closeOperation(opHandle1);
-    // now close the session1
+    String op1HistoricLogLocation = 
((SQLOperation)operation1).getQueryInfo().getOperationLogLocation();
+    File op1HistoricLogFile = new File(op1HistoricLogLocation);
+    assertTrue(op1HistoricLogFile.exists());
+
+    // check that the log of operation1 exists even if the session1 has been 
closed
     client.closeSession(session1);
-    // check that the log of operation1 is exist even if the session1 has been 
closed
-    assertTrue(operationLogFile.exists());
+    assertTrue(op1HistoricLogFile.exists());
 
     SessionHandle session2 = client.openSession("user1", "foobar",
         Collections.<String, String>emptyMap());
-    OperationHandle opHandle2 = client.executeStatement(session2, "select 1 + 
1", null);
+    OperationHandle opHandle2 = client.executeStatement(session2, "select 2 + 
2", null);
     Operation operation2 = 
sessionManager.getOperationManager().getOperation(opHandle2);
-
-    // as the historic log is enabled, the log dir of the operation2 should be 
under the historic dir
-    logLocation = operation2.getOperationLog().toString();
-    
assertTrue(logLocation.startsWith(OperationLogManager.getHistoricLogDir()));
-    // remove the query info of operation1 from the cache
     client.closeOperation(opHandle2);
 
     // the operation1 becomes unreachable
@@ -118,33 +117,22 @@ public class TestOperationLogManager {
         && operationManager.getLiveQueryInfos().isEmpty());
     
assertNull(operationManager.getQueryInfo(opHandle1.getHandleIdentifier().toString()));
 
-    // now the session1 is closed and has no cached query info, the historic 
session log dir should be returned.
-    OperationLogManager logManager = sessionManager.getLogManager().get();
-    List<File> expiredLogDirs = logManager.getExpiredSessionLogDirs();
-    List<File> expiredOperationLogs = logManager.getExpiredOperationLogFiles();
-
-    assertEquals(operation1.getQueryId(), 
expiredOperationLogs.get(0).getName());
-    assertEquals(session1.getHandleIdentifier().toString(), 
expiredLogDirs.get(0).getName());
 
-    logManager.removeExpiredOperationLogAndDir();
-    // the historic session log dir has been cleanup
-    assertFalse(operationLogFile.exists());
-    assertFalse(expiredLogDirs.get(0).exists());
+    // OperationLogManager cleans up operation1's historical log, operation2's 
historical log remains.
+    OperationLogManager logManager = sessionManager.getLogManager().get();
+    logManager.deleteHistoricQueryLogs();
+    assertFalse(op1HistoricLogFile.exists());
 
     // though session2 is closed, but there exists his operation(operation2) 
in cache and
     // log file under the historic session log dir, so the historic log dir of 
session2 would not be cleaned
+    String op2LogLocation = 
((SQLOperation)operation2).getQueryInfo().getOperationLogLocation();
     client.closeSession(session2);
     
assertNotNull(operationManager.getQueryInfo(opHandle2.getHandleIdentifier().toString()));
     assertTrue(operationManager.getAllCachedQueryIds().size() == 1
         && operationManager.getLiveQueryInfos().isEmpty());
 
-    expiredOperationLogs = logManager.getExpiredOperationLogFiles();
-    expiredLogDirs = logManager.getExpiredSessionLogDirs();
-    assertTrue(expiredLogDirs.isEmpty());
-    assertTrue(expiredOperationLogs.isEmpty());
-
-    logManager.removeExpiredOperationLogAndDir();
-    assertTrue(new File(logLocation).getParentFile().exists());
+    logManager.deleteHistoricQueryLogs();
+    assertTrue(new File(op2LogLocation).exists());
     FileUtils.deleteQuietly(new File(OperationLogManager.getHistoricLogDir()));
   }
 
@@ -164,6 +152,7 @@ public class TestOperationLogManager {
     byte[] content = writeBytes(logFile, 2 * readLenght);
     operation.getQueryInfo().setOperationLogLocation(logLocation);
     String operationLog = 
OperationLogManager.getOperationLog(operation.getQueryInfo());
+    assertEquals(logLocation, 
operation.getQueryInfo().getOperationLogLocation());
     assertEquals(new String(content, content.length - readLenght, readLenght), 
operationLog);
     FileUtils.deleteQuietly(new File(OperationLogManager.getHistoricLogDir()));
   }
diff --git 
a/service/src/test/org/apache/hive/service/cli/operation/TestQueryLifeTimeHooksWithSQLOperation.java
 
b/service/src/test/org/apache/hive/service/cli/operation/TestQueryLifeTimeHooksWithSQLOperation.java
index 683f5effb81..2fd336128b0 100644
--- 
a/service/src/test/org/apache/hive/service/cli/operation/TestQueryLifeTimeHooksWithSQLOperation.java
+++ 
b/service/src/test/org/apache/hive/service/cli/operation/TestQueryLifeTimeHooksWithSQLOperation.java
@@ -26,7 +26,9 @@ import 
org.apache.hadoop.hive.ql.hooks.QueryLifeTimeHookWithParseHooks;
 import org.apache.hadoop.hive.ql.hooks.TestQueryHooks;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
+import org.apache.hive.service.cli.HandleIdentifier;
 import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.session.HiveSession;
 
 import org.junit.Test;
@@ -56,6 +58,9 @@ public class TestQueryLifeTimeHooksWithSQLOperation {
     HiveSession mockHiveSession = mock(HiveSession.class);
     when(mockHiveSession.getHiveConf()).thenReturn(conf);
     when(mockHiveSession.getSessionState()).thenReturn(SessionState.get());
+    SessionHandle sessionHandle = mock(SessionHandle.class);
+    when(sessionHandle.getHandleIdentifier()).thenReturn(new 
HandleIdentifier());
+    when(mockHiveSession.getSessionHandle()).thenReturn(sessionHandle);
     SQLOperation sqlOperation = new SQLOperation(mockHiveSession, QUERY, 
ImmutableMap.of(), false, 0);
     sqlOperation.run();
   }
diff --git 
a/service/src/test/org/apache/hive/service/cli/operation/TestSQLOperationMetrics.java
 
b/service/src/test/org/apache/hive/service/cli/operation/TestSQLOperationMetrics.java
index df538c607eb..5ce3d16bd51 100644
--- 
a/service/src/test/org/apache/hive/service/cli/operation/TestSQLOperationMetrics.java
+++ 
b/service/src/test/org/apache/hive/service/cli/operation/TestSQLOperationMetrics.java
@@ -24,7 +24,9 @@ import 
org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
 import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.cli.HandleIdentifier;
 import org.apache.hive.service.cli.OperationState;
+import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.session.HiveSession;
 import org.junit.After;
 import org.junit.Before;
@@ -51,6 +53,9 @@ public class TestSQLOperationMetrics {
     when(session.getHiveConf()).thenReturn(conf);
     when(session.getSessionState()).thenReturn(mock(SessionState.class));
     when(session.getUserName()).thenReturn("userName");
+    SessionHandle sessionHandle = mock(SessionHandle.class);
+    when(sessionHandle.getHandleIdentifier()).thenReturn(new 
HandleIdentifier());
+    when(session.getSessionHandle()).thenReturn(sessionHandle);
 
     operation = new SQLOperation(session, "select * from dummy",
         Maps.<String, String>newHashMap(), false, 0L);

Reply via email to