This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a107484 HIVE-24173: notification cleanup interval value changes
depending upon replication enabled or not. (Arko Sharma, reviewed by Aasha Medhi)
a107484 is described below
commit a107484136a4db3f6c518d7df0b5e0a16b752e26
Author: Anishek Agarwal <[email protected]>
AuthorDate: Tue Nov 10 11:46:13 2020 +0530
HIVE-24173: notification cleanup interval value changes depending upon
replication enabled or not. (Arko Sharma, reviewed by Aasha Medhi)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../hcatalog/listener/DbNotificationListener.java | 56 +++++++-
.../ql/parse/TestReplWithJsonMessageFormat.java | 7 +-
.../hive/ql/parse/TestReplicationScenarios.java | 142 +++++++++++++++++++++
.../hadoop/hive/metastore/conf/MetastoreConf.java | 11 +-
5 files changed, 209 insertions(+), 12 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7897a9a..94da886 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -511,7 +511,7 @@ public class HiveConf extends Configuration {
"Turn on ChangeManager, so delete files will go to cmrootdir."),
REPLCMDIR("hive.repl.cmrootdir","/user/${system:user.name}/cmroot/",
"Root dir for ChangeManager, used for deleted files."),
- REPLCMRETIAN("hive.repl.cm.retain","7d",
+ REPLCMRETIAN("hive.repl.cm.retain","10d",
new TimeValidator(TimeUnit.DAYS),
"Time to retain removed files in cmrootdir."),
REPLCMENCRYPTEDDIR("hive.repl.cm.encryptionzone.rootdir", ".cmroot",
@@ -1302,7 +1302,8 @@ public class HiveConf extends Configuration {
@Deprecated
METASTORE_EVENT_DB_LISTENER_TTL("hive.metastore.event.db.listener.timetolive",
"86400s",
new TimeValidator(TimeUnit.SECONDS),
- "time after which events will be removed from the database listener
queue"),
+ "time after which events will be removed from the database listener
queue when repl.cm.enabled \n" +
+ "is set to false. When repl.cm.enabled is set to true,
repl.event.db.listener.timetolive is used instead"),
/**
* @deprecated Use MetastoreConf.EVENT_DB_NOTIFICATION_API_AUTH
diff --git
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index d7757e6..a6fbeb8 100644
---
a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -30,11 +30,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.MetaStoreEventListenerConstants;
import org.apache.hadoop.hive.metastore.RawStore;
@@ -161,6 +163,15 @@ public class DbNotificationListener extends
TransactionalMetaStoreEventListener
}
}
+ @VisibleForTesting
+ public static synchronized void resetCleaner(HiveConf conf) throws Exception
{
+ if(cleaner != null &&
conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)){
+ cleaner.stopRun();
+ cleaner = null;
+ init(conf);
+ }
+ }
+
public DbNotificationListener(Configuration config) throws MetaException {
super(config);
conf = config;
@@ -184,8 +195,31 @@ public class DbNotificationListener extends
TransactionalMetaStoreEventListener
TimeUnit.SECONDS);
MetastoreConf.setTimeVar(getConf(),
MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, time,
TimeUnit.SECONDS);
- cleaner.setTimeToLive(MetastoreConf.getTimeVar(getConf(),
- MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, TimeUnit.SECONDS));
+ boolean isReplEnabled = MetastoreConf.getBoolVar(getConf(),
ConfVars.REPLCMENABLED);
+ if(!isReplEnabled){
+ cleaner.setTimeToLive(MetastoreConf.getTimeVar(getConf(),
ConfVars.EVENT_DB_LISTENER_TTL,
+ TimeUnit.SECONDS));
+ }
+ } else if (key.equals(ConfVars.REPL_EVENT_DB_LISTENER_TTL.toString()) ||
+ key.equals(ConfVars.REPL_EVENT_DB_LISTENER_TTL.getHiveName())) {
+ long time = MetastoreConf.convertTimeStr(tableEvent.getNewValue(),
TimeUnit.SECONDS,
+ TimeUnit.SECONDS);
+ boolean isReplEnabled = MetastoreConf.getBoolVar(getConf(),
ConfVars.REPLCMENABLED);
+ if(isReplEnabled){
+ cleaner.setTimeToLive(time);
+ }
+ }
+
+ if (key.equals(ConfVars.REPLCMENABLED.toString()) ||
key.equals(ConfVars.REPLCMENABLED.getHiveName())) {
+ boolean isReplEnabled = MetastoreConf.getBoolVar(conf,
ConfVars.REPLCMENABLED);
+ if(isReplEnabled){
+ cleaner.setTimeToLive(MetastoreConf.getTimeVar(conf,
ConfVars.REPL_EVENT_DB_LISTENER_TTL,
+ TimeUnit.SECONDS));
+ }
+ else {
+ cleaner.setTimeToLive(MetastoreConf.getTimeVar(conf,
ConfVars.EVENT_DB_LISTENER_TTL,
+ TimeUnit.SECONDS));
+ }
}
if (key.equals(ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL.toString()) ||
@@ -1190,13 +1224,21 @@ public class DbNotificationListener extends
TransactionalMetaStoreEventListener
private static class CleanerThread extends Thread {
private RawStore rs;
private int ttl;
+ private boolean shouldRun = true;
private long sleepTime;
CleanerThread(Configuration conf, RawStore rs) {
super("DB-Notification-Cleaner");
this.rs = rs;
- setTimeToLive(MetastoreConf.getTimeVar(conf,
ConfVars.EVENT_DB_LISTENER_TTL,
- TimeUnit.SECONDS));
+ boolean isReplEnabled = MetastoreConf.getBoolVar(conf,
ConfVars.REPLCMENABLED);
+ if(isReplEnabled){
+ setTimeToLive(MetastoreConf.getTimeVar(conf,
ConfVars.REPL_EVENT_DB_LISTENER_TTL,
+ TimeUnit.SECONDS));
+ }
+ else {
+ setTimeToLive(MetastoreConf.getTimeVar(conf,
ConfVars.EVENT_DB_LISTENER_TTL,
+ TimeUnit.SECONDS));
+ }
setCleanupInterval(MetastoreConf.getTimeVar(conf,
ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL,
TimeUnit.MILLISECONDS));
setDaemon(true);
@@ -1204,7 +1246,7 @@ public class DbNotificationListener extends
TransactionalMetaStoreEventListener
@Override
public void run() {
- while (true) {
+ while (shouldRun) {
try {
rs.cleanNotificationEvents(ttl);
rs.cleanWriteNotificationEvents(ttl);
@@ -1235,5 +1277,9 @@ public class DbNotificationListener extends
TransactionalMetaStoreEventListener
sleepTime = configInterval;
}
+ @VisibleForTesting
+ private synchronized void stopRun(){
+ shouldRun = false;
+ }
}
}
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
index 4d94975..19a56de 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplWithJsonMessageFormat.java
@@ -25,6 +25,7 @@ import org.junit.Rule;
import org.junit.rules.TestRule;
import java.util.ArrayList;
+import java.util.List;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -32,8 +33,10 @@ import java.util.Map;
public class TestReplWithJsonMessageFormat extends TestReplicationScenarios {
@Rule
public TestRule replV1BackwardCompatibleRule =
- new ReplicationV1CompatRule(metaStoreClient, hconf,
- new ArrayList<>(Collections.singletonList("testEventFilters")));
+ new ReplicationV1CompatRule(metaStoreClient, hconf, new
ArrayList<String>() {{
+ add("testEventFilters");
+ add("testReplConfiguredCleanupOfNotificationEvents");
+ }});
@BeforeClass
public static void setUpBeforeClass() throws Exception {
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 27e97b9..6028282 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
import org.apache.hadoop.hive.metastore.*;
import
org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -102,6 +103,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
import java.util.Base64;
import static
org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -2162,6 +2164,146 @@ public class TestReplicationScenarios {
}
@Test
+ public void testReplConfiguredCleanupOfNotificationEvents() throws Exception
{
+
+ boolean verifySetupOriginal = verifySetupSteps;
+ verifySetupSteps = true;
+ final int cleanerTtlSeconds = 1;
+ final int cleanerIntervalSeconds = 1;
+ String nameOfTest = testName.getMethodName();
+ String dbName = createDB(nameOfTest, driver);
+ String replDbName = dbName + "_dupe";
+
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE",
driver);
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int)
STORED AS TEXTFILE", driver);
+
+ //bootstrap
+ bootstrapLoadAndVerify(dbName, replDbName);
+
+ String[] unptnData = new String[] {"eleven", "twelve"};
+ String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"};
+ String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[] {};
+
+ String unptnLocn = new Path(TEST_PATH, nameOfTest +
"_unptn").toUri().getPath();
+ String ptnLocn1 = new Path(TEST_PATH, nameOfTest +
"_ptn1").toUri().getPath();
+ String ptnLocn2 = new Path(TEST_PATH, nameOfTest +
"_ptn2").toUri().getPath();
+
+ createTestDataFile(unptnLocn, unptnData);
+ createTestDataFile(ptnLocn1, ptnData1);
+ createTestDataFile(ptnLocn2, ptnData2);
+
+ run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " +
dbName + ".unptned", driver);
+ verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver);
+ run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName +
".unptned", driver);
+ run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " +
dbName + ".unptned", driver);
+ verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData,
driver);
+
+ // CM was enabled during setup, REPL_EVENT_DB_LISTENER_TTL should be used,
set the other one to a low value
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds,
TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL,
cleanerIntervalSeconds, TimeUnit.SECONDS);
+ DbNotificationListener.resetCleaner(hconf);
+
+ //sleep to ensure correct conf(REPL_EVENT_DB_LISTENER_TTL) is used
+ try {
+ Thread.sleep(cleanerIntervalSeconds * 1000 * 10);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep unsuccesful", e);
+ }
+
+ //verify events get replicated
+ Tuple incrDump = replDumpDb(dbName);
+ loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+ verifyRun("SELECT * from " + replDbName + ".unptned", unptnData,
driverMirror);
+ verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData,
driverMirror);
+
+
+ // For next run, CM is enabled, set REPL_EVENT_DB_LISTENER_TTL to low
value for events to get deleted
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60,
TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds ,
TimeUnit.SECONDS);
+ DbNotificationListener.resetCleaner(hconf);
+
+ run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
+ run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " +
dbName
+ + ".ptned PARTITION(b=1)", driver);
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1,
driver);
+ run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " +
dbName
+ + ".ptned PARTITION(b=2)", driver);
+ verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2,
driver);
+
+ try {
+ Thread.sleep(cleanerIntervalSeconds * 1000 * 10);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep unsuccesful", e);
+ }
+
+ incrDump = replDumpDb(dbName);
+
+ // expected empty data because REPL_EVENT_DB_LISTENER_TTL should have been
exceeded before dump
+ loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+ verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", empty,
driverMirror);
+ verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", empty,
driverMirror);
+
+ // With CM disabled, EVENT_DB_LISTENER_TTL should be used.
+ // First check with high ttl
+ MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED,
false);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 * 60,
TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds,
TimeUnit.SECONDS);
+ DbNotificationListener.resetCleaner(hconf);
+
+ run("CREATE TABLE " + dbName
+ + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS
TEXTFILE", driver);
+ run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a
FROM " + dbName
+ + ".ptned WHERE b=1", driver);
+ verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1,
driver);
+
+
+ //sleep to ensure correct conf(EVENT_DB_LISTENER_TTL) is used
+ try {
+ Thread.sleep(cleanerIntervalSeconds * 1000 * 10);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep unsuccesful", e);
+ }
+
+ //check replication success
+ incrDump = replDumpDb(dbName);
+ loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+ verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1",
ptnData1, driverMirror);
+
+
+ //With CM disabled, set a low ttl for events to get deleted
+ MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED,
false);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, cleanerTtlSeconds,
TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, cleanerTtlSeconds * 60 *
60, TimeUnit.SECONDS);
+ DbNotificationListener.resetCleaner(hconf);
+
+ run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a
FROM " + dbName
+ + ".ptned WHERE b=2", driver);
+ verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2,
driver);
+
+
+ try {
+ Thread.sleep(cleanerIntervalSeconds * 1000 * 10);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep unsuccesful", e);
+ }
+
+ //events should be deleted before dump
+ incrDump = replDumpDb(dbName);
+ loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+ verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", empty,
driverMirror);
+
+
+ //restore original values
+ MetastoreConf.setBoolVar(hconf, MetastoreConf.ConfVars.REPLCMENABLED,
true);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_TTL, 86400, TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.REPL_EVENT_DB_LISTENER_TTL, 864000, TimeUnit.SECONDS);
+ MetastoreConf.setTimeVar(hconf,
MetastoreConf.ConfVars.EVENT_DB_LISTENER_CLEAN_INTERVAL, 7200,
TimeUnit.SECONDS);
+ DbNotificationListener.resetCleaner(hconf);
+ verifySetupSteps = verifySetupOriginal;
+ }
+
+ @Test
public void testIncrementalInsertToPartition() throws IOException {
String testName = "incrementalInsertToPartition";
String dbName = createDB(testName, driver);
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index cdbe919..acf5e25 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -600,8 +600,9 @@ public class MetastoreConf {
+ "present in HMS Notification. Any key-value pair whose key is
matched with any regex will"
+" be removed from Parameters map during Serialization of
Table/Partition object."),
EVENT_DB_LISTENER_TTL("metastore.event.db.listener.timetolive",
- "hive.metastore.event.db.listener.timetolive", 7, TimeUnit.DAYS,
- "time after which events will be removed from the database listener
queue"),
+ "hive.metastore.event.db.listener.timetolive", 1, TimeUnit.DAYS,
+ "time after which events will be removed from the database listener
queue when repl.cm.enabled \n" +
+ "is set to false. When set to true, the conf
repl.event.db.listener.timetolive is used instead."),
EVENT_CLEAN_MAX_EVENTS("metastore.event.db.clean.maxevents",
"hive.metastore.event.db.clean.maxevents", 10000,
"Limit on number events to be cleaned at a time in metastore
cleanNotificationEvents " +
@@ -975,7 +976,7 @@ public class MetastoreConf {
REPLCMFALLBACKNONENCRYPTEDDIR("metastore.repl.cm.nonencryptionzone.rootdir",
"hive.repl.cm.nonencryptionzone.rootdir", "",
"Root dir for ChangeManager for non encrypted paths if
hive.repl.cmrootdir is encrypted."),
- REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24,
TimeUnit.HOURS,
+ REPLCMRETIAN("metastore.repl.cm.retain", "hive.repl.cm.retain", 24 * 10,
TimeUnit.HOURS,
"Time to retain removed files in cmrootdir."),
REPLCMINTERVAL("metastore.repl.cm.interval", "hive.repl.cm.interval",
3600, TimeUnit.SECONDS,
"Inteval for cmroot cleanup thread."),
@@ -991,6 +992,10 @@ public class MetastoreConf {
"hive.exec.copyfile.maxsize", 32L * 1024 * 1024 /*32M*/,
"Maximum file size (in bytes) that Hive uses to do single HDFS copies
between directories." +
"Distributed copies (distcp) will be used instead for bigger files
so that copies can be done faster."),
+ REPL_EVENT_DB_LISTENER_TTL("metastore.repl.event.db.listener.timetolive",
+ "hive.repl.event.db.listener.timetolive", 10, TimeUnit.DAYS,
+ "time after which events will be removed from the database
listener queue when repl.cm.enabled \n" +
+ "is set to true. When set to false, the conf
event.db.listener.timetolive is used instead."),
REPL_METRICS_CACHE_MAXSIZE("metastore.repl.metrics.cache.maxsize",
"hive.repl.metrics.cache.maxsize", 10000 /*10000 rows */,
"Maximum in memory cache size to collect replication metrics. The
metrics will be pushed to persistent"