This is an automated email from the ASF dual-hosted git repository.
zabetak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 3de78df9042 HIVE-26404: HMS memory leak when compaction cleaner fails
to remove obsolete files (Stamatis Zampetakis reviewed by Denys Kuzmenko)
3de78df9042 is described below
commit 3de78df9042e4d364aea019b9a16691bcf51ea9b
Author: Stamatis Zampetakis <[email protected]>
AuthorDate: Fri Aug 5 19:59:41 2022 +0300
HIVE-26404: HMS memory leak when compaction cleaner fails to remove
obsolete files (Stamatis Zampetakis reviewed by Denys Kuzmenko)
Closes #3514
---
.../ql/txn/compactor/TestCleanerWithSecureDFS.java | 184 +++++++++++++++++++++
.../txn/compactor/TestCleanerWithReplication.java | 21 +--
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 19 ++-
.../hive/ql/txn/compactor/CompactorTest.java | 28 ++--
4 files changed, 213 insertions(+), 39 deletions(-)
diff --git a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
new file mode 100644
index 00000000000..ecf8472ea99
--- /dev/null
+++ b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.internal.util.reflection.FieldSetter;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTP_POLICY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
+
+public class TestCleanerWithSecureDFS extends CompactorTest {
+ private static final Path KEYSTORE_DIR =
+ Paths.get(System.getProperty("test.tmp.dir"), "kdc_root_dir" + UUID.randomUUID());
+ private static final String SUPER_USER_NAME = "hdfs";
+ private static final Path SUPER_USER_KEYTAB = KEYSTORE_DIR.resolve(SUPER_USER_NAME + ".keytab");
+
+ private static MiniDFSCluster dfsCluster = null;
+ private static MiniKdc kdc = null;
+ private static HiveConf secureConf = null;
+
+ private static MiniKdc initKDC() {
+ try {
+ MiniKdc kdc = new MiniKdc(MiniKdc.createConf(), KEYSTORE_DIR.toFile());
+ kdc.start();
+ kdc.createPrincipal(SUPER_USER_KEYTAB.toFile(), SUPER_USER_NAME + "/localhost", "HTTP/localhost");
+ return kdc;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static MiniDFSCluster initDFS(Configuration c) {
+ try {
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(c).numDataNodes(1).skipFsyncForTesting(true).build();
+ cluster.waitActive();
+ return cluster;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @BeforeClass
+ public static void startCluster() throws Exception {
+ kdc = initKDC();
+ secureConf = createSecureDFSConfig(kdc);
+ dfsCluster = initDFS(secureConf);
+ }
+
+ @AfterClass
+ public static void stopCluster() {
+ secureConf = null;
+ try {
+ if (dfsCluster != null) {
+ dfsCluster.close();
+ }
+ } finally {
+ dfsCluster = null;
+ if (kdc != null) {
+ kdc.stop();
+ }
+ kdc = null;
+ }
+ }
+
+ @Override
+ public void setup() throws Exception {
+ HiveConf conf = new HiveConf(secureConf);
+ conf.set("fs.defaultFS", dfsCluster.getFileSystem().getUri().toString());
+ setup(conf);
+ }
+
+ private static HiveConf createSecureDFSConfig(MiniKdc kdc) throws Exception {
+ HiveConf conf = new HiveConf();
+ SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);
+ String suPrincipal = SUPER_USER_NAME + "/localhost@" + kdc.getRealm();
+ String suKeyTab = SUPER_USER_KEYTAB.toAbsolutePath().toString();
+ conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, suPrincipal);
+ conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, suKeyTab);
+ conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, suPrincipal);
+ conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, suKeyTab);
+ conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, "HTTP/localhost@" + kdc.getRealm());
+ conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+ conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication,integrity,privacy");
+ conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+ conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+ conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);
+ conf.set("hadoop.proxyuser.hdfs.groups", "*");
+ conf.set("hadoop.proxyuser.hdfs.hosts", "*");
+
+ String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestCleanerWithSecureDFS.class);
+ KeyStoreTestUtil.setupSSLConfig(KEYSTORE_DIR.toAbsolutePath().toString(), sslConfDir, conf, false);
+ return conf;
+ }
+
+ @Test
+ public void testLeakAfterHistoryException() throws Exception {
+ final String tblNamePrefix = "tbl_hive_26404_";
+ // Generate compaction requests that will fail because the base file will not meet expectations
+ for (int i = 0; i < 5; i++) {
+ Table t = newTable("default", tblNamePrefix + i, false);
+ CompactionRequest rqst = new CompactionRequest("default", t.getTableName(), CompactionType.MAJOR);
+ // The requests should come from different users in order to trigger the leak
+ rqst.setRunas("user" + i);
+ long cTxn = compactInTxn(rqst);
+ // We need at least a base compacted file in order to trigger the exception.
+ // It must not be a valid base otherwise the cleaner will not throw.
+ addBaseFile(t, null, 1, 1, cTxn);
+ }
+ Cleaner cleaner = new Cleaner();
+ // Create a big configuration (by adding lots of properties) to trigger the memory
+ // leak fast. The problem can still be reproduced with small configurations but it requires more
+ // compaction requests to fail and it's not practical to have in a unit test.
+ // The size of the configuration, measured by taking a heap dump and inspecting the objects, is
+ // roughly 190MB.
+ HiveConf cleanerConf = new HiveConf(conf);
+ for (int i = 0; i < 1_000_000; i++) {
+ cleanerConf.set("hive.random.property.with.id." + i, Integer.toString(i));
+ }
+ cleaner.setConf(cleanerConf);
+ cleaner.init(new AtomicBoolean(true));
+ FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), txnHandler);
+ Runtime.getRuntime().gc();
+ long startMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ cleaner.run();
+ Runtime.getRuntime().gc();
+ long endMem = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ long diffMem = Math.abs(endMem - startMem);
+ // 5 failed compactions X 190MB leak per config ~ 1GB leak
+ // Depending on the Xmx value the leak may lead to OOM; if you definitely want to see the OOM,
+ // increase the size of the configuration or the number of failed compactions.
+ Assert.assertTrue("Allocated memory, " + diffMem + " bytes, exceeds acceptable variance of 250MB.",
+ diffMem < 250_000_000);
+ }
+
+ @Override
+ boolean useHive130DeltaDirName() {
+ return false;
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index f38eb8b02bb..4a7bb34bad1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -31,10 +29,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.Table;
import static org.apache.hadoop.hive.common.repl.ReplConst.SOURCE_OF_REPLICATION;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.shims.Utils;
import org.junit.After;
import org.junit.AfterClass;
@@ -44,34 +39,25 @@ import org.junit.BeforeClass;
import org.junit.Test;
import javax.security.auth.login.LoginException;
-import java.io.File;
import java.io.IOException;
-import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
public class TestCleanerWithReplication extends CompactorTest {
private Path cmRootDirectory;
- private static FileSystem fs;
private static MiniDFSCluster miniDFSCluster;
private final String dbName = "TestCleanerWithReplication";
@Before
public void setup() throws Exception {
- conf = new HiveConf();
- TestTxnDbUtil.setConfValues(conf);
- TestTxnDbUtil.cleanDb(conf);
- conf.set("fs.defaultFS", fs.getUri().toString());
+ HiveConf conf = new HiveConf();
+ conf.set("fs.defaultFS",
miniDFSCluster.getFileSystem().getUri().toString());
conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
- MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
- TestTxnDbUtil.prepDb(conf);
- ms = new HiveMetaStoreClient(conf);
- txnHandler = TxnUtils.getTxnStore(conf);
+ setup(conf);
cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
if (!fs.exists(cmRootDirectory)) {
fs.mkdirs(cmRootDirectory);
}
- tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
Database db = new Database();
db.putToParameters(SOURCE_OF_REPLICATION, "1,2,3");
db.setName(dbName);
@@ -85,7 +71,6 @@ public class TestCleanerWithReplication extends CompactorTest {
hadoopConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
miniDFSCluster =
new MiniDFSCluster.Builder(hadoopConf).numDataNodes(2).format(true).build();
- fs = miniDFSCluster.getFileSystem();
}
@After
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 196ee478370..2dc2fa873ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -262,15 +262,18 @@ public class Cleaner extends MetaStoreCompactorThread {
LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
UserGroupInformation.getLoginUser());
- ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
- removedFiles.value = cleanUpTask.call();
- return null;
- });
try {
- FileSystem.closeAllForUGI(ugi);
- } catch (IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
- ci.getFullPartitionName() + idWatermark(ci), exception);
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ removedFiles.value = cleanUpTask.call();
+ return null;
+ });
+ } finally {
+ try {
+ FileSystem.closeAllForUGI(ugi);
+ } catch (IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " +
+ ci.getFullPartitionName() + idWatermark(ci), exception);
+ }
}
}
if (removedFiles.value || isDynPartAbort(t, ci)) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 78cf8617f44..08ad89b20c7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -84,10 +83,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
-import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -99,6 +96,7 @@ import java.util.Stack;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtilities.CompactorThreadType;
@@ -114,21 +112,24 @@ public abstract class CompactorTest {
static final private String CLASS_NAME = CompactorTest.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
public static final String WORKER_VERSION = HiveVersionInfo.getShortVersion();
+ private static final AtomicInteger TMP_DIR_ID = new AtomicInteger();
protected TxnStore txnHandler;
protected IMetaStoreClient ms;
protected HiveConf conf;
private final AtomicBoolean stop = new AtomicBoolean();
- protected File tmpdir;
-
+ private Path tmpdir;
+ FileSystem fs;
+
@Before
public void setup() throws Exception {
setup(new HiveConf());
}
- protected void setup(HiveConf conf) throws Exception {
+ protected final void setup(HiveConf conf) throws Exception {
this.conf = conf;
+ fs = FileSystem.get(conf);
MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
TestTxnDbUtil.setConfValues(conf);
@@ -136,11 +137,13 @@ public abstract class CompactorTest {
TestTxnDbUtil.prepDb(conf);
ms = new HiveMetaStoreClient(conf);
txnHandler = TxnUtils.getTxnStore(conf);
- tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
+ Path tmpPath = new Path(System.getProperty("test.tmp.dir"), "compactor_test_table_" + TMP_DIR_ID.getAndIncrement());
+ fs.mkdirs(tmpPath);
+ tmpdir = fs.resolvePath(tmpPath);
}
protected void compactorTestCleanup() throws IOException {
- FileUtils.deleteDirectory(tmpdir);
+ fs.delete(tmpdir, true);
}
protected void startInitiator() throws Exception {
@@ -380,12 +383,11 @@ public abstract class CompactorTest {
}
private String getLocation(String tableName, String partValue) {
- String location = tmpdir.getAbsolutePath() +
- System.getProperty("file.separator") + tableName;
+ Path tblLocation = new Path(tmpdir, tableName);
if (partValue != null) {
- location += System.getProperty("file.separator") + "ds=" + partValue;
+ tblLocation = new Path(tblLocation, "ds=" + partValue);
}
- return location;
+ return tblLocation.toString();
}
private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE}
@@ -673,7 +675,7 @@ public abstract class CompactorTest {
findNextCompactRequest.setWorkerId("fred");
findNextCompactRequest.setWorkerVersion(WORKER_VERSION);
CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest);
- ci.runAs = System.getProperty("user.name");
+ ci.runAs = rqst.getRunas() == null ? System.getProperty("user.name") : rqst.getRunas();
long compactorTxnId = openTxn(TxnType.COMPACTION);
// Need to create a valid writeIdList to set the highestWriteId in ci
ValidTxnList validTxnList =
TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);