Repository: hive
Updated Branches:
  refs/heads/branch-1 3cca2a4d8 -> 80cd2d6cc


HIVE-13429: Tool to remove dangling scratch dir (Daniel Dai, reviewed by Thejas 
Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/80cd2d6c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/80cd2d6c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/80cd2d6c

Branch: refs/heads/branch-1
Commit: 80cd2d6ccc1f4e839351732543206cb671994600
Parents: 3cca2a4
Author: Daniel Dai <[email protected]>
Authored: Mon Apr 11 15:10:58 2016 -0700
Committer: Daniel Dai <[email protected]>
Committed: Mon Apr 11 15:11:46 2016 -0700

----------------------------------------------------------------------
 bin/ext/cleardanglingscratchdir.cmd             |  35 ++++
 bin/ext/cleardanglingscratchdir.sh              |  28 +++
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../ql/session/TestClearDanglingScratchDir.java | 161 +++++++++++++++++
 .../ql/session/ClearDanglingScratchDir.java     | 176 +++++++++++++++++++
 .../hadoop/hive/ql/session/SessionState.java    |  35 +++-
 6 files changed, 433 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/80cd2d6c/bin/ext/cleardanglingscratchdir.cmd
----------------------------------------------------------------------
diff --git a/bin/ext/cleardanglingscratchdir.cmd 
b/bin/ext/cleardanglingscratchdir.cmd
new file mode 100644
index 0000000..31104af
--- /dev/null
+++ b/bin/ext/cleardanglingscratchdir.cmd
@@ -0,0 +1,35 @@
+@echo off
+@rem Licensed to the Apache Software Foundation (ASF) under one or more
+@rem contributor license agreements.  See the NOTICE file distributed with
+@rem this work for additional information regarding copyright ownership.
+@rem The ASF licenses this file to You under the Apache License, Version 2.0
+@rem (the "License"); you may not use this file except in compliance with
+@rem the License.  You may obtain a copy of the License at
+@rem
+@rem     http://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+
+@rem Main class of the cleardanglingscratchdir service; HIVE_OPTS and HADOOP_CLASSPATH are cleared.
+set CLASS=org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir
+set HIVE_OPTS=
+set HADOOP_CLASSPATH=
+
+@rem Record the hive-exec jar found under HIVE_LIB in JAR (the last match wins if several exist).
+pushd %HIVE_LIB%
+for /f %%a IN ('dir /b hive-exec-*.jar') do (
+        set JAR=%HIVE_LIB%\%%a
+)
+popd
+
+@rem When the first argument is cleardanglingscratchdir_help, print usage instead of running the tool.
+if [%1]==[cleardanglingscratchdir_help] goto :cleardanglingscratchdir_help
+
+@rem Run the tool class through the execHiveCmd.cmd launcher under HIVE_BIN_PATH.
+:cleardanglingscratchdir
+        call %HIVE_BIN_PATH%\ext\util\execHiveCmd.cmd %CLASS%
+goto :EOF
+
+@rem Print the usage line.
+:cleardanglingscratchdir_help
+        echo "usage hive --service cleardanglingscratchdir"
+goto :EOF

http://git-wip-us.apache.org/repos/asf/hive/blob/80cd2d6c/bin/ext/cleardanglingscratchdir.sh
----------------------------------------------------------------------
diff --git a/bin/ext/cleardanglingscratchdir.sh 
b/bin/ext/cleardanglingscratchdir.sh
new file mode 100644
index 0000000..dcc44e3
--- /dev/null
+++ b/bin/ext/cleardanglingscratchdir.sh
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+THISSERVICE=cleardanglingscratchdir  # service name; same as the function defined below
+export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "  # append this service, space-terminated, to SERVICE_LIST
+
+cleardanglingscratchdir () {  # service entry point: runs the ClearDanglingScratchDir tool class
+  CLASS=org.apache.hadoop.hive.ql.session.ClearDanglingScratchDir
+  HIVE_OPTS=''  # clear HIVE_OPTS before delegating
+  execHiveCmd $CLASS "$@"  # forwards every argument of this function; execHiveCmd is defined outside this file
+}
+
+cleardanglingscratchdir_help () {  # prints an empty line followed by the usage string
+  echo ""
+  echo "usage ./hive --service cleardanglingscratchdir"
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/80cd2d6c/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f84c940..0d31131 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1788,6 +1788,8 @@ public class HiveConf extends Configuration {
         "Must be a subclass of org.apache.hadoop.hive.ql.log.PerfLogger"),
     HIVE_START_CLEANUP_SCRATCHDIR("hive.start.cleanup.scratchdir", false,
         "To cleanup the Hive scratchdir when starting the Hive Server"),
+    HIVE_SCRATCH_DIR_LOCK("hive.scratchdir.lock", false,
+        "To hold a lock file in scratchdir to prevent to be removed by 
cleardanglingscratchdir"),
     HIVE_INSERT_INTO_MULTILEVEL_DIRS("hive.insert.into.multilevel.dirs", false,
         "Where to insert into multilevel directories like\n" +
         "\"insert directory '/HIVEFT25686/chinna/' from table\""),

http://git-wip-us.apache.org/repos/asf/hive/blob/80cd2d6c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/session/TestClearDanglingScratchDir.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/session/TestClearDanglingScratchDir.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/session/TestClearDanglingScratchDir.java
new file mode 100644
index 0000000..1007113
--- /dev/null
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/session/TestClearDanglingScratchDir.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.session;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.nio.channels.FileChannel;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+public class TestClearDanglingScratchDir {  // runs ClearDanglingScratchDir.main against a MiniDFSCluster and checks its log and stdout
+  private static MiniDFSCluster m_dfs = null;
+  private static HiveConf conf;
+  private static Path scratchDir;
+  private ByteArrayOutputStream stdout;  // captures System.out while redirectOutput() is in effect
+  private PrintStream origStdoutPs;  // System.out as saved by redirectOutput(); restored by rollbackOutput()
+  private static File logFile;  // file the LOGFILE appender below is pointed at; read back by the assertions
+
+  @BeforeClass
+  static public void oneTimeSetup() throws Exception {  // writes a log4j config, starts a one-datanode MiniDFSCluster, creates the scratch dir
+    logFile = File.createTempFile("log", "");
+    File log4jConfig = File.createTempFile("config", ".properties");
+    log4jConfig.deleteOnExit();
+    PrintWriter pw = new PrintWriter(log4jConfig);
+    pw.println("appenders = console, file");
+    pw.println("appender.console.type = Console");
+    pw.println("appender.console.name = STDOUT");
+    pw.println("appender.console.layout.type = PatternLayout");
+    pw.println("appender.console.layout.pattern = %t %-5p %c{2} - %m%n");
+    pw.println("appender.file.type = File");
+    pw.println("appender.file.name = LOGFILE");
+    pw.println("appender.file.fileName = " + logFile.getAbsolutePath());
+    pw.println("appender.file.layout.type = PatternLayout");
+    pw.println("appender.file.layout.pattern = %t %-5p %c{2} - %m%n");
+    pw.println("rootLogger.level = debug");
+    pw.println("rootLogger.appenderRefs = stdout");
+    pw.println("rootLogger.appenderRef.stdout.ref = STDOUT");
+    pw.println("loggers = file");
+    pw.println("logger.file.name = SessionState");
+    pw.println("logger.file.level = debug");
+    pw.println("logger.file.appenderRefs = file");
+    pw.println("logger.file.appenderRef.file.ref = LOGFILE");
+    pw.close();
+    System.setProperty("log4j.configurationFile", 
log4jConfig.getAbsolutePath());
+
+    m_dfs = new MiniDFSCluster.Builder(new 
Configuration()).numDataNodes(1).format(true).build();
+    conf = new HiveConf();
+    conf.set(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK.toString(), "true");  // sessions started with this conf create the lock file
+    conf.set(HiveConf.ConfVars.METASTORE_AUTO_CREATE_ALL.toString(), "true");
+    LoggerFactory.getLogger("SessionState");
+    conf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
+        new Path(System.getProperty("test.tmp.dir"), "warehouse").toString());
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+        m_dfs.getFileSystem().getUri().toString());
+
+    scratchDir = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+    m_dfs.getFileSystem().mkdirs(scratchDir);
+    m_dfs.getFileSystem().setPermission(scratchDir, new FsPermission("777"));
+  }
+
+  @AfterClass
+  static public void shutdown() throws Exception {  // stops the MiniDFSCluster started in oneTimeSetup()
+    m_dfs.shutdown();
+  }
+
+  public void redirectOutput() throws IOException {  // swaps System.out for an in-memory buffer and empties logFile
+    stdout = new ByteArrayOutputStream();
+    PrintStream psStdout = new PrintStream(stdout);
+    origStdoutPs = System.out;
+    System.setOut(psStdout);
+
+    FileOutputStream fos = new FileOutputStream(logFile, true);
+    FileChannel outChan = fos.getChannel();
+    outChan.truncate(0);  // drop log output of earlier steps so each assertion only sees the following run
+    outChan.close();
+    fos.close();
+  }
+
+  public void rollbackOutput() {  // puts back the System.out saved by redirectOutput()
+    System.setOut(origStdoutPs);
+  }
+
+  @Test
+  public void testClearDanglingScratchDir() throws Exception {  // NOTE(review): if main throws, rollbackOutput() is skipped and System.out stays redirected
+
+    // No scratch dir initially
+    redirectOutput();
+    ClearDanglingScratchDir.main(new String[]{"-s",
+        m_dfs.getFileSystem().getUri().toString() + 
scratchDir.toUri().toString()});
+    rollbackOutput();
+    Assert.assertTrue(FileUtils.readFileToString(logFile).contains("Cannot 
find any scratch directory to clear"));
+
+    // Create scratch dir without lock files
+    m_dfs.getFileSystem().mkdirs(new Path(new Path(scratchDir, "dummy"), 
UUID.randomUUID().toString()));
+    redirectOutput();
+    ClearDanglingScratchDir.main(new String[]{"-s",
+        m_dfs.getFileSystem().getUri().toString() + 
scratchDir.toUri().toString()});
+    rollbackOutput();
+    
Assert.assertEquals(StringUtils.countMatches(FileUtils.readFileToString(logFile),
+        "since it does not contain " + SessionState.LOCK_FILE_NAME), 1);
+    Assert.assertTrue(FileUtils.readFileToString(logFile).contains("Cannot 
find any scratch directory to clear"));
+
+    // One live session
+    SessionState ss = SessionState.start(conf);
+    redirectOutput();
+    ClearDanglingScratchDir.main(new String[]{"-s",
+        m_dfs.getFileSystem().getUri().toString() + 
scratchDir.toUri().toString()});
+    rollbackOutput();
+    
Assert.assertEquals(StringUtils.countMatches(FileUtils.readFileToString(logFile),
 "is being used by live process"), 1);
+
+    // One dead session with dry-run
+    ss.releaseSessionLockFile();  // closes the lock-file stream but leaves the session dir in place, simulating a dead session
+    redirectOutput();
+    ClearDanglingScratchDir.main(new String[]{"-r", "-s",
+        m_dfs.getFileSystem().getUri().toString() + 
scratchDir.toUri().toString()});
+    rollbackOutput();
+    // Find one session dir to remove
+    Assert.assertFalse(stdout.toString().isEmpty());  // dry-run mode prints each removable path with System.out.println
+
+    // Remove the dead session dir
+    redirectOutput();
+    ClearDanglingScratchDir.main(new String[]{"-s",
+        m_dfs.getFileSystem().getUri().toString() + 
scratchDir.toUri().toString()});
+    rollbackOutput();
+    Assert.assertTrue(FileUtils.readFileToString(logFile).contains("Removing 1 
scratch directories"));
+    
Assert.assertEquals(StringUtils.countMatches(FileUtils.readFileToString(logFile),
 "removed"), 1);
+    ss.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/80cd2d6c/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java 
b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
new file mode 100644
index 0000000..8543768
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ClearDanglingScratchDir.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.session;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * A tool to remove dangling scratch directories. A scratch directory can be left
+ * behind in some cases, such as when the VM restarts and leaves Hive no chance to
+ * run its shutdown hook. The tool tests whether a scratch directory is in use
+ * and, if it is not, removes it.
+ * We rely on the HDFS write lock to detect whether a scratch directory is in use:
+ * 1. An HDFS client opens an HDFS file ($scratchdir/inuse.lck) for write and only
+ *    closes it at the time the session is closed.
+ * 2. cleardanglingscratchdir tries to open $scratchdir/inuse.lck for write. If the
+ *    corresponding HiveCli/HiveServer2 is still running, we will get an exception.
+ *    Otherwise, we know the session is dead.
+ * 3. If the HiveCli/HiveServer2 dies without closing the HDFS file, the NN will
+ *    reclaim the lease after 10 min, i.e. the HDFS file held by the dead
+ *    HiveCli/HiveServer2 is writable again after 10 min. Once it becomes
+ *    writable, cleardanglingscratchdir will be able to remove it.
+ */
+public class ClearDanglingScratchDir {
+
+  public static void main(String[] args) throws Exception {  // CLI entry point; options: -h help, -r dry run, -s scratch dir, -v verbose
+    Options opts = createOptions();
+    CommandLine cli = new GnuParser().parse(opts, args);
+
+    if (cli.hasOption('h')) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("cleardanglingscratchdir"
+          + " (clear scratch dir left behind by dead HiveCli or HiveServer2)", 
opts);
+      return;
+    }
+
+    boolean dryRun = false;  // when true, removable dirs are only printed, never deleted
+    boolean verbose = false;  // when true, per-directory messages go through printInfo instead of logInfo
+
+    if (cli.hasOption("r")) {
+      dryRun = true;
+    }
+
+    if (cli.hasOption("v")) {
+      verbose = true;
+    }
+
+    HiveConf conf = new HiveConf();
+
+    Path rootHDFSDirPath;  // root to scan: the -s value if given, otherwise the configured SCRATCHDIR
+    if (cli.hasOption("s")) {
+      rootHDFSDirPath = new Path(cli.getOptionValue("s"));
+    } else {
+      rootHDFSDirPath = new Path(HiveConf.getVar(conf, 
HiveConf.ConfVars.SCRATCHDIR));
+    }
+
+    FileSystem fs = FileSystem.get(rootHDFSDirPath.toUri(), conf);
+    FileStatus[] userHDFSDirList = fs.listStatus(rootHDFSDirPath);  // first level under the root; presumably one entry per user, going by the name
+
+    List<Path> scratchDirToRemove = new ArrayList<Path>();  // collected first; deletion happens in a second pass below
+    for (FileStatus userHDFSDir : userHDFSDirList) {
+      FileStatus[] scratchDirList = fs.listStatus(userHDFSDir.getPath());  // second level: candidate scratch dirs
+      for (FileStatus scratchDir : scratchDirList) {
+        Path lockFilePath = new Path(scratchDir.getPath(), 
SessionState.LOCK_FILE_NAME);
+        if (!fs.exists(lockFilePath)) {
+          String message = "Skipping " + scratchDir.getPath() + " since it 
does not contain " +
+              SessionState.LOCK_FILE_NAME;
+          if (verbose) {
+            SessionState.getConsole().printInfo(message);
+          } else {
+            SessionState.getConsole().logInfo(message);
+          }
+          continue;  // a dir without a lock file is never added to the removal list
+        }
+        try {
+          IOUtils.closeStream(fs.append(lockFilePath));  // probe: open the lock file for append and close it straight away
+          scratchDirToRemove.add(scratchDir.getPath());  // reached only when the append did not throw
+        } catch (RemoteException e) {
+          // RemoteException with AlreadyBeingCreatedException will be thrown
+          // if the file is currently held by a writer
+          
if(AlreadyBeingCreatedException.class.getName().equals(e.getClassName())){
+            // Cannot open the lock file for writing, must be held by a live 
process
+            String message = scratchDir.getPath() + " is being used by live 
process";
+            if (verbose) {
+              SessionState.getConsole().printInfo(message);
+            } else {
+              SessionState.getConsole().logInfo(message);
+            }
+          } else {
+            throw e;  // any other remote failure propagates out of main
+          }
+        }
+      }
+    }
+
+    if (scratchDirToRemove.size()==0) {
+      SessionState.getConsole().printInfo("Cannot find any scratch directory 
to clear");
+      return;
+    }
+    SessionState.getConsole().printInfo("Removing " + 
scratchDirToRemove.size() + " scratch directories");
+    for (Path scratchDir : scratchDirToRemove) {
+      if (dryRun) {
+        System.out.println(scratchDir);  // dry run: list the path on stdout only
+      } else {
+        boolean succ = fs.delete(scratchDir, true);  // second argument true: recursive delete
+        if (!succ) {
+          SessionState.getConsole().printInfo("Cannot remove " + scratchDir);
+        } else {
+          String message = scratchDir + " removed";
+          if (verbose) {
+            SessionState.getConsole().printInfo(message);
+          } else {
+            SessionState.getConsole().logInfo(message);
+          }
+        }
+      }
+    }
+  }
+
+  static Options createOptions() {  // builds the option set parsed by main: -r, -s (takes a value), -v, -h
+    Options result = new Options();
+
+    // add -r and --dry-run to generate list only
+    result.addOption(OptionBuilder
+        .withLongOpt("dry-run")
+        .withDescription("Generate a list of dangling scratch dir, printed on 
console")
+        .create('r'));
+
+    // add -s and --scratchdir to specify a non-default scratch dir
+    result.addOption(OptionBuilder
+        .withLongOpt("scratchdir")
+        .withDescription("Specify a non-default location of the scratch dir")
+        .hasArg()
+        .create('s'));
+
+    // add -v and --verbose to print verbose message
+    result.addOption(OptionBuilder
+        .withLongOpt("verbose")
+        .withDescription("Print verbose message")
+        .create('v'));
+
+    result.addOption(OptionBuilder
+        .withLongOpt("help")
+        .withDescription("print help message")
+        .create('h'));
+
+    return result;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/80cd2d6c/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java 
b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index c8299fd..bf17b43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -23,6 +23,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLClassLoader;
@@ -45,6 +47,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -91,6 +94,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 
 import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * SessionState encapsulates common data associated with a session.
@@ -106,6 +110,8 @@ public class SessionState {
   private static final String LOCAL_SESSION_PATH_KEY = 
"_hive.local.session.path";
   private static final String HDFS_SESSION_PATH_KEY = 
"_hive.hdfs.session.path";
   private static final String TMP_TABLE_SPACE_KEY = "_hive.tmp_table_space";
+  static final String LOCK_FILE_NAME = "inuse.lck";
+
   private final Map<String, Map<String, Table>> tempTables = new 
HashMap<String, Map<String, Table>>();
   private final Map<String, Map<String, ColumnStatisticsObj>> 
tempTableColStats =
       new HashMap<String, Map<String, ColumnStatisticsObj>>();
@@ -227,6 +233,8 @@ public class SessionState {
    */
   private Path hdfsSessionPath;
 
+  private FSDataOutputStream hdfsSessionPathLockFile = null;
+
   /**
    * sub dir of hdfs session path. used to keep tmp tables
    * @return Path for temporary tables created by the current session
@@ -574,8 +582,9 @@ public class SessionState {
    * 2. Local scratch dir
    * 3. Local downloaded resource dir
    * 4. HDFS session path
-   * 5. Local session path
-   * 6. HDFS temp table space
+   * 5. hold a lock file in HDFS session dir to indicate that it is in use
+   * 6. Local session path
+   * 7. HDFS temp table space
    * @param userName
    * @throws IOException
    */
@@ -603,11 +612,19 @@ public class SessionState {
     hdfsSessionPath = new Path(hdfsScratchDirURIString, sessionId);
     createPath(conf, hdfsSessionPath, scratchDirPermission, false, true);
     conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString());
-    // 5. Local session path
+    // 5. hold a lock file in HDFS session dir to indicate that it is in use
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_SCRATCH_DIR_LOCK)) {
+      FileSystem fs = FileSystem.get(conf);
+      hdfsSessionPathLockFile = fs.create(new Path(hdfsSessionPath, 
LOCK_FILE_NAME), true);
+      hdfsSessionPathLockFile.writeUTF("hostname: " + 
InetAddress.getLocalHost().getHostName() + "\n");
+      hdfsSessionPathLockFile.writeUTF("process: " + 
ManagementFactory.getRuntimeMXBean().getName() + "\n");
+      hdfsSessionPathLockFile.hsync();
+    }
+    // 6. Local session path
     localSessionPath = new Path(HiveConf.getVar(conf, 
HiveConf.ConfVars.LOCALSCRATCHDIR), sessionId);
     createPath(conf, localSessionPath, scratchDirPermission, true, true);
     conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString());
-    // 6. HDFS temp table space
+    // 7. HDFS temp table space
     hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX);
     createPath(conf, hdfsTmpTableSpace, scratchDirPermission, false, true);
     conf.set(TMP_TABLE_SPACE_KEY, hdfsTmpTableSpace.toUri().toString());
@@ -722,8 +739,18 @@ public class SessionState {
     return this.hdfsTmpTableSpace;
   }
 
+  @VisibleForTesting
+  void releaseSessionLockFile() throws IOException {  // closes the lock-file stream; the session dir itself is not deleted here
+    if (hdfsSessionPath != null && hdfsSessionPathLockFile != null) {
+      hdfsSessionPathLockFile.close();  // NOTE(review): the field is not cleared, so dropSessionPaths() calls close() on it again
+    }
+  }
+
   private void dropSessionPaths(Configuration conf) throws IOException {
     if (hdfsSessionPath != null) {
+      if (hdfsSessionPathLockFile != null) {
+        hdfsSessionPathLockFile.close();
+      }
       hdfsSessionPath.getFileSystem(conf).delete(hdfsSessionPath, true);
     }
     if (localSessionPath != null) {

Reply via email to