This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0603700 HIVE-21960 : Disable HMS tasks on replica databases. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
0603700 is described below
commit 0603700395827acdd819460fe110e35fe7c59f4a
Author: Ashutosh Bapat <[email protected]>
AuthorDate: Fri Aug 2 10:14:54 2019 +0530
HIVE-21960 : Disable HMS tasks on replica databases. (Ashutosh Bapat reviewed by Mahesh Kumar Behera)
Signed-off-by: Mahesh Kumar Behera <[email protected]>
---
.../parse/BaseReplicationScenariosAcidTables.java | 9 ++-
.../parse/TestReplicationScenariosAcidTables.java | 6 +-
.../TestReplicationScenariosAcrossInstances.java | 24 ++++--
.../TestReplicationScenariosExternalTables.java | 42 ++++++----
.../parse/TestTableLevelReplicationScenarios.java | 12 ++-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 18 +++++
.../ddl/table/creation/CreateTableOperation.java | 9 +++
.../hadoop/hive/ql/parse/ReplicationSpec.java | 5 +-
.../hadoop/hive/ql/stats/StatsUpdaterThread.java | 14 +++-
.../hive/ql/stats/TestStatsUpdaterThread.java | 75 +++++++++++++++++-
.../apache/hadoop/hive/common/repl/ReplConst.java | 7 ++
.../hive/metastore/PartitionManagementTask.java | 15 +++-
.../hive/metastore/TestPartitionManagement.java | 89 ++++++++++++++++++++++
13 files changed, 290 insertions(+), 35 deletions(-)
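The mechanics in brief: REPL LOAD stamps every table on the replica with the table property repl.last.id (ReplConst.REPL_TARGET_TABLE_PROPERTY), and the HMS background tasks touched below (StatsUpdaterThread, PartitionManagementTask) skip any table carrying a non-empty value for it. A minimal sketch of that guard, assuming only what the patch shows; the helper name isReplicationTarget is illustrative, the patch keeps the check private inside each task:

    import java.util.Map;
    import org.apache.hadoop.hive.common.repl.ReplConst;

    // A table with a non-empty repl.last.id is a replication target and is
    // left alone by HMS background maintenance tasks.
    static boolean isReplicationTarget(Map<String, String> params) {
      if (params == null) {
        return false;
      }
      String lastId = params.get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
      return lastId != null && !lastId.trim().isEmpty();
    }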
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index e543695..5e869d2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -192,7 +192,8 @@ public class BaseReplicationScenariosAcidTables {
.run("show tables")
.verifyResults(tableNames)
.run("repl status " + replicatedDbName)
- .verifyResult(lastReplId);
+ .verifyResult(lastReplId)
+ .verifyReplTargetProperty(replicatedDbName);
verifyNonAcidTableLoad(replicatedDbName);
if (includeAcid) {
verifyAcidTableLoad(replicatedDbName);
@@ -295,7 +296,8 @@ public class BaseReplicationScenariosAcidTables {
.run("show tables")
.verifyResults(tableNames)
.run("repl status " + dbName)
- .verifyResult(lastReplId);
+ .verifyResult(lastReplId)
+ .verifyReplTargetProperty(replicatedDbName);
verifyIncNonAcidLoad(dbName);
verifyIncAcidLoad(dbName);
}
@@ -308,7 +310,8 @@ public class BaseReplicationScenariosAcidTables {
.run("show tables")
.verifyResults(tableNames)
.run("repl status " + dbName)
- .verifyResult(lastReplId);
+ .verifyResult(lastReplId)
+ .verifyReplTargetProperty(replicatedDbName);
verifyInc2NonAcidLoad(dbName);
verifyInc2AcidLoad(dbName);
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 96b074d..e23fdd8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -512,7 +512,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
.run("repl status " + replicatedDbName)
.verifyResult("null")
.run("show tables like t2")
- .verifyResults(new String[] { });
+ .verifyResults(new String[] { })
+ .verifyReplTargetProperty(replicatedDbName);
// Retry with different dump should fail.
replica.loadFailure(replicatedDbName, tuple2.dumpLocation);
@@ -546,7 +547,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
.run("select id from t1")
.verifyResults(Arrays.asList("1"))
.run("select name from t2 order by name")
- .verifyResults(Arrays.asList("bob", "carl"));
+ .verifyResults(Arrays.asList("bob", "carl"))
+ .verifyReplTargetProperty(replicatedDbName);
}
@Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index af5746f..46a6627 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -579,7 +579,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.verifyResults(new String[] { "t1" })
.run("use " + dbTwo)
.run("show tables")
- .verifyResults(new String[] { "t1" });
+ .verifyResults(new String[] { "t1" })
+ .verifyReplTargetProperty(primaryDbName)
+ .verifyReplTargetProperty(dbOne)
+ .verifyReplTargetProperty(dbTwo);
/*
Start of cleanup
@@ -646,7 +649,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.verifyResults(new String[] { "t1" })
.run("use " + dbOne)
.run("show tables")
- .verifyResults(new String[] { "t1" });
+ .verifyResults(new String[] { "t1" })
+ .verifyReplTargetProperty(primaryDbName)
+ .verifyReplTargetProperty(dbOne)
+ .verifyReplTargetProperty(dbTwo);
assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
@@ -660,7 +666,10 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.verifyResults(new String[] { "t1" })
.run("use " + dbOne)
.run("show tables")
- .verifyResults(new String[] { "t1", "t2" });
+ .verifyResults(new String[] { "t1", "t2" })
+ .verifyReplTargetProperty(primaryDbName)
+ .verifyReplTargetProperty(dbOne)
+ .verifyReplTargetProperty(dbTwo);
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
@@ -706,7 +715,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("show tables")
.verifyResults(new String[] { "table1", "table2" })
.run("select * from table1")
- .verifyResults(new String[]{ "1" });
+ .verifyResults(new String[]{ "1" })
+ .verifyReplTargetProperty(replicatedDbName);
//////////// First Incremental ////////////
WarehouseInstance.Tuple incrementalOneTuple = primary
@@ -736,7 +746,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("select * from table3")
.verifyResults(new String[] { "10" })
.run("show functions like '" + replicatedDbName + "%'")
- .verifyResult(replicatedDbName + ".testFunctionOne");
+ .verifyResult(replicatedDbName + ".testFunctionOne")
+ .verifyReplTargetProperty(replicatedDbName);
//////////// Second Incremental ////////////
WarehouseInstance.Tuple secondIncremental = primary
@@ -774,7 +785,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
.run("select * from table3")
.verifyResults(Collections.emptyList())
.run("show functions like '" + replicatedDbName + "%'")
- .verifyResult(null);
+ .verifyResult(null)
+ .verifyReplTargetProperty(replicatedDbName);
}
@Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index fbdbb01..e1802ad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -99,7 +99,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("show tables like 't1'")
.verifyFailure(new String[] { "t1" })
.run("show tables like 't2'")
- .verifyFailure(new String[] { "t2" });
+ .verifyFailure(new String[] { "t2" })
+ .verifyReplTargetProperty(replicatedDbName);
tuple = primary.run("use " + primaryDbName)
.run("create external table t3 (id int)")
@@ -114,7 +115,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables like 't3'")
- .verifyFailure(new String[] { "t3" });
+ .verifyFailure(new String[] { "t3" })
+ .verifyReplTargetProperty(replicatedDbName);
}
@Test
@@ -300,7 +302,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("show tables like 't2'")
.verifyResults(new String[] { "t2" })
.run("select place from t2")
- .verifyResults(new String[] { "bangalore" });
+ .verifyResults(new String[] { "bangalore" })
+ .verifyReplTargetProperty(replicatedDbName);
assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
@@ -325,7 +328,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("select place from t2 where country='india'")
.verifyResults(new String[] { "bangalore", "pune", "mumbai" })
.run("select place from t2 where country='australia'")
- .verifyResults(new String[] { "sydney" });
+ .verifyResults(new String[] { "sydney" })
+ .verifyReplTargetProperty(replicatedDbName);
Path customPartitionLocation = new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france");
@@ -345,7 +349,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
.run("select place from t2 where country='france'")
- .verifyResults(new String[] { "paris" });
+ .verifyResults(new String[] { "paris" })
+ .verifyReplTargetProperty(replicatedDbName);
// change the location of the partition via alter command
String tmpLocation = "/tmp/" + System.nanoTime();
@@ -358,7 +363,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
.run("select place from t2 where country='france'")
- .verifyResults(new String[] {});
+ .verifyResults(new String[] {})
+ .verifyReplTargetProperty(replicatedDbName);
// Changing location of one of the partitions shouldn't result in changing location of other
// partitions as well as that of the table.
@@ -418,7 +424,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("show partitions t1")
.verifyResults(new String[] { "country=india", "country=us" })
.run("select place from t1 order by place")
- .verifyResults(new String[] { "bangalore", "mumbai", "pune" });
+ .verifyResults(new String[] { "bangalore", "mumbai", "pune" })
+ .verifyReplTargetProperty(replicatedDbName);
// Delete one of the file and update another one.
fs.delete(new Path(partitionDir, "file.txt"), true);
@@ -438,7 +445,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("show partitions t1")
.verifyResults(new String[] { "country=india", "country=us" })
.run("select place from t1 order by place")
- .verifyResults(new String[] { "chennai" });
+ .verifyResults(new String[] { "chennai" })
+ .verifyReplTargetProperty(replicatedDbName);
Hive hive = Hive.get(replica.getConf());
Set<Partition> partitions =
@@ -453,7 +461,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
replica.load(replicatedDbName, tuple.dumpLocation)
.run("select * From t1")
- .verifyResults(new String[] {});
+ .verifyResults(new String[] {})
+ .verifyReplTargetProperty(replicatedDbName);
for (String path : paths) {
assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path)));
@@ -489,7 +498,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("show tables like 't1'")
.verifyFailure(new String[] {"t1" })
.run("show tables like 't2'")
- .verifyFailure(new String[] {"t2" });
+ .verifyFailure(new String[] {"t2" })
+ .verifyReplTargetProperty(replicatedDbName);
dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
"'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
@@ -532,7 +542,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("show tables like 't3'")
.verifyResult("t3")
.run("show tables like 't4'")
- .verifyResult("t4");
+ .verifyResult("t4")
+ .verifyReplTargetProperty(replicatedDbName);
// Ckpt should be set on bootstrapped tables.
replica.verifyIfCkptSetForTables(replicatedDbName, Arrays.asList("t2", "t3"), tuple.dumpLocation);
@@ -551,7 +562,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("select id from t3 order by id")
.verifyResults(Arrays.asList("10", "20"))
.run("select id from t4 order by id")
- .verifyResults(Arrays.asList("10", "20"));
+ .verifyResults(Arrays.asList("10", "20"))
+ .verifyReplTargetProperty(replicatedDbName);
}
@Test
@@ -580,7 +592,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("show tables")
.verifyResult("t3")
.run("select id from t3")
- .verifyResult("1");
+ .verifyResult("1")
+ .verifyReplTargetProperty(replicatedDbName);
dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
"'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
@@ -648,7 +661,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
.run("select id from t4")
.verifyResults(Arrays.asList("10", "20"))
.run("select id from t5")
- .verifyResult("10");
+ .verifyResult("10")
+ .verifyReplTargetProperty(replicatedDbName);
// Once the REPL LOAD is successful, this config should be unset or else the subsequent REPL LOAD
// will also drop those tables which will cause data loss.
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 270e61a..78f505b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -165,7 +165,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
- .verifyResults(expectedTables);
+ .verifyResults(expectedTables)
+ .verifyReplTargetProperty(replicatedDbName);
if (records == null) {
records = new String[] {"1"};
@@ -459,7 +460,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
- .verifyResults(replicatedTables);
+ .verifyResults(replicatedTables)
+ .verifyReplTargetProperty(replicatedDbName);
}
@Test
@@ -497,7 +499,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
- .verifyResults(incrementalReplicatedTables);
+ .verifyResults(incrementalReplicatedTables)
+ .verifyReplTargetProperty(replicatedDbName);
}
@Test
@@ -648,7 +651,8 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
.run("use " + replicatedDbName)
.run("show tables")
- .verifyResults(incrementalReplicatedTables);
+ .verifyResults(incrementalReplicatedTables)
+ .verifyReplTargetProperty(replicatedDbName);
}
@Test
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 6326bc3..5fbe48d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
@@ -447,6 +448,23 @@ public class WarehouseInstance implements Closeable {
}
}
+ // Make sure that every table in the target database is marked as target of the replication.
+ // Stats updater task and partition management task skip processing tables being replicated into.
+ private void verifyReplTargetProperty(Map<String, String> props) {
+ assertTrue(props.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY));
+ }
+
+ public WarehouseInstance verifyReplTargetProperty(String dbName, List<String> tblNames) throws Exception {
+ for (String tblName : tblNames) {
+ verifyReplTargetProperty(getTable(dbName, tblName).getParameters());
+ }
+ return this;
+ }
+
+ public WarehouseInstance verifyReplTargetProperty(String dbName) throws Exception {
+ return verifyReplTargetProperty(dbName, getAllTables(dbName));
+ }
+
public Database getDatabase(String dbName) throws Exception {
try {
return client.getDatabase(dbName);
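The new helpers chain onto the fluent verification style these tests already use. A typical call site, mirroring the test changes above (replica, tuple, replicatedDbName and expectedTables are the fixtures from those tests):

    replica.load(replicatedDbName, tuple.dumpLocation)
        .run("use " + replicatedDbName)
        .run("show tables")
        .verifyResults(expectedTables)
        // asserts repl.last.id is present on every table in the replica db
        .verifyReplTargetProperty(replicatedDbName);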
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
index bac0b4c..b6b7d1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/creation/CreateTableOperation.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.ddl.table.creation;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
@@ -85,6 +86,14 @@ public class CreateTableOperation extends DDLOperation<CreateTableDesc> {
if (desc.getReplaceMode()) {
createTableReplaceMode(tbl, replDataLocationChanged);
} else {
+ // Some HMS background tasks skip processing tables being replicated into. Set the
+ // replication property while creating the table so that they can identify such tables right
+ // from the beginning. Set it to 0, which is less than any eventId ever created. This will
+ // soon be overwritten by an actual value.
+ if (desc.getReplicationSpec().isInReplicationScope() &&
+ !tbl.getParameters().containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY)) {
+ tbl.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "0");
+ }
createTableNonReplaceMode(tbl);
}
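Only the presence of a non-empty value matters to the background tasks, so the "0" placeholder protects a freshly created table until the first real event id arrives. A hedged sketch of the property's lifecycle; appliedEventId is a hypothetical name for the event id REPL LOAD writes later:

    // At CREATE under replication scope: the placeholder flags the table immediately.
    tbl.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "0");
    // As events are applied, the same key is overwritten with the real last event
    // id; it backs ReplicationSpec.KEY.CURR_STATE_ID, see the next hunk.
    tbl.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, String.valueOf(appliedEventId)); // hypothetical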
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 48213d1..ad3e55a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -51,11 +52,11 @@ public class ReplicationSpec {
private boolean isMigratingToExternalTable = false;
private boolean needDupCopyCheck = false;
- // Key definitions related to replication
+ // Key definitions related to replication.
public enum KEY {
REPL_SCOPE("repl.scope"),
EVENT_ID("repl.event.id"),
- CURR_STATE_ID("repl.last.id"),
+ CURR_STATE_ID(ReplConst.REPL_TARGET_TABLE_PROPERTY),
NOOP("repl.noop"),
LAZY("repl.lazy"),
IS_REPLACE("repl.is.replace"),
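CURR_STATE_ID and the shared constant now resolve to the same key, which is what lets the background tasks reuse the value REPL LOAD already maintains instead of a second marker property. An illustrative sanity check, assuming KEY.toString() returns the underlying key name:

    // Both names must resolve to the same table property, "repl.last.id".
    assert ReplicationSpec.KEY.CURR_STATE_ID.toString()
        .equals(ReplConst.REPL_TARGET_TABLE_PROPERTY);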
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
index 8acb1c5..444c7ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUpdaterThread.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.ObjectStore;
@@ -175,7 +176,7 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
}
@VisibleForTesting
- boolean runOneIteration() {
+ public boolean runOneIteration() {
List<TableName> fullTableNames;
try {
fullTableNames = getTablesToCheck();
@@ -220,6 +221,17 @@ public class StatsUpdaterThread extends Thread implements MetaStoreThread {
String skipParam = table.getParameters().get(SKIP_STATS_AUTOUPDATE_PROPERTY);
if ("true".equalsIgnoreCase(skipParam)) return null;
+ // If the table is being replicated into,
+ // 1. the stats are also replicated from the source, so we don't need those to be calculated
+ // on the target again
+ // 2. updating stats requires a writeId to be created. Hence writeIds on source and target
+ // can get out of sync when stats are updated. That can cause consistency issues.
+ String replTrgtParam = table.getParameters().get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
+ if (replTrgtParam != null && !replTrgtParam.isEmpty()) {
+ LOG.debug("Skipping table {} since it is being replicated into", table);
+ return null;
+ }
+
// Note: ideally we should take a lock here to pretend to be a real reader.
// For now, this check is going to have race potential; it may run a spurious analyze.
String writeIdString = null;
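The observable contract, exercised by the new test below: once every candidate table carries repl.last.id, an updater iteration queues nothing. A sketch under that assumption, using the helpers from TestStatsUpdaterThread:

    StatsUpdaterThread su = createUpdater();
    // With all candidates marked as replication targets, nothing is analyzable.
    assertFalse(su.runOneIteration());
    assertEquals(0, su.getQueueLength());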
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
index a2f8bab..80251df 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/stats/TestStatsUpdaterThread.java
@@ -30,6 +30,7 @@ import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -39,13 +40,13 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -540,6 +541,73 @@ public class TestStatsUpdaterThread {
msClient.close();
}
+ // A table which is target of replication should not be queued for stats update, and hence its
+ // stats state should not change.
+ @Test(timeout=40000)
+ public void testNoStatsUpdateForSimpleReplTable() throws Exception {
+ testNoStatsUpdateForReplTable("simple", "");
+ }
+
+ // A table which is target of replication should not be queued for stats update, and hence its
+ // stats state should not change.
+ @Test(timeout=40000)
+ public void testNoStatsUpdateForTxnReplTable() throws Exception {
+ testNoStatsUpdateForReplTable("txn",
+ "TBLPROPERTIES
(\"transactional\"=\"true\",\"transactional_properties\"=\"insert_only\")");
+ }
+
+ private void testNoStatsUpdateForReplTable(String tblNamePrefix, String txnProperty) throws Exception {
+ String tblWOStats = tblNamePrefix + "_repl_trgt_nostats";
+ String tblWithStats = tblNamePrefix + "_repl_trgt_stats";
+ String ptnTblWOStats = tblNamePrefix + "_ptn_repl_trgt_nostats";
+ String ptnTblWithStats = tblNamePrefix + "_ptn_repl_trgt_stats";
+
+ StatsUpdaterThread su = createUpdater();
+ su.startWorkers();
+ IMetaStoreClient msClient = new HiveMetaStoreClient(hiveConf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
+
+ executeQuery("create table " + tblWOStats + "(i int, s string) " +
txnProperty);
+ // Mark this table as being replicated into
+ setTableReplTargetProperty(tblWOStats);
+ executeQuery("insert into " + tblWOStats + "(i, s) values (1, 'test')");
+ verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false);
+
+ executeQuery("create table " + ptnTblWOStats + "(s string) partitioned by
(i int) " + txnProperty);
+ // Mark this table as being replicated into
+ setTableReplTargetProperty(ptnTblWOStats);
+ executeQuery("insert into " + ptnTblWOStats + "(i, s) values (1, 'test')");
+ executeQuery("insert into " + ptnTblWOStats + "(i, s) values (2,
'test2')");
+ executeQuery("insert into " + ptnTblWOStats + "(i, s) values (3,
'test3')");
+ verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false);
+
+ executeQuery("create table " + tblWithStats + "(i int, s string)" +
txnProperty);
+ // Mark this table as being replicated into
+ setTableReplTargetProperty(tblWithStats);
+ executeQuery("insert into " + tblWithStats + "(i, s) values (1, 'test')");
+ executeQuery("analyze table " + tblWithStats + " compute statistics for
columns");
+ verifyStatsUpToDate(tblWithStats, Lists.newArrayList("i"), msClient, true);
+
+ executeQuery("create table " + ptnTblWithStats + "(s string) partitioned
by (i int) " + txnProperty);
+ // Mark this table as being replicated into
+ setTableReplTargetProperty(ptnTblWithStats);
+ executeQuery("insert into " + ptnTblWithStats + "(i, s) values (1,
'test')");
+ executeQuery("insert into " + ptnTblWithStats + "(i, s) values (2,
'test2')");
+ executeQuery("insert into " + ptnTblWithStats + "(i, s) values (3,
'test3')");
+ executeQuery("analyze table " + ptnTblWithStats + " compute statistics for
columns");
+ verifyPartStatsUpToDate(3, 1, msClient, ptnTblWithStats, true);
+
+ assertFalse(su.runOneIteration());
+ Assert.assertEquals(0, su.getQueueLength());
+ verifyStatsUpToDate(tblWOStats, Lists.newArrayList("i"), msClient, false);
+ verifyStatsUpToDate(tblWithStats, Lists.newArrayList("i"), msClient, true);
+ verifyPartStatsUpToDate(3, 1, msClient, ptnTblWOStats, false);
+ verifyPartStatsUpToDate(3, 1, msClient, ptnTblWithStats, true);
+
+ msClient.close();
+ }
+
private void verifyPartStatsUpToDate(int partCount, int skip,
IMetaStoreClient msClient, String tbl, boolean isUpToDate) throws Exception {
for (int i = skip; i < partCount; ++i) {
@@ -566,6 +634,11 @@ public class TestStatsUpdaterThread {
msClient.alter_table(table.getDbName(), table.getTableName(), table);
}
+ private void setTableReplTargetProperty(String tblName) throws Exception {
+ executeQuery("alter table " + tblName +
+ " set tblproperties ('" + ReplConst.REPL_TARGET_TABLE_PROPERTY +
"' = '1')");
+ }
+
private void setPartitionSkipProperty(
IMetaStoreClient msClient, String tblName, String partName, String val) throws Exception {
Partition part = msClient.getPartition(ss.getCurrentDatabase(), tblName, partName);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
index 7c29969..f075e2a 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/common/repl/ReplConst.java
@@ -37,4 +37,11 @@ public class ReplConst {
* database is created as part of repl load and survives the incremental cycles.
*/
public static final String REPL_TARGET_DB_PROPERTY = "hive.repl.ckpt.key";
+
+ /**
+ * A table which is target of replication will have this property set. The property serves two
+ * purposes, 1. identifies the tables being replicated into and 2. records the event id of the
+ * last event affecting this table.
+ */
+ */
+ public static final String REPL_TARGET_TABLE_PROPERTY = "repl.last.id";
}
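Reusing the existing repl.last.id key means both purposes are served by one lookup. An illustrative consumer, assuming a Table whose parameters were populated by REPL LOAD:

    Map<String, String> params = table.getParameters();
    String lastId = params == null ? null : params.get(ReplConst.REPL_TARGET_TABLE_PROPERTY);
    boolean isReplTarget = lastId != null && !lastId.trim().isEmpty(); // purpose 1: identification
    long lastEventId = isReplTarget ? Long.parseLong(lastId) : -1L;    // purpose 2: recency ("0" right after CREATE)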
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index da0259c..e4488f4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -30,6 +31,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableMeta;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -80,6 +82,16 @@ public class PartitionManagementTask implements MetastoreTaskThread {
return conf;
}
+ private static boolean partitionDiscoveryEnabled(Map<String, String> params) {
+ return params != null && params.containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
+ params.get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true");
+ }
+
+ private static boolean tblBeingReplicatedInto(Map<String, String> params) {
+ return params != null && params.containsKey(ReplConst.REPL_TARGET_TABLE_PROPERTY) &&
+ !params.get(ReplConst.REPL_TARGET_TABLE_PROPERTY).trim().isEmpty();
+ }
+
@Override
public void run() {
if (lock.tryLock()) {
@@ -116,8 +128,7 @@ public class PartitionManagementTask implements MetastoreTaskThread {
for (TableMeta tableMeta : foundTableMetas) {
Table table = msc.getTable(tableMeta.getCatName(), tableMeta.getDbName(), tableMeta.getTableName());
- if (table.getParameters() != null && table.getParameters().containsKey(DISCOVER_PARTITIONS_TBLPROPERTY) &&
- table.getParameters().get(DISCOVER_PARTITIONS_TBLPROPERTY).equalsIgnoreCase("true")) {
+ if (partitionDiscoveryEnabled(table.getParameters()) && !tblBeingReplicatedInto(table.getParameters())) {
candidateTables.add(table);
}
}
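With the refactor the candidate filter reads as a single conjunction: discovery enabled and not a replication target. Illustrative behavior of the two predicates for a table carrying both properties (predicate names as in the patch; they are private, so this is for reading, not calling):

    Map<String, String> params = new HashMap<>();
    params.put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
    params.put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "1");
    // partitionDiscoveryEnabled(params) -> true
    // tblBeingReplicatedInto(params)    -> true
    // so the table is excluded from candidateTables and never processed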
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
index 9562b4f..1961a70 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.repl.ReplConst;
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
import org.apache.hadoop.hive.metastore.api.Catalog;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -565,6 +566,94 @@ public class TestPartitionManagement {
assertEquals(4, partitions.size());
}
+ @Test
+ public void testNoPartitionDiscoveryForReplTable() throws Exception {
+ String dbName = "db_repl1";
+ String tableName = "tbl_repl1";
+ Map<String, Column> colMap = buildAllColumns();
+ List<String> partKeys = Lists.newArrayList("state", "dt");
+ List<String> partKeyTypes = Lists.newArrayList("string", "date");
+ List<List<String>> partVals = Lists.newArrayList(
+ Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+ Lists.newArrayList("CA", "1986-04-28"),
+ Lists.newArrayList("MN", "2018-11-31"));
+ createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+ Table table = client.getTable(dbName, tableName);
+ List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+ assertEquals(3, partitions.size());
+ String tableLocation = table.getSd().getLocation();
+ URI location = URI.create(tableLocation);
+ Path tablePath = new Path(location);
+ FileSystem fs = FileSystem.get(location, conf);
+ Path newPart1 = new Path(tablePath, "state=WA/dt=2018-12-01");
+ Path newPart2 = new Path(tablePath, "state=UT/dt=2018-12-02");
+ fs.mkdirs(newPart1);
+ fs.mkdirs(newPart2);
+ assertEquals(5, fs.listStatus(tablePath).length);
+ partitions = client.listPartitions(dbName, tableName, (short) -1);
+ assertEquals(3, partitions.size());
+
+ // table property is set to true, but the table is marked as replication target. The new
+ // partitions should not be created
+ table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+ table.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "1");
+ client.alter_table(dbName, tableName, table);
+ runPartitionManagementTask(conf);
+ partitions = client.listPartitions(dbName, tableName, (short) -1);
+ assertEquals(3, partitions.size());
+
+ // change table type to external, delete a partition directory and make sure partition discovery works
+ table.getParameters().put("EXTERNAL", "true");
+ table.setTableType(TableType.EXTERNAL_TABLE.name());
+ client.alter_table(dbName, tableName, table);
+ // Delete location of one of the partitions. The partition discovery task should not drop
+ // that partition.
+ boolean deleted = fs.delete((new Path(URI.create(partitions.get(0).getSd().getLocation()))).getParent(),
+ true);
+ assertTrue(deleted);
+ assertEquals(4, fs.listStatus(tablePath).length);
+ runPartitionManagementTask(conf);
+ partitions = client.listPartitions(dbName, tableName, (short) -1);
+ assertEquals(3, partitions.size());
+ }
+
+ @Test
+ public void testNoPartitionRetentionForReplTarget() throws TException, InterruptedException {
+ String dbName = "db_repl2";
+ String tableName = "tbl_repl2";
+ Map<String, Column> colMap = buildAllColumns();
+ List<String> partKeys = Lists.newArrayList("state", "dt");
+ List<String> partKeyTypes = Lists.newArrayList("string", "date");
+ List<List<String>> partVals = Lists.newArrayList(
+ Lists.newArrayList("__HIVE_DEFAULT_PARTITION__", "1990-01-01"),
+ Lists.newArrayList("CA", "1986-04-28"),
+ Lists.newArrayList("MN", "2018-11-31"));
+ // Check for the existence of partitions 10 seconds after the partition retention period has
+ // elapsed. Gives enough time for the partition retention task to work.
+ long partitionRetentionPeriodMs = 20000;
+ long waitingPeriodForTest = partitionRetentionPeriodMs + 10 * 1000;
+ createMetadata(DEFAULT_CATALOG_NAME, dbName, tableName, partKeys, partKeyTypes, partVals, colMap, false);
+ Table table = client.getTable(dbName, tableName);
+ List<Partition> partitions = client.listPartitions(dbName, tableName, (short) -1);
+ assertEquals(3, partitions.size());
+
+ table.getParameters().put(PartitionManagementTask.DISCOVER_PARTITIONS_TBLPROPERTY, "true");
+ table.getParameters().put(PartitionManagementTask.PARTITION_RETENTION_PERIOD_TBLPROPERTY,
+ partitionRetentionPeriodMs + "ms");
+ table.getParameters().put(ReplConst.REPL_TARGET_TABLE_PROPERTY, "1");
+ client.alter_table(dbName, tableName, table);
+
+ runPartitionManagementTask(conf);
+ partitions = client.listPartitions(dbName, tableName, (short) -1);
+ assertEquals(3, partitions.size());
+
+ // after 30s all partitions should remain intact for a table which is target of replication.
+ Thread.sleep(waitingPeriodForTest);
+ runPartitionManagementTask(conf);
+ partitions = client.listPartitions(dbName, tableName, (short) -1);
+ assertEquals(3, partitions.size());
+ }
+
private void runPartitionManagementTask(Configuration conf) {
PartitionManagementTask task = new PartitionManagementTask();
task.setConf(conf);