Repository: spark
Updated Branches:
  refs/heads/master 9ddd1e2ce -> 918fb9bee


[SPARK-23547][SQL] Cleanup the .pipeout file when the Hive Session closed

## What changes were proposed in this pull request?

![2018-03-07_121010](https://user-images.githubusercontent.com/24823338/37073232-922e10d2-2200-11e8-8172-6e03aa984b39.png)

When a Hive session is closed, we should also clean up its `.pipeout` file; otherwise the file is left behind in the local scratch directory.

## How was this patch tested?

Added a test case to `HiveThriftBinaryServerSuite`.

Author: zuotingbing <zuo.tingbi...@zte.com.cn>

Closes #20702 from zuotingbing/SPARK-23547.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/918fb9be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/918fb9be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/918fb9be

Branch: refs/heads/master
Commit: 918fb9beee6a2fd499b8f18dfe0d460f078f5290
Parents: 9ddd1e2
Author: zuotingbing <zuo.tingbi...@zte.com.cn>
Authored: Tue Mar 13 11:31:32 2018 -0700
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Mar 13 11:31:32 2018 -0700

----------------------------------------------------------------------
 .../service/cli/session/HiveSessionImpl.java    | 18 +++++++++++
 .../thriftserver/HiveThriftServer2Suites.scala  | 32 +++++++++++++++++++-
 2 files changed, 49 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/918fb9be/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index fc818bc..f59cdcd 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -641,6 +641,8 @@ public class HiveSessionImpl implements HiveSession {
       opHandleSet.clear();
       // Cleanup session log directory.
       cleanupSessionLogDir();
+      // Cleanup pipeout file.
+      cleanupPipeoutFile();
       HiveHistory hiveHist = sessionState.getHiveHistory();
       if (null != hiveHist) {
         hiveHist.closeStream();
@@ -665,6 +667,35 @@ public class HiveSessionImpl implements HiveSession {
     }
   }
 
+  /**
+   * Deletes the files in the local scratch directory whose names start with
+   * this session's ID and end with ".pipeout". Per-file deletion failures are
+   * logged rather than thrown, so closing the session is not interrupted.
+   */
+  private void cleanupPipeoutFile() {
+    String lScratchDir = hiveConf.getVar(ConfVars.LOCALSCRATCHDIR);
+    String sessionID = hiveConf.getVar(ConfVars.HIVESESSIONID);
+
+    File[] fileAry = new File(lScratchDir).listFiles(
+            (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
+
+    // File.listFiles() returns null (not an empty array) when the directory
+    // does not exist or cannot be read; iterating over it would throw an NPE
+    // out of close() and skip the cleanup steps that follow this call.
+    if (fileAry == null) {
+      LOG.error("Unable to access pipeout files in " + lScratchDir);
+      return;
+    }
+
+    for (File file : fileAry) {
+      try {
+        FileUtils.forceDelete(file);
+      } catch (Exception e) {
+        LOG.error("Failed to cleanup pipeout file: " + file, e);
+      }
+    }
+  }
+
   private void cleanupSessionLogDir() {
     if (isOperationLogEnabled) {
       try {

http://git-wip-us.apache.org/repos/asf/spark/blob/918fb9be/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index b32c547..192f33a 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.io.File
+import java.io.{File, FilenameFilter}
 import java.net.URL
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, DriverManager, SQLException, Statement}
+import java.util.UUID
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -613,6 +614,28 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       bufferSrc.close()
     }
   }
+
+  test("SPARK-23547 Cleanup the .pipeout file when the Hive Session closed") {
+    // Lists the .pipeout files in the local scratch dir whose names start with `sessionID`.
+    def pipeoutFileList(sessionID: UUID): Array[File] = {
+      val prefix = sessionID.toString
+      lScratchDir.listFiles(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean =
+          name.startsWith(prefix) && name.endsWith(".pipeout")
+      })
+    }
+
+    withCLIServiceClient { client =>
+      val sessionHandle = client.openSession(System.getProperty("user.name"), "")
+      val sessionID = sessionHandle.getSessionId
+      // Opening the session is expected to create exactly one pipeout file...
+      assert(pipeoutFileList(sessionID).length == 1)
+
+      // ...and closing the session must remove it again.
+      client.closeSession(sessionHandle)
+      assert(pipeoutFileList(sessionID).isEmpty)
+    }
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {
@@ -807,6 +830,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
   private val pidDir: File = Utils.createTempDir(namePrefix = 
"thriftserver-pid")
   protected var logPath: File = _
   protected var operationLogPath: File = _
+  protected var lScratchDir: File = _
   private var logTailingProcess: Process = _
   private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
 
@@ -844,6 +868,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
        |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
        |  --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
        |  --hiveconf 
${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
+       |  --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
        |  --hiveconf $portConf=$port
        |  --driver-class-path $driverClassPath
        |  --driver-java-options -Dlog4j.debug
@@ -873,6 +898,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
     metastorePath.delete()
     operationLogPath = Utils.createTempDir()
     operationLogPath.delete()
+    lScratchDir = Utils.createTempDir()
+    lScratchDir.delete()
     logPath = null
     logTailingProcess = null
 
@@ -956,6 +983,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
     operationLogPath.delete()
     operationLogPath = null
 
+    lScratchDir.delete()
+    lScratchDir = null
+
     Option(logPath).foreach(_.delete())
     logPath = null
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to