This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0c0bc15d54b HIVE-29302: Wider Tez session open scope should be guarded to prevent local resource leaks (#6161)
0c0bc15d54b is described below
commit 0c0bc15d54bc4135ea6a5f89be4d103061c92500
Author: Bodor Laszlo <[email protected]>
AuthorDate: Tue Dec 16 23:47:27 2025 +0100
HIVE-29302: Wider Tez session open scope should be guarded to prevent local resource leaks (#6161)
---
.../hadoop/hive/ql/exec/tez/TezSessionState.java | 41 +++++++++++++++-----
.../hive/ql/exec/tez/TestTezSessionState.java | 44 +++++++++++++++++++++-
2 files changed, 74 insertions(+), 11 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index b89cac4e813..41e01b17feb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -114,7 +114,8 @@ public class TezSessionState {
private static final String LLAP_TASK_COMMUNICATOR =
LlapTaskCommunicator.class.getName();
private final HiveConf conf;
- private Path tezScratchDir;
+ @VisibleForTesting
+ Path tezScratchDir;
private LocalResource appJarLr;
private TezClient session;
private Future<TezClient> sessionFuture;
@@ -272,9 +273,6 @@ protected void openInternal(String[] additionalFilesNotFromConf,
this.queueName = confQueueName;
this.doAsEnabled =
conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
- final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(
- conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
-
// TODO This - at least for the session pool - will always be the hive
user. How does doAs above this affect things ?
UserGroupInformation ugi = Utils.getUGI();
user = ugi.getShortUserName();
@@ -292,12 +290,37 @@ protected void openInternal(String[] additionalFilesNotFromConf,
LOG.info("Created new resources: " + this.resources);
}
- // unless already installed on all the cluster nodes, we'll have to
- // localize hive-exec.jar as well.
+ // Unless already installed on all the cluster nodes, we'll have to
localize hive-exec.jar as well.
appJarLr = createJarLocalResource(utils.getExecJarPathLocal(conf));
- // configuration for the application master
- final Map<String, LocalResource> commonLocalResources = new
HashMap<String, LocalResource>();
+ try {
+ openInternalUnsafe(isAsync, console);
+ } catch (Exception e) {
+ LOG.warn("Failed to open session, deleting scratch dir to prevent
resource leak...", e);
+ cleanupScratchDir();
+ throw e;
+ }
+ }
+
+ /**
+ * Opens a Tez session without performing a complete rollback/cleanup on
failure.
+ *
+ * <p><strong>Callers MUST guard this method with try/catch and perform
cleanup</strong>
+ * of partially initialized state (such as localized files in the scratch
directory).
+ * This method is not safe on its own.</p>
+ *
+ * @param isAsync whether to open the Tez session asynchronously in a
separate thread
+ * @param console a {@link LogHelper} used to log session startup events
+ *
+ * @throws TezException if the session fails to start (including failures
during
+ * container launch or session initialization)
+ * @throws IOException if local resource localization or I/O setup fails
+ */
+ @VisibleForTesting
+ void openInternalUnsafe(boolean isAsync, LogHelper console) throws
TezException, IOException {
+ final Map<String, LocalResource> commonLocalResources = new HashMap<>();
+ final boolean llapMode = "llap".equalsIgnoreCase(HiveConf.getVar(conf,
HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+
commonLocalResources.put(DagUtils.getBaseName(appJarLr), appJarLr);
for (LocalResource lr : this.resources.localizedResources) {
commonLocalResources.put(DagUtils.getBaseName(lr), lr);
@@ -312,7 +335,7 @@ protected void openInternal(String[] additionalFilesNotFromConf,
}
// Create environment for AM.
- Map<String, String> amEnv = new HashMap<String, String>();
+ Map<String, String> amEnv = new HashMap<>();
MRHelpers.updateEnvBasedOnMRAMEnv(conf, amEnv);
// and finally we're ready to create and start the session
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
index 8e48c0f9998..62e0d0cd27c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
@@ -16,17 +16,25 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfForTest;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.TezException;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestTezSessionState {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestTezSessionState.class.getName());
@Test
public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {
@@ -39,8 +47,7 @@ public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {
Assert.assertTrue(Files.isSymbolicLink(symlinkPath));
- HiveConf hiveConf = new HiveConf();
- hiveConf.set(HiveConf.ConfVars.HIVE_JAR_DIRECTORY.varname, "/tmp");
+ HiveConf hiveConf = new HiveConfForTest(getClass());
TezSessionState sessionState = new TezSessionState(DagUtils.getInstance(),
hiveConf);
@@ -50,4 +57,37 @@ public void testSymlinkedLocalFilesAreLocalizedOnce() throws Exception {
// local resources point to the same original resource
Assert.assertEquals(l1.getResource().toPath(), l2.getResource().toPath());
}
+
+ @Test
+ public void testScratchDirDeletedInTheEventOfExceptionWhileOpeningSession()
throws Exception {
+ HiveConf hiveConf = new HiveConfForTest(getClass());
+ hiveConf.set("hive.security.authorization.manager",
+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdConfOnlyAuthorizerFactory");
+ SessionState.start(hiveConf);
+
+ final AtomicReference<String> scratchDirPath = new AtomicReference<>();
+
+ TezSessionState sessionState = new
TezSessionState(SessionState.get().getSessionId(), hiveConf) {
+ @Override
+ void openInternalUnsafe(boolean isAsync, SessionState.LogHelper console)
throws TezException, IOException {
+ super.openInternalUnsafe(isAsync, console);
+ // save scratch dir here as it's nullified while calling the cleanup
+ scratchDirPath.set(tezScratchDir.toUri().getPath());
+ throw new RuntimeException("fake exception in openInternalUnsafe");
+ }
+ };
+
+ TezSessionState.HiveResources resources =
+ new TezSessionState.HiveResources(new
org.apache.hadoop.fs.Path("/tmp"));
+
+ try {
+ sessionState.open(resources);
+ Assert.fail("An exception should have been thrown while calling
openInternal");
+ } catch (Exception e) {
+ Assert.assertEquals("fake exception in openInternalUnsafe",
e.getMessage());
+ }
+ LOG.info("Checking if scratch dir exists: {}", scratchDirPath.get());
+ Assert.assertFalse("Scratch dir is not supposed to exist after cleanup: "
+ scratchDirPath.get(),
+ Files.exists(Paths.get(scratchDirPath.get())));
+ }
}
\ No newline at end of file