This is an automated email from the ASF dual-hosted git repository.

iwasakims pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/bigtop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4c220b37c BIGTOP-3963: Merge hive memory leak PRs (for hive-3.1.3) (#1248)
4c220b37c is described below

commit 4c220b37c51d15b46163000485de91ed8e146ed0
Author: Zhiguo Wu <[email protected]>
AuthorDate: Wed May 8 08:17:34 2024 +0800

    BIGTOP-3963: Merge hive memory leak PRs (for hive-3.1.3) (#1248)
    
    Co-authored-by: rzuo <[email protected]>
    (cherry picked from commit 3808ba7d1b8d6c2504c08ed9079a9d5673ae83de)
---
 ...6-memleak-HIVE-16455-HIVE-24858-branch-3.1.diff |  14 +
 .../patch17-memleak-HIVE-24236-branch-3.1.diff     |  80 +++++
 .../patch18-memleak-HIVE-24552-branch-3.1.diff     |  14 +
 .../patch19-memleak-HIVE-26404-branch-3.1.diff     | 378 +++++++++++++++++++++
 .../patch20-memleak-HIVE-26530-branch-3.1.diff     |  26 ++
 .../patch21-memleak-HIVE-24590-branch-3.1.diff     | 153 +++++++++
 6 files changed, 665 insertions(+)

diff --git a/bigtop-packages/src/common/hive/patch16-memleak-HIVE-16455-HIVE-24858-branch-3.1.diff b/bigtop-packages/src/common/hive/patch16-memleak-HIVE-16455-HIVE-24858-branch-3.1.diff
new file mode 100644
index 000000000..38aa38d5b
--- /dev/null
+++ b/bigtop-packages/src/common/hive/patch16-memleak-HIVE-16455-HIVE-24858-branch-3.1.diff
@@ -0,0 +1,14 @@
+diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+index 7a5866cb1c..1888e6eac4 100644
+--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
++++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+@@ -1746,7 +1746,9 @@ public void close() throws IOException {
+     if (txnMgr != null) {
+       txnMgr.closeTxnManager();
+     }
++    delete_resources(ResourceType.JAR);
+     JavaUtils.closeClassLoadersTo(sessionConf.getClassLoader(), parentLoader);
++    Utilities.restoreSessionSpecifiedClassLoader(getClass().getClassLoader());
+     File resourceDir =
+         new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
+     LOG.debug("Removing resource dir " + resourceDir);
diff --git a/bigtop-packages/src/common/hive/patch17-memleak-HIVE-24236-branch-3.1.diff b/bigtop-packages/src/common/hive/patch17-memleak-HIVE-24236-branch-3.1.diff
new file mode 100644
index 000000000..fc0ac4cf6
--- /dev/null
+++ b/bigtop-packages/src/common/hive/patch17-memleak-HIVE-24236-branch-3.1.diff
@@ -0,0 +1,80 @@
+diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+index 9294c2b32c..ec258b5e9d 100644
+--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
++++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+@@ -1370,21 +1370,25 @@ public void showLocks() throws Exception {
+   @Ignore("Wedges Derby")
+   public void deadlockDetected() throws Exception {
+     LOG.debug("Starting deadlock test");
++
+     if (txnHandler instanceof TxnHandler) {
+       final TxnHandler tHndlr = (TxnHandler)txnHandler;
+       Connection conn = tHndlr.getDbConn(Connection.TRANSACTION_SERIALIZABLE);
+-      Statement stmt = conn.createStatement();
+-      long now = tHndlr.getDbTime(conn);
+-      stmt.executeUpdate("insert into TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, " +
+-          "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
+-          "'scooby.com')");
+-      stmt.executeUpdate("insert into HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, " +
+-          "hl_db, hl_table, hl_partition, hl_lock_state, hl_lock_type, hl_last_heartbeat, " +
+-          "hl_user, hl_host) values (1, 1, 1, 'mydb', 'mytable', 'mypartition', '" +
+-          tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
+-          "'scooby.com')");
+-      conn.commit();
+-      tHndlr.closeDbConn(conn);
++      try {
++        Statement stmt = conn.createStatement();
++        long now = tHndlr.getDbTime(conn);
++        stmt.executeUpdate("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", " +
++                "txn_user, txn_host) values (1, 'o', " + now + ", " + now + ", 'shagy', " +
++                "'scooby.com')");
++        stmt.executeUpdate("INSERT INTO \"HIVE_LOCKS\" (\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", " +
++                "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_LAST_HEARTBEAT\", " +
++                "\"HL_USER\", \"HL_HOST\") VALUES (1, 1, 1, 'MYDB', 'MYTABLE', 'MYPARTITION', '" +
++                tHndlr.LOCK_WAITING + "', '" + tHndlr.LOCK_EXCLUSIVE + "', " + now + ", 'fred', " +
++                "'scooby.com')");
++        conn.commit();
++      } finally {
++        tHndlr.closeDbConn(conn);
++      }
+ 
+       final AtomicBoolean sawDeadlock = new AtomicBoolean();
+ 
+diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+index 38db1c97da..f911acba58 100644
+--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
++++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+@@ -4775,6 +4775,7 @@ public LockHandle acquireLock(String key) throws MetaException {
+     Connection dbConn = null;
+     Statement stmt = null;
+     ResultSet rs = null;
++    boolean needToCloseConn = true;
+     try {
+       try {
+         String sqlStmt = sqlGenerator.addForUpdateClause("select MT_COMMENT from AUX_TABLE where MT_KEY1=" + quoteString(key) + " and MT_KEY2=0");
+@@ -4812,20 +4813,21 @@ public LockHandle acquireLock(String key) throws MetaException {
+           derbySemaphore.acquire();
+         }
+         LOG.debug(quoteString(key) + " locked by " + quoteString(TxnHandler.hostname));
++        needToCloseConn = false;  //The connection is good, we need not close it
+         //OK, so now we have a lock
+         return new LockHandleImpl(dbConn, stmt, rs, key, derbySemaphore);
+       } catch (SQLException ex) {
+-        rollbackDBConn(dbConn);
+-        close(rs, stmt, dbConn);
+         checkRetryable(dbConn, ex, "acquireLock(" + key + ")");
+        throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + getMessage(ex) + "; " + StringUtils.stringifyException(ex));
+       }
+       catch(InterruptedException ex) {
+-        rollbackDBConn(dbConn);
+-        close(rs, stmt, dbConn);
+        throw new MetaException("Unable to lock " + quoteString(key) + " due to: " + ex.getMessage() + StringUtils.stringifyException(ex));
+       }
+       finally {
++        if (needToCloseConn) {
++          rollbackDBConn(dbConn);
++          close(rs, stmt, dbConn);
++        }
+         unlockInternal();
+       }
+     }
diff --git a/bigtop-packages/src/common/hive/patch18-memleak-HIVE-24552-branch-3.1.diff b/bigtop-packages/src/common/hive/patch18-memleak-HIVE-24552-branch-3.1.diff
new file mode 100644
index 000000000..a9472e747
--- /dev/null
+++ b/bigtop-packages/src/common/hive/patch18-memleak-HIVE-24552-branch-3.1.diff
@@ -0,0 +1,14 @@
+diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+index 4d2e1a4e9a..78b523e350 100644
+--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
++++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+@@ -2274,6 +2274,9 @@ public Void call() throws Exception {
+                   + " isAcid=" + isAcid + ", "
+                   + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
+               throw t;
++            } finally {
++              // get(conf).getMSC can be called in this task, Close the HMS connection right after use, do not wait for finalizer to close it.
++              closeCurrent();
+             }
+           }
+         }));
diff --git a/bigtop-packages/src/common/hive/patch19-memleak-HIVE-26404-branch-3.1.diff b/bigtop-packages/src/common/hive/patch19-memleak-HIVE-26404-branch-3.1.diff
new file mode 100644
index 000000000..1ff10ae0e
--- /dev/null
+++ b/bigtop-packages/src/common/hive/patch19-memleak-HIVE-26404-branch-3.1.diff
@@ -0,0 +1,378 @@
+diff --git a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
+new file mode 100644
+index 0000000000..b41cbae8d4
+--- /dev/null
++++ b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
+@@ -0,0 +1,184 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to you under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.hadoop.hive.ql.txn.compactor;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.hdfs.MiniDFSCluster;
++import org.apache.hadoop.hive.conf.HiveConf;
++import org.apache.hadoop.hive.metastore.api.CompactionRequest;
++import org.apache.hadoop.hive.metastore.api.CompactionType;
++import org.apache.hadoop.hive.metastore.api.Table;
++import org.apache.hadoop.http.HttpConfig;
++import org.apache.hadoop.minikdc.MiniKdc;
++import org.apache.hadoop.security.SecurityUtil;
++import org.apache.hadoop.security.UserGroupInformation;
++import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
++
++import org.junit.AfterClass;
++import org.junit.Assert;
++import org.junit.BeforeClass;
++import org.junit.Test;
++import org.mockito.internal.util.reflection.FieldSetter;
++
++import java.io.IOException;
++import java.nio.file.Path;
++import java.nio.file.Paths;
++import java.util.UUID;
++import java.util.concurrent.atomic.AtomicBoolean;
++
++import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
++import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
++
++public class TestCleanerWithSecureDFS extends CompactorTest {
++    private static final Path KEYSTORE_DIR =
++            Paths.get(System.getProperty("test.tmp.dir"), "kdc_root_dir" + UUID.randomUUID());
++    private static final String SUPER_USER_NAME = "hdfs";
++    private static final Path SUPER_USER_KEYTAB = KEYSTORE_DIR.resolve(SUPER_USER_NAME + ".keytab");
++
++    private static MiniDFSCluster dfsCluster = null;
++    private static MiniKdc kdc = null;
++    private static HiveConf secureConf = null;
++
++    private static MiniKdc initKDC() {
++        try {
++            MiniKdc kdc = new MiniKdc(MiniKdc.createConf(), KEYSTORE_DIR.toFile());
++            kdc.start();
++            kdc.createPrincipal(SUPER_USER_KEYTAB.toFile(), SUPER_USER_NAME + "/localhost", "HTTP/localhost");
++            return kdc;
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    private static MiniDFSCluster initDFS(Configuration c) {
++        try {
++            MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).numDataNodes(1).skipFsyncForTesting(true).build();
++            cluster.waitActive();
++            return cluster;
++        } catch (IOException e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    @BeforeClass
++    public static void startCluster() throws Exception {
++        kdc = initKDC();
++        secureConf = createSecureDFSConfig(kdc);
++        dfsCluster = initDFS(secureConf);
++    }
++
++    @AfterClass
++    public static void stopCluster() {
++        secureConf = null;
++        try {
++            if (dfsCluster != null) {
++                dfsCluster.close();
++            }
++        } finally {
++            dfsCluster = null;
++            if (kdc != null) {
++                kdc.stop();
++            }
++            kdc = null;
++        }
++    }
++
++    @Override
++    public void setup() throws Exception {
++        HiveConf conf = new HiveConf(secureConf);
++        conf.set("fs.defaultFS", dfsCluster.getFileSystem().getUri().toString());
++        setup(new HiveConf(secureConf));
++    }
++
++    private static HiveConf createSecureDFSConfig(MiniKdc kdc) throws Exception {
++        HiveConf conf = new HiveConf();
++        SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
++        String suPrincipal = SUPER_USER_NAME + "/localhost@" + kdc.getRealm();
++        String suKeyTab = SUPER_USER_KEYTAB.toAbsolutePath().toString();
++        conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, suPrincipal);
++        conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, suKeyTab);
++        conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, suPrincipal);
++        conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, suKeyTab);
++        conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, "HTTP/localhost@" + kdc.getRealm());
++        conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
++        conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication,integrity,privacy");
++        conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
++        conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
++        conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
++        conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
++        conf.set("hadoop.proxyuser.hdfs.groups", "*");
++        conf.set("hadoop.proxyuser.hdfs.hosts", "*");
++
++        String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestCleanerWithSecureDFS.class);
++        KeyStoreTestUtil.setupSSLConfig(KEYSTORE_DIR.toAbsolutePath().toString(), sslConfDir, conf, false);
++        return conf;
++    }
++
++    @Test
++    public void testLeakAfterHistoryException() throws Exception {
++        final String tblNamePrefix = "tbl_hive_26404_";
++        // Generate compaction requests that will fail cause the base file will not meet the expectations
++        for (int i = 0; i < 5; i++) {
++            Table t = newTable("default", tblNamePrefix + i, false);
++            CompactionRequest rqst = new CompactionRequest("default", t.getTableName(), CompactionType.MAJOR);
++            // The requests should come from different users in order to trigger the leak
++            rqst.setRunas("user" + i);
++            long cTxn = compactInTxn(rqst);
++            // We need at least a base compacted file in order to trigger the exception.
++            // It must not be a valid base otherwise the cleaner will not throw.
++            addBaseFile(t, null, 1, 1, cTxn);
++        }
++        Cleaner cleaner = new Cleaner();
++        // Create a big configuration, (by adding lots of properties) to trigger the memory
++        // leak fast. The problem can still be reproduced with small configurations but it requires more
++        // compaction requests to fail and its not practical to have in a unit test.
++        // The size of the configuration, measured by taking heapdump and inspecting the objects, is
++        // roughly 190MB.
++        HiveConf cleanerConf = new HiveConf(conf);
++        for (int i = 0; i < 1_000_000; i++) {
++            cleanerConf.set("hive.random.property.with.id." + i, Integer.toString(i));
++        }
++        cleaner.setConf(cleanerConf);
++        cleaner.init(new AtomicBoolean(true));
++        FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), txnHandler);
++        Runtime.getRuntime().gc();
++        long startMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
++        cleaner.run();
++        Runtime.getRuntime().gc();
++        long endMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
++        long diffMem = Math.abs(endMem - startMem);
++        // 5 failed compactions X 190MB leak per config ~ 1GB leak
++        // Depending on the Xmx value the leak may lead to OOM; if you definitely want to see the OOM
++        // increase the size of the configuration or the number of failed compactions.
++        Assert.assertTrue("Allocated memory, " + diffMem + "bytes , exceeds acceptable variance of 250MB.",
++                diffMem < 250_000_000);
++    }
++
++    @Override
++    boolean useHive130DeltaDirName() {
++        return false;
++    }
++}
+diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+index 14d389464f..154dccb6d4 100644
+--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
++++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+@@ -18,11 +18,9 @@
+ package org.apache.hadoop.hive.ql.txn.compactor;
+ 
+ import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hdfs.MiniDFSCluster;
+ import org.apache.hadoop.hive.conf.HiveConf;
+-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+ import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+ import org.apache.hadoop.hive.metastore.api.CompactionType;
+ import org.apache.hadoop.hive.metastore.api.Partition;
+@@ -34,7 +32,6 @@
+ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+ import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+ import org.apache.hadoop.hive.metastore.txn.TxnStore;
+-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+ import org.apache.hadoop.hive.shims.Utils;
+ import org.junit.After;
+ import org.junit.AfterClass;
+@@ -44,33 +41,25 @@
+ import org.junit.Test;
+ 
+ import javax.security.auth.login.LoginException;
+-import java.io.File;
+ import java.io.IOException;
+-import java.nio.file.Files;
+ 
+ import static org.junit.Assert.assertEquals;
+ 
+ public class TestCleanerWithReplication extends CompactorTest {
+   private Path cmRootDirectory;
+-  private static FileSystem fs;
+   private static MiniDFSCluster miniDFSCluster;
+   private final String dbName = "TestCleanerWithReplication";
+ 
+   @Before
+   public void setup() throws Exception {
+-    conf = new HiveConf();
+-    TxnDbUtil.setConfValues(conf);
+-    TxnDbUtil.cleanDb(conf);
+-    conf.set("fs.defaultFS", fs.getUri().toString());
++    HiveConf conf = new HiveConf();
++    conf.set("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+     conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+-    TxnDbUtil.prepDb(conf);
+-    ms = new HiveMetaStoreClient(conf);
+-    txnHandler = TxnUtils.getTxnStore(conf);
++    setup(conf);
+     cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
+     if (!fs.exists(cmRootDirectory)) {
+       fs.mkdirs(cmRootDirectory);
+     }
+-    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
+     Database db = new Database();
+     db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3");
+     db.setName(dbName);
+@@ -84,7 +73,6 @@ public static void classLevelSetup() throws LoginException, IOException {
+     hadoopConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+     miniDFSCluster =
+         new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(true).build();
+-    fs = miniDFSCluster.getFileSystem();
+   }
+ 
+   @After
+diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+index f26920cf43..294c36089a 100644
+--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
++++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+@@ -298,18 +298,25 @@ private void clean(CompactionInfo ci) throws MetaException {
+         LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
+         UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
+             UserGroupInformation.getLoginUser());
+-        ugi.doAs(new PrivilegedExceptionAction<Object>() {
+-          @Override
+-          public Object run() throws Exception {
+-            removeFiles(location, validWriteIdList, ci);
+-            return null;
+-          }
+-        });
+         try {
+           FileSystem.closeAllForUGI(ugi);
+         } catch (IOException exception) {
+           LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
+               ci.getFullPartitionName() + idWatermark(ci), exception);
++          ugi.doAs(new PrivilegedExceptionAction<Object>() {
++            @Override
++            public Object run() throws Exception {
++              removeFiles(location, validWriteIdList, ci);
++              return null;
++            }
++          });
++        } finally {
++          try {
++            FileSystem.closeAllForUGI(ugi);
++          } catch (IOException exception) {
++            LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
++                ci.getFullPartitionName() + idWatermark(ci), exception);
++          }
+         }
+       }
+       txnHandler.markCleaned(ci);
+diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+index 124c97e092..4347e2a6ee 100644
+--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
++++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+@@ -17,7 +17,6 @@
+  */
+ package org.apache.hadoop.hive.ql.txn.compactor;
+ 
+-import org.apache.commons.io.FileUtils;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.fs.FSDataOutputStream;
+@@ -71,9 +70,7 @@
+ import org.slf4j.LoggerFactory;
+ 
+ import java.io.EOFException;
+-import java.io.File;
+ import java.io.IOException;
+-import java.nio.file.Files;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashMap;
+@@ -83,6 +80,7 @@
+ import java.util.Set;
+ import java.util.Stack;
+ import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.concurrent.atomic.AtomicInteger;
+ 
+ /**
+  * Super class for all of the compactor test modules.
+@@ -90,26 +88,31 @@
+ public abstract class CompactorTest {
+   static final private String CLASS_NAME = CompactorTest.class.getName();
+   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
++  private static final AtomicInteger TMP_DIR_ID = new AtomicInteger();
+ 
+   protected TxnStore txnHandler;
+   protected IMetaStoreClient ms;
+   protected HiveConf conf;
+ 
+   private final AtomicBoolean stop = new AtomicBoolean();
+-  protected File tmpdir;
++  private Path tmpdir;
++  FileSystem fs;
+ 
+   @Before
+-  public void setup() throws Exception {
++  public final void setup() throws Exception {
+     conf = new HiveConf();
++    fs = FileSystem.get(conf);
+     TxnDbUtil.setConfValues(conf);
+     TxnDbUtil.cleanDb(conf);
+     ms = new HiveMetaStoreClient(conf);
+     txnHandler = TxnUtils.getTxnStore(conf);
+-    tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
++    Path tmpPath = new Path(System.getProperty("test.tmp.dir"), "compactor_test_table_" + TMP_DIR_ID.getAndIncrement());
++    fs.mkdirs(tmpPath);
++    tmpdir = fs.resolvePath(tmpPath);
+   }
+ 
+   protected void compactorTestCleanup() throws IOException {
+-    FileUtils.deleteDirectory(tmpdir);
++    fs.delete(tmpdir, true);
+   }
+ 
+   protected void startInitiator() throws Exception {
+@@ -320,12 +323,11 @@ private void startThread(char type, boolean stopAfterOne, AtomicBoolean looped)
+   }
+ 
+   private String getLocation(String tableName, String partValue) {
+-    String location =  tmpdir.getAbsolutePath() +
+-      System.getProperty("file.separator") + tableName;
++    Path tblLocation = new Path(tmpdir, tableName);
+     if (partValue != null) {
+-      location += System.getProperty("file.separator") + "ds=" + partValue;
++      tblLocation = new Path(tblLocation, "ds=" + partValue);
+     }
+-    return location;
++    return tblLocation.toString();
+   }
+ 
+   private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE};
diff --git a/bigtop-packages/src/common/hive/patch20-memleak-HIVE-26530-branch-3.1.diff b/bigtop-packages/src/common/hive/patch20-memleak-HIVE-26530-branch-3.1.diff
new file mode 100644
index 000000000..58a403308
--- /dev/null
+++ b/bigtop-packages/src/common/hive/patch20-memleak-HIVE-26530-branch-3.1.diff
@@ -0,0 +1,26 @@
+diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+index d866881b1a..d711a36569 100644
+--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
++++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+@@ -409,4 +409,8 @@ protected void markOperationStartTime() {
+   protected void markOperationCompletedTime() {
+     operationComplete = System.currentTimeMillis();
+   }
++
++  public String getQueryId() {
++    return queryState.getQueryId();
++  }
+ }
+diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+index 5336034839..a5435a6805 100644
+--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
++++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+@@ -186,7 +186,7 @@ private Operation getOperationInternal(OperationHandle operationHandle) {
+   }
+ 
+   private String getQueryId(Operation operation) {
+-    return operation.getParentSession().getHiveConf().getVar(ConfVars.HIVEQUERYID);
++    return operation.getQueryId();
+   }
+ 
+   private void addOperation(Operation operation) {
diff --git a/bigtop-packages/src/common/hive/patch21-memleak-HIVE-24590-branch-3.1.diff b/bigtop-packages/src/common/hive/patch21-memleak-HIVE-24590-branch-3.1.diff
new file mode 100644
index 000000000..7c9195951
--- /dev/null
+++ b/bigtop-packages/src/common/hive/patch21-memleak-HIVE-24590-branch-3.1.diff
@@ -0,0 +1,153 @@
+diff --git a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
+index 5068eb5be7..ba18c123d4 100644
+--- a/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
++++ b/common/src/java/org/apache/hadoop/hive/common/LogUtils.java
+@@ -19,20 +19,14 @@
+ package org.apache.hadoop.hive.common;
+ 
+ import java.io.File;
+-import java.lang.reflect.InvocationTargetException;
+-import java.lang.reflect.Method;
+ import java.net.URL;
+-import java.util.Map;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+ import org.apache.logging.log4j.LogManager;
+-import org.apache.logging.log4j.core.Appender;
+-import org.apache.logging.log4j.core.LoggerContext;
+-import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
++import org.apache.logging.log4j.ThreadContext;
+ import org.apache.logging.log4j.core.config.Configurator;
+-import org.apache.logging.log4j.core.config.LoggerConfig;
+ import org.apache.logging.log4j.core.impl.Log4jContextFactory;
+ import org.apache.logging.log4j.spi.DefaultThreadContextMap;
+ import org.slf4j.Logger;
+@@ -229,31 +223,4 @@ public static void registerLoggingContext(Configuration conf) {
+   public static void unregisterLoggingContext() {
+     MDC.clear();
+   }
+-
+-  /**
+-   * Stop the subordinate appender for the operation log so it will not leak a file descriptor.
+-   * @param routingAppenderName the name of the RoutingAppender
+-   * @param queryId the id of the query that is closing
+-   */
+-  public static void stopQueryAppender(String routingAppenderName, String queryId) {
+-    LoggerContext context = (LoggerContext) LogManager.getContext(false);
+-    org.apache.logging.log4j.core.config.Configuration configuration = context.getConfiguration();
+-    LoggerConfig loggerConfig = configuration.getRootLogger();
+-    Map<String, Appender> appenders = loggerConfig.getAppenders();
+-    RoutingAppender routingAppender = (RoutingAppender) appenders.get(routingAppenderName);
+-    // routingAppender can be null if it has not been registered
+-    if (routingAppender != null) {
+-      // The appender is configured to use ${ctx:queryId} by registerRoutingAppender()
+-      try {
+-        Class<? extends RoutingAppender> clazz = routingAppender.getClass();
+-        Method method = clazz.getDeclaredMethod("deleteAppender", String.class);
+-        method.setAccessible(true);
+-        method.invoke(routingAppender, queryId);
+-      } catch (NoSuchMethodException | SecurityException | IllegalAccessException |
+-          IllegalArgumentException | InvocationTargetException e) {
+-        l4j.warn("Unable to close the operation log appender for query id " + queryId, e);
+-      }
+-    }
+-  }
+-
+ }
+diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+index 0517dc0bbc..a28626c7e0 100644
+--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
++++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+@@ -3135,6 +3135,11 @@ public static enum ConfVars {
+     HIVE_SERVER2_OPERATION_LOG_CLEANUP_DELAY("hive.server2.operation.log.cleanup.delay", "300s",
+       new TimeValidator(TimeUnit.SECONDS), "When a query is cancelled (via kill query, query timeout or triggers),\n" +
+       " operation logs gets cleaned up after this delay"),
++    HIVE_SERVER2_OPERATION_LOG_PURGEPOLICY_TIMETOLIVE("hive.server2.operation.log.purgePolicy.timeToLive",
++            "60s", new TimeValidator(TimeUnit.SECONDS),
++            "Number of seconds the appender, which has been dynamically created by Log4J framework for the " +
++                    "operation log, should survive without having any events sent to it. For more details, check " +
++                    "Log4J's IdlePurgePolicy."),
+ 
+     // HS2 connections guard rails
+     HIVE_SERVER2_LIMIT_CONNECTIONS_PER_USER("hive.server2.limit.connections.per.user", 0,
+diff --git a/data/conf/hive-site.xml b/data/conf/hive-site.xml
+index 0c3adb4b0f..0057f71933 100644
+--- a/data/conf/hive-site.xml
++++ b/data/conf/hive-site.xml
+@@ -339,4 +339,8 @@
+   <value>false</value>
+ </property>
+ 
++  <property>
++    <name>hive.server2.operation.log.purgePolicy.timeToLive</name>
++    <value>5s</value>
++  </property>
+ </configuration>
+diff --git a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
+index b5e8b95744..76d7f458d0 100644
+--- a/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
++++ b/ql/src/java/org/apache/hadoop/hive/ql/log/LogDivertAppender.java
+@@ -17,6 +17,7 @@
+  */
+ package org.apache.hadoop.hive.ql.log;
+ 
++import java.util.concurrent.TimeUnit;
+ import java.util.regex.Pattern;
+ 
+ import org.apache.hadoop.hive.common.LogUtils;
+@@ -27,6 +28,8 @@
+ import org.apache.logging.log4j.LogManager;
+ import org.apache.logging.log4j.core.LogEvent;
+ import org.apache.logging.log4j.core.LoggerContext;
++import org.apache.logging.log4j.core.appender.routing.IdlePurgePolicy;
++import org.apache.logging.log4j.core.appender.routing.PurgePolicy;
+ import org.apache.logging.log4j.core.appender.routing.Route;
+ import org.apache.logging.log4j.core.appender.routing.Routes;
+ import org.apache.logging.log4j.core.appender.routing.RoutingAppender;
+@@ -235,12 +238,19 @@ public static void registerRoutingAppender(org.apache.hadoop.conf.Configuration
+     LoggerContext context = (LoggerContext) LogManager.getContext(false);
+     Configuration configuration = context.getConfiguration();
+ 
++    String timeToLive = String.valueOf(HiveConf
++            .getTimeVar(conf, HiveConf.ConfVars.HIVE_SERVER2_OPERATION_LOG_PURGEPOLICY_TIMETOLIVE, TimeUnit.SECONDS));
++    PurgePolicy purgePolicy = IdlePurgePolicy.createPurgePolicy(timeToLive, null, "SECONDS", configuration);
++    // Hack: due to the (non-standard) way that log4j configuration is extended to introduce the routing appender
++    // the life-cycle methods are not called as expected leading to initialization problems (such as the scheduler)
++    configuration.getScheduler().incrementScheduledItems();
++
+     RoutingAppender routingAppender = RoutingAppender.createAppender(QUERY_ROUTING_APPENDER,
+         "true",
+         routes,
+         configuration,
+         null,
+-        null,
++        purgePolicy,
+         null);
+ 
+     LoggerConfig loggerConfig = configuration.getRootLogger();
+diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+index d711a36569..7098970a39 100644
+--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
++++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+@@ -34,8 +34,6 @@
+ import org.apache.hadoop.hive.common.metrics.common.MetricsScope;
+ import org.apache.hadoop.hive.conf.HiveConf;
+ import org.apache.hadoop.hive.ql.QueryState;
+-import org.apache.hadoop.hive.ql.log.LogDivertAppender;
+-import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest;
+ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+ import org.apache.hadoop.hive.ql.session.OperationLog;
+ import org.apache.hive.service.cli.FetchOrientation;
+@@ -268,10 +266,6 @@ public void run() {
+   }
+ 
+   protected synchronized void cleanupOperationLog(final long operationLogCleanupDelayMs) {
+-    // stop the appenders for the operation log
+-    String queryId = queryState.getQueryId();
+-    LogUtils.stopQueryAppender(LogDivertAppender.QUERY_ROUTING_APPENDER, queryId);
+-    LogUtils.stopQueryAppender(LogDivertAppenderForTest.TEST_QUERY_ROUTING_APPENDER, queryId);
+     if (isOperationLogEnabled) {
+       if (opHandle == null) {
+         LOG.warn("Operation seems to be in invalid state, opHandle is null");

