Repository: hive Updated Branches: refs/heads/master 0c8edf053 -> 2f5889c9b
HIVE-15332: REPL LOAD & DUMP support for incremental CREATE_TABLE/ADD_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f5889c9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f5889c9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f5889c9 Branch: refs/heads/master Commit: 2f5889c9b94977b064ba614c89684404cbb9ca63 Parents: 0c8edf0 Author: Vaibhav Gumashta <vgumas...@hortonworks.com> Authored: Tue Dec 6 11:16:12 2016 -0800 Committer: Vaibhav Gumashta <vgumas...@hortonworks.com> Committed: Tue Dec 6 11:16:12 2016 -0800 ---------------------------------------------------------------------- .../listener/DbNotificationListener.java | 1 + itests/hive-unit/pom.xml | 5 + .../hive/ql/TestReplicationScenarios.java | 129 +++++++- .../hive/metastore/messaging/EventUtils.java | 22 ++ .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 + .../apache/hadoop/hive/ql/metadata/Hive.java | 28 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 19 +- .../ql/parse/ReplicationSemanticAnalyzer.java | 320 +++++++++++++++---- .../hadoop/hive/ql/parse/ReplicationSpec.java | 8 + .../hadoop/hive/ql/plan/AddPartitionDesc.java | 22 ++ .../hadoop/hive/ql/plan/CreateTableDesc.java | 21 ++ 11 files changed, 516 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 494d01f..119801f 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -332,6 +332,7 @@ public class DbNotificationListener extends MetaStoreEventListener { private void enqueue(NotificationEvent event) { if (rs != null) { synchronized(NOTIFICATION_TBL_LOCK) { + LOG.debug("DbNotif:Enqueueing : {}:{}",event.getEventId(),event.getMessage()); rs.addNotificationEvent(event); } } else { http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/itests/hive-unit/pom.xml ---------------------------------------------------------------------- diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index cd209b4..6a190d1 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -81,6 +81,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-hcatalog-server-extensions</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-it-util</artifactId> <version>${project.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 
01abe9b..95db9e8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.util.Shell; @@ -55,6 +56,7 @@ public class TestReplicationScenarios { static Driver driver; protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); + private ArrayList<String> lastResults; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -69,8 +71,8 @@ public class TestReplicationScenarios { WindowsPathUtil.convertPathsFromWindowsToHdfs(hconf); } -// System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname, -// DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore + System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname, + DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore msPort = MetaStoreUtils.startMetaStore(); hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/"); hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" @@ -106,6 +108,12 @@ public class TestReplicationScenarios { // after each test } + private static int next = 0; + private synchronized void advanceDumpDir() { + next++; + ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next)); + } + /** * Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned. 
* Inserts data into one of the ptned tables, and one of the unptned tables, @@ -152,7 +160,7 @@ public class TestReplicationScenarios { run("SELECT * from " + dbName + ".unptned_empty"); verifyResults(empty); - + advanceDumpDir(); run("REPL DUMP " + dbName); String replDumpLocn = getResult(0,0); run("REPL LOAD " + dbName + "_dupe FROM '"+replDumpLocn+"'"); @@ -169,15 +177,116 @@ public class TestReplicationScenarios { verifyResults(empty); } + @Test + public void testIncrementalAdds() throws IOException { + String testName = "incrementalAdds"; + LOG.info("Testing "+testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE"); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0,0); + String replDumpId = getResult(0,1,true); + LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + String[] unptn_data = new String[]{ "eleven" , "twelve" }; + String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; + String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; + String[] empty = new String[]{}; + + String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + + run("SELECT a from " + dbName + ".ptned_empty"); + verifyResults(empty); + run("SELECT * from " + dbName + ".unptned_empty"); + verifyResults(empty); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); + run("SELECT * from " + dbName + ".unptned"); + verifyResults(unptn_data); + run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * from " + dbName + ".unptned"); + run("SELECT * from " + dbName + ".unptned_late"); + verifyResults(unptn_data); + + + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)"); + run("SELECT a from " + dbName + ".ptned WHERE b=1"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)"); + run("SELECT a from " + dbName + ".ptned WHERE b=2"); + verifyResults(ptn_data_2); + + // verified up to here. 
+ run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE"); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)"); + run("SELECT a from " + dbName + ".ptned_late WHERE b=1"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=2)"); + run("SELECT a from " + dbName + ".ptned_late WHERE b=2"); + verifyResults(ptn_data_2); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId ); + String incrementalDumpLocn = getResult(0,0); + String incrementalDumpId = getResult(0,1,true); + LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId); + run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'"); + + run("SELECT * from " + dbName + "_dupe.unptned_empty"); + verifyResults(empty); + run("SELECT a from " + dbName + ".ptned_empty"); + verifyResults(empty); + + +// this does not work because LOAD DATA LOCAL INPATH into an unptned table seems +// to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after +// fixing that. +// run("SELECT * from " + dbName + "_dupe.unptned"); +// verifyResults(unptn_data); + run("SELECT * from " + dbName + "_dupe.unptned_late"); + verifyResults(unptn_data); + + + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); + verifyResults(ptn_data_2); + + // verified up to here. + run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2"); + verifyResults(ptn_data_2); + } + private String getResult(int rowNum, int colNum) throws IOException { - List<String> results = new ArrayList<String>(); - try { - driver.getResults(results); - } catch (CommandNeedRetryException e) { - e.printStackTrace(); - throw new RuntimeException(e); + return getResult(rowNum,colNum,false); + } + private String getResult(int rowNum, int colNum, boolean reuse) throws IOException { + if (!reuse) { + lastResults = new ArrayList<String>(); + try { + driver.getResults(lastResults); + } catch (CommandNeedRetryException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } } - return (results.get(rowNum).split("\\001"))[colNum]; + return (lastResults.get(rowNum).split("\\001"))[colNum]; } private void verifyResults(String[] data) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java index 932af7e..927bf15 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java @@ -63,6 +63,28 @@ public class EventUtils { }; } + public static IMetaStoreClient.NotificationFilter getEventBoundaryFilter(final Long eventFrom, final Long eventTo){ + return new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent event) { + if ( (event == null) || (event.getEventId() < eventFrom) || (event.getEventId() > eventTo)) { + return false; + } + return true; + } + }; + } + + public static 
IMetaStoreClient.NotificationFilter andFilter( + final IMetaStoreClient.NotificationFilter filter1, + final IMetaStoreClient.NotificationFilter filter2) { + return new IMetaStoreClient.NotificationFilter() { + @Override + public boolean accept(NotificationEvent event) { + return filter1.accept(event) && filter2.accept(event); + } + }; + } public interface NotificationFetcher { public int getBatchSize() throws IOException; http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 0ac9053..4b39eb9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -4084,6 +4084,22 @@ public class DDLTask extends Task<DDLWork> implements Serializable { LOG.info("creating table " + tbl.getDbName() + "." + tbl.getTableName() + " on " + tbl.getDataLocation()); + if (crtTbl.getReplicationSpec().isInReplicationScope() && (!crtTbl.getReplaceMode())){ + // if this is a replication spec, then replace-mode semantics might apply. + // if we're already asking for a table replacement, then we can skip this check. + // however, otherwise, if in replication scope, and we've not been explicitly asked + // to replace, we should check if the object we're looking at exists, and if so, + // trigger replace-mode semantics. + Table existingTable = db.getTable(tbl.getDbName(), tbl.getTableName(), false); + if (existingTable != null){ + if (!crtTbl.getReplicationSpec().allowEventReplacementInto(existingTable)){ + return 0; // no replacement, the existing table state is newer than our update. + } else { + crtTbl.setReplaceMode(true); // we replace existing table. + } + } + } + // create the table if (crtTbl.getReplaceMode()){ // replace-mode creates are really alters using CreateTableDesc. http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 8f230fc..e477f24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2016,18 +2016,40 @@ private void constructOneLBLocationMap(FileStatus fSta, } List<Partition> out = new ArrayList<Partition>(); try { - if (!addPartitionDesc.getReplaceMode()){ + if (!addPartitionDesc.getReplicationSpec().isInReplicationScope()){ // TODO: normally, the result is not necessary; might make sense to pass false for (org.apache.hadoop.hive.metastore.api.Partition outPart : getMSC().add_partitions(in, addPartitionDesc.isIfNotExists(), true)) { out.add(new Partition(tbl, outPart)); } } else { - getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), in, null); - List<String> part_names = new ArrayList<String>(); + + // For replication add-ptns, we need to follow an insert-if-not-exists, alter-if-exists scenario. + // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have + // no choice but to iterate over the partitions here.
+ + List<org.apache.hadoop.hive.metastore.api.Partition> partsToAdd = new ArrayList<>(); + List<org.apache.hadoop.hive.metastore.api.Partition> partsToAlter = new ArrayList<>(); + List<String> part_names = new ArrayList<>(); for (org.apache.hadoop.hive.metastore.api.Partition p: in){ part_names.add(Warehouse.makePartName(tbl.getPartitionKeys(), p.getValues())); + try { + org.apache.hadoop.hive.metastore.api.Partition ptn = + getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues()); + if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn)){ + partsToAlter.add(p); + } // else ptn already exists, but we do nothing with it. + } catch (NoSuchObjectException nsoe){ + // if the object does not exist, we want to add it. + partsToAdd.add(p); + } } + for (org.apache.hadoop.hive.metastore.api.Partition outPart + : getMSC().add_partitions(partsToAdd, addPartitionDesc.isIfNotExists(), true)) { + out.add(new Partition(tbl, outPart)); + } + getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), partsToAlter, null); + for ( org.apache.hadoop.hive.metastore.api.Partition outPart : getMSC().getPartitionsByNames(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),part_names)){ out.add(new Partition(tbl,outPart)); http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 3420efd..ce952c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -204,6 +204,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { // Executed if relevant, and used to contain all the other details about the table if not. CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable()); + if ((replicationSpec!= null) && replicationSpec.isInReplicationScope()){ + tblDesc.setReplicationSpec(replicationSpec); + } + if (isExternalSet){ tblDesc.setExternal(isExternalSet); // This condition-check could have been avoided, but to honour the old @@ -368,8 +372,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } private static Task<? 
extends Serializable> alterTableTask(CreateTableDesc tableDesc, - EximUtil.SemanticAnalyzerWrapperContext x) { + EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) { tableDesc.setReplaceMode(true); + if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){ + tableDesc.setReplicationSpec(replicationSpec); + } return TaskFactory.get(new DDLWork( x.getInputs(), x.getOutputs(), @@ -383,6 +390,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn, EximUtil.SemanticAnalyzerWrapperContext x) { addPartitionDesc.setReplaceMode(true); + if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){ + addPartitionDesc.setReplicationSpec(replicationSpec); + } addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location return TaskFactory.get(new DDLWork( x.getInputs(), @@ -860,6 +870,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (!replicationSpec.isMetadataOnly()) { if (isPartitioned(tblDesc)) { for (AddPartitionDesc addPartitionDesc : partitionDescs) { + addPartitionDesc.setReplicationSpec(replicationSpec); t.addDependentTask( addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x)); } @@ -881,7 +892,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (table.isPartitioned()) { x.getLOG().debug("table partitioned"); for (AddPartitionDesc addPartitionDesc : partitionDescs) { - + addPartitionDesc.setReplicationSpec(replicationSpec); Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; @@ -912,7 +923,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){ // MD-ONLY table alter - x.getTasks().add(alterTableTask(tblDesc, x)); + x.getTasks().add(alterTableTask(tblDesc, x,replicationSpec)); if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; } @@ -925,7 +936,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { if (!replicationSpec.isMetadataOnly()) { loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into } else { - x.getTasks().add(alterTableTask(tblDesc, x)); + x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec)); } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 938355e..8007c4e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hive.ql.parse; +import com.google.common.base.Function; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; import org.antlr.runtime.tree.Tree; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -25,25 +28,26 @@ import org.apache.hadoop.hive.conf.HiveConf; import 
org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; -import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; import org.apache.hadoop.hive.metastore.messaging.EventUtils; -import org.apache.hadoop.hive.metastore.messaging.InsertMessage; import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryState; +import org.apache.hadoop.hive.ql.exec.ReplCopyTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.io.IOUtils; +import javax.annotation.Nullable; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -63,12 +67,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private String dbNameOrPattern; // Table name or pattern private String tblNameOrPattern; - private Integer eventFrom; - private Integer eventTo; + private Long eventFrom; + private Long eventTo; private Integer batchSize; // Base path for REPL LOAD private String path; + private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour + public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException { super(queryState); } @@ -114,13 +120,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } else { // TOK_FROM subtree Tree fromNode = ast.getChild(currNode); - eventFrom = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(0).getText())); + eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText())); // skip the first, which is always required int numChild = 1; while (numChild < fromNode.getChildCount()) { if (fromNode.getChild(numChild).getType() == TOK_TO) { eventTo = - Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); + Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText())); // skip the next child, since we already took care of it numChild++; } else if (fromNode.getChild(numChild).getType() == TOK_BATCH) { @@ -142,38 +148,212 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // REPL DUMP private void analyzeReplDump(ASTNode ast) throws SemanticException { - // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern) + "." 
+ String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to " + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize)); String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR); Path dumpRoot = new Path(replRoot, getNextDumpDir()); + Path dumpMetadata = new Path(dumpRoot,"_dumpmetadata"); try { - for (String dbName : matchesDb(dbNameOrPattern)) { - LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); - Path dbRoot = dumpDbMetadata(dbName, dumpRoot); - for (String tblName : matchesTbl(dbName, tblNameOrPattern)) { - LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName - + " to db root " + dbRoot.toUri()); - dumpTbl(ast, dbName, tblName, dbRoot); + if (eventFrom == null){ + // bootstrap case + String bootDumpBeginReplId = + String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId()); + for (String dbName : matchesDb(dbNameOrPattern)) { + LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName); + Path dbRoot = dumpDbMetadata(dbName, dumpRoot); + for (String tblName : matchesTbl(dbName, tblNameOrPattern)) { + LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName + + " to db root " + dbRoot.toUri()); + dumpTbl(ast, dbName, tblName, dbRoot); + } + } + String bootDumpEndReplId = + String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId()); + LOG.info("Bootstrap object dump phase took from {} to {}",bootDumpBeginReplId, bootDumpEndReplId); + + // Now that bootstrap has dumped all related objects, we have to account for the changes + // that occurred while bootstrap was happening - i.e. we have to look through all events + // during the bootstrap period and consolidate them with our dump. + + IMetaStoreClient.NotificationFilter evFilter = + EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern); + EventUtils.MSClientNotificationFetcher evFetcher = + new EventUtils.MSClientNotificationFetcher(db.getMSC()); + EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( + evFetcher, Long.valueOf(bootDumpBeginReplId), + Ints.checkedCast(Long.valueOf(bootDumpEndReplId) - Long.valueOf(bootDumpBeginReplId) + 1), + evFilter ); + + // Now we consolidate all the events that happened during the objdump into the objdump + while (evIter.hasNext()){ + NotificationEvent ev = evIter.next(); + Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); + // FIXME : implement consolidateEvent(..)
similar to dumpEvent(ev,evRoot) } + LOG.info("Consolidation done, preparing to return {},{}",dumpRoot.toUri(),bootDumpEndReplId); + prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), bootDumpEndReplId), + "dump_dir,last_repl_id#string,string"); + } else { + // get list of events matching dbPattern & tblPattern + // go through each event, and dump out each event to an event-level dump dir inside dumproot + if (eventTo == null){ + eventTo = db.getMSC().getCurrentNotificationEventId().getEventId(); + LOG.debug("eventTo not specified, using current event id : {}", eventTo); + } + + Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1); + if (batchSize == null){ + batchSize = maxRange; + } else { + if (batchSize > maxRange){ + batchSize = maxRange; + } + } + + IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter( + EventUtils.getDbTblNotificationFilter(dbNameOrPattern,tblNameOrPattern), + EventUtils.getEventBoundaryFilter(eventFrom, eventTo)); + + EventUtils.MSClientNotificationFetcher evFetcher + = new EventUtils.MSClientNotificationFetcher(db.getMSC()); + + EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator( + evFetcher, eventFrom, batchSize, evFilter); + + while (evIter.hasNext()){ + NotificationEvent ev = evIter.next(); + Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); + dumpEvent(ev,evRoot); + } + + LOG.info("Done dumping events, preparing to return {},{}",dumpRoot.toUri(),eventTo); + writeOutput(Arrays.asList("event", String.valueOf(eventFrom), String.valueOf(eventTo)), dumpMetadata); + prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(eventTo)), + "dump_dir,last_repl_id#string,string"); } - String currentReplId = - String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId()); - prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId), - "dump_dir,last_repl_id#string,string"); } catch (Exception e) { // TODO : simple wrap & rethrow for now, clean up with error codes + LOG.warn("Error during analyzeReplDump",e); throw new SemanticException(e); } } + private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception { + long evid = ev.getEventId(); + String evidStr = String.valueOf(evid); + ReplicationSpec replicationSpec = getNewReplicationSpec(evidStr, evidStr); + MessageDeserializer md = MessageFactory.getInstance().getDeserializer(); + switch (ev.getEventType()){ + case MessageFactory.CREATE_TABLE_EVENT : { + LOG.info("Processing#{} CREATE_TABLE message : {}",ev.getEventId(),ev.getMessage()); + + // FIXME : Current MessageFactory api is lacking, + // and impl is in JSONMessageFactory instead. This needs to be + // refactored correctly so we don't depend on a specific impl. + org.apache.hadoop.hive.metastore.api.Table tobj = + JSONMessageFactory.getTableObj(JSONMessageFactory.getJsonTree(ev)); + if (tobj == null){ + LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed", ev.getEventId()); + break; + } + + Table qlMdTable = new Table(tobj); + + Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(conf), + metaDataPath, + qlMdTable, + null, + replicationSpec); + + // FIXME : dump _files should happen at dbnotif time, doing it here is incorrect + // we will, however, do so here, now, for dev/debug's sake.
+ Path dataPath = new Path(evRoot,"data"); + rootTasks.add(ReplCopyTask.getDumpCopyTask(replicationSpec, qlMdTable.getPath(), dataPath , conf)); + + break; + } + case MessageFactory.ADD_PARTITION_EVENT : { + LOG.info("Processing#{} ADD_PARTITION message : {}",ev.getEventId(),ev.getMessage()); + // FIXME : Current MessageFactory api is lacking, + // and impl is in JSONMessageFactory instead. This needs to be + // refactored correctly so we don't depend on a specific impl. + List<org.apache.hadoop.hive.metastore.api.Partition> ptnObjs = + JSONMessageFactory.getPartitionObjList(JSONMessageFactory.getJsonTree(ev)); + if ((ptnObjs == null) || (ptnObjs.size() == 0)) { + LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions", ev.getEventId()); + break; + } + org.apache.hadoop.hive.metastore.api.Table tobj = + JSONMessageFactory.getTableObj(JSONMessageFactory.getJsonTree(ev)); + if (tobj == null){ + LOG.debug("Event#{} was an ADD_PTN_EVENT with no table listed", ev.getEventId()); + break; + } + + final Table qlMdTable = new Table(tobj); + List<Partition> qlPtns = Lists.transform( + ptnObjs, + new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() { + @Nullable + @Override + public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) { + if (input == null){ + return null; + } + try { + return new Partition(qlMdTable,input); + } catch (HiveException e) { + throw new IllegalArgumentException(e); + } + } + } + ); + + Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME); + EximUtil.createExportDump( + metaDataPath.getFileSystem(conf), + metaDataPath, + qlMdTable, + qlPtns, + replicationSpec); + + // FIXME : dump _files should ideally happen at dbnotif time, doing it here introduces + // rubberbanding. But, till we have support for that, this is our closest equivalent + for (Partition qlPtn : qlPtns){ + Path ptnDataPath = new Path(evRoot,qlPtn.getName()); + rootTasks.add(ReplCopyTask.getDumpCopyTask( + replicationSpec, qlPtn.getPartitionPath(), ptnDataPath, conf)); + } + + break; + } + default: + LOG.info("Skipping processing#{} message : {}",ev.getEventId(), ev.getMessage()); + // TODO : handle other event types + break; + } + + } + + public static void injectNextDumpDirForTest(String dumpdir){ + testInjectDumpDir = dumpdir; + } String getNextDumpDir() { if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) { - return "next"; - // make it easy to write unit tests, instead of unique id generation. + // make it easy to write .q unit tests, instead of unique id generation. // however, this does mean that in writing tests, we have to be aware that // repl dump will clash with prior dumps, and thus have to clean up properly. + if (testInjectDumpDir == null){ + return "next"; + } else { + return testInjectDumpDir; + } } else { return String.valueOf(System.currentTimeMillis()); // TODO: time good enough for now - we'll likely improve this. @@ -259,14 +439,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // looking at each db, and then each table, and then setting up the appropriate // import job in its place. - // FIXME : handle non-bootstrap cases. - - // We look at the path, and go through each subdir. - // Each subdir corresponds to a database. - // For each subdir, there is a _metadata file which allows us to re-impress the db object - // After each db object is loaded appropriately, iterate through the sub-table dirs, and pretend - // that we had an IMPORT on each of them, into this db.
- try { Path loadPath = new Path(path); @@ -277,25 +449,39 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { throw new FileNotFoundException(loadPath.toUri().toString()); } - // Now, the dumped path can be one of two things: + // Now, the dumped path can be one of three things: // a) It can be a db dump, in which case we expect a set of dirs, each with a // db name, and with a _metadata file in each, and table dirs inside that. // b) It can be a table dump dir, in which case we expect a _metadata dump of // a table in question in the dir, and individual ptn dir hierarchy. - // Once we expand this into doing incremental repl, we can have individual events which can - // be other things like roles and fns as well. Also, if tblname is specified, we're guaranteed - // that this is a tbl-level dump, and it is an error condition if we find anything else. Also, - // if dbname is specified, we expect exactly one db dumped, and having more is an error - // condition. + // c) A dump can be an event-level dump, which means we have several subdirs + // each of which has the evid as the dir name, and each of which corresponds + // to an event-level dump. Currently, only CREATE_TABLE and ADD_PARTITION are + // handled, so all of these dumps will be at a table/ptn level. + + // For incremental repl, eventually, we can have individual events which can + // be other things like roles and fns as well. + + boolean evDump = false; + Path dumpMetadata = new Path(loadPath, "_dumpmetadata"); + // TODO : only event dumps currently have _dumpmetadata - this might change. Generify. + if (fs.exists(dumpMetadata)){ + LOG.debug("{} exists, this is an event dump", dumpMetadata); + evDump = true; + } else { + LOG.debug("{} does not exist, this is an object dump", dumpMetadata); + } - if ((tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { + if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) { + // not an event dump, and table name pattern specified, this has to be a tbl-level dump analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null); return; } FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath); if (srcs == null || (srcs.length == 0)) { - throw new FileNotFoundException(loadPath.toUri().toString()); + LOG.warn("Nothing to load at {}",loadPath.toUri().toString()); + return; } FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs)); @@ -304,19 +490,31 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString()); } - if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) { - LOG.debug("Found multiple dirs when we expected 1:"); - for (FileStatus d : dirsInLoadPath) { - LOG.debug("> " + d.getPath().toUri().toString()); + if (!evDump){ + // not an event dump, not a table dump - thus, a db dump + if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) { + LOG.debug("Found multiple dirs when we expected 1:"); + for (FileStatus d : dirsInLoadPath) { + LOG.debug("> " + d.getPath().toUri().toString()); + } + throw new IllegalArgumentException( + "Multiple dirs in " + + loadPath.toUri().toString() + + " does not correspond to REPL LOAD expecting to load to a singular destination point."); } - throw new IllegalArgumentException( - "Multiple dirs in " - + loadPath.toUri().toString() - + " does not correspond to REPL LOAD expecting to load to a singular destination point."); - } - for (FileStatus
dir : dirsInLoadPath) { - analyzeDatabaseLoad(dbNameOrPattern, fs, dir); + for (FileStatus dir : dirsInLoadPath) { + analyzeDatabaseLoad(dbNameOrPattern, fs, dir); + } + } else { + // event dump, each subdir is an individual event dump. + for (FileStatus dir : dirsInLoadPath){ + // event loads will behave similarly to table loads, with one crucial difference - + // precursor order is strict, and each event must be processed after the previous one. + LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern); + analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), null); + // FIXME: we should have a strict order of execution so that each event's tasks occur linearly + } } } catch (Exception e) { @@ -326,6 +524,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } + private void analyzeEventLoad(String dbNameOrPattern, String tblNameOrPattern, + FileSystem fs, FileStatus dir) throws SemanticException { + + } + private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir) throws SemanticException { try { @@ -486,11 +689,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // If we do call it, then FetchWork winds up thinking that the "table" here // is a partitioned dir, which does not work. Thus, this does not work. - writeOutput(values); + writeOutput(values,ctx.getResFile()); } - private void writeOutput(List<String> values) throws SemanticException { - Path outputFile = ctx.getResFile(); + private void writeOutput(List<String> values, Path outputFile) throws SemanticException { FileSystem fs = null; DataOutputStream outStream = null; try { @@ -499,7 +701,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0))); for (int i = 1; i < values.size(); i++) { outStream.write(Utilities.ctrlaCode); - outStream.writeBytes((values.get(1) == null ? Utilities.nullStringOutput : values.get(1))); + outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i))); } outStream.write(Utilities.newLineCode); } catch (IOException e) { @@ -513,17 +715,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private ReplicationSpec getNewReplicationSpec() throws SemanticException { try { - ReplicationSpec replicationSpec = - new ReplicationSpec(true, false, "replv2", "will-be-set", false, true); - replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC() + ReplicationSpec rspec = getNewReplicationSpec("replv2","will-be-set"); + rspec.setCurrentReplicationState(String.valueOf(db.getMSC() .getCurrentNotificationEventId().getEventId())); - return replicationSpec; + return rspec; } catch (Exception e) { - throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error - // codes + throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error codes } } + private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException { + return new ReplicationSpec(true, false, evState, objState, false, true); + } + private Iterable<?
extends String> matchesTbl(String dbName, String tblPattern) throws HiveException { if (tblPattern == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java index 824cf11..060f2a1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java @@ -179,6 +179,14 @@ public class ReplicationSpec { } /** + * Determines if a current replication object (current state of dump) is allowed to + * replicate-replace-into a given partition + */ + public boolean allowReplacementInto(org.apache.hadoop.hive.metastore.api.Partition ptn){ + return allowReplacement(getLastReplicatedStateFromParameters(ptn.getParameters()),this.getCurrentReplicationState()); + } + + /** * Determines if a current replication event specification is allowed to * replicate-replace-into a given partition */ http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java index 7a583c3..6ffd94a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java @@ -25,12 +25,14 @@ import java.util.Map; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; /** * Contains the information needed to add one or more partitions. */ public class AddPartitionDesc extends DDLDesc implements Serializable { + public static class OnePartitionDesc { public OnePartitionDesc() {} @@ -152,6 +154,7 @@ public class AddPartitionDesc extends DDLDesc implements Serializable { boolean ifNotExists; List<OnePartitionDesc> partitions = null; boolean replaceMode = false; + private ReplicationSpec replicationSpec = null; /** @@ -302,4 +305,23 @@ public class AddPartitionDesc extends DDLDesc implements Serializable { public boolean getReplaceMode() { return this.replaceMode; } + + /** + * @param replicationSpec Sets the replication spec governing this add-partition. + * This parameter will have meaningful values only for creates happening as a result of a replication. + */ + public void setReplicationSpec(ReplicationSpec replicationSpec) { + this.replicationSpec = replicationSpec; + } + + /** + * @return what kind of replication scope this add-partition is running under.
+ * This can result in a "CREATE/REPLACE IF NEWER THAN" kind of semantic + */ + public ReplicationSpec getReplicationSpec(){ + if (replicationSpec == null){ + this.replicationSpec = new ReplicationSpec(); + } + return this.replicationSpec; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java index 60858e6..4f614a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ParseUtils; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.serde.serdeConstants; @@ -91,6 +92,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable { boolean isTemporary = false; private boolean isMaterialization = false; private boolean replaceMode = false; + private ReplicationSpec replicationSpec = null; private boolean isCTAS = false; List<SQLPrimaryKey> primaryKeys; List<SQLForeignKey> foreignKeys; @@ -646,6 +648,25 @@ public class CreateTableDesc extends DDLDesc implements Serializable { return replaceMode; } + /** + * @param replicationSpec Sets the replication spec governing this create. + * This parameter will have meaningful values only for creates happening as a result of a replication. + */ + public void setReplicationSpec(ReplicationSpec replicationSpec) { + this.replicationSpec = replicationSpec; + } + + /** + * @return what kind of replication scope this create is running under. + * This can result in a "CREATE/REPLACE IF NEWER THAN" kind of semantic + */ + public ReplicationSpec getReplicationSpec(){ + if (replicationSpec == null){ + this.replicationSpec = new ReplicationSpec(); + } + return this.replicationSpec; + } + public boolean isCTAS() { return isCTAS; }
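----------------------------------------------------------------------
A note on the EventUtils additions: getEventBoundaryFilter() and andFilter() are designed to be composed, and the incremental REPL DUMP path drives the composed filter through the existing NotificationEventIterator. Below is a minimal sketch of that usage, assuming a connected IMetaStoreClient; the class name, the "salesdb" pattern, and the null table pattern (taken here to mean "all tables") are illustrative assumptions, not part of the patch.

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.NotificationEvent;
    import org.apache.hadoop.hive.metastore.messaging.EventUtils;

    public class EventWindowWalker {
      // Visits every event for db "salesdb" whose id lies in [eventFrom, eventTo],
      // fetching from the metastore in batches rather than one call per event.
      public static void walk(IMetaStoreClient msc, long eventFrom, long eventTo)
          throws Exception {
        IMetaStoreClient.NotificationFilter filter = EventUtils.andFilter(
            EventUtils.getDbTblNotificationFilter("salesdb", null),
            EventUtils.getEventBoundaryFilter(eventFrom, eventTo));
        EventUtils.MSClientNotificationFetcher fetcher =
            new EventUtils.MSClientNotificationFetcher(msc);
        // clamp the batch size to the window, as analyzeReplDump() does
        int batchSize = (int) (eventTo - eventFrom + 1);
        EventUtils.NotificationEventIterator evIter =
            new EventUtils.NotificationEventIterator(fetcher, eventFrom, batchSize, filter);
        while (evIter.hasNext()) {
          NotificationEvent ev = evIter.next();
          // REPL DUMP gives each event its own subdir: <dumpRoot>/<eventId>/
        }
      }
    }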
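The DDLTask hunk implements a create-if-newer rule: inside replication scope, a CREATE TABLE event quietly becomes either a replace or a no-op, depending on how the event's state compares with the destination table's last-replicated state. A compact restatement of that decision follows; the enum and the decide() helper are editorial devices, not code from the patch.

    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.metadata.Table;
    import org.apache.hadoop.hive.ql.plan.CreateTableDesc;

    public class ReplCreateDecision {
      public enum Action { CREATE, REPLACE, SKIP_STALE }

      // Mirrors the check added at the top of DDLTask's create-table path.
      public static Action decide(Hive db, CreateTableDesc crtTbl, Table tbl)
          throws HiveException {
        if (!crtTbl.getReplicationSpec().isInReplicationScope() || crtTbl.getReplaceMode()) {
          return Action.CREATE; // not replicating, or caller already chose replace-mode
        }
        // third argument 'false' means: return null instead of throwing if absent
        Table existing = db.getTable(tbl.getDbName(), tbl.getTableName(), false);
        if (existing == null) {
          return Action.CREATE; // nothing at the destination yet: plain create
        }
        return crtTbl.getReplicationSpec().allowEventReplacementInto(existing)
            ? Action.REPLACE     // our event is newer: alter in place via replace-mode
            : Action.SKIP_STALE; // destination already ahead: silently drop the event
      }
    }

This is what makes incremental REPL LOAD idempotent for CREATE_TABLE events: replaying an older dump over a newer destination becomes a no-op instead of a regression.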
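Similarly, the Hive.java hunk reconciles replicated partitions instead of blindly altering them: add if absent, alter if present-and-older, ignore if present-and-newer. The shape of that loop, reduced to its essentials; the class, method, and local names are illustrative, and the per-partition getPartition() round trip is exactly the cost the TODO in the patch wants to push into the metastore.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hive.metastore.IMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
    import org.apache.hadoop.hive.metastore.api.Partition;
    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

    public class ReplPartitionReconciler {
      // Splits incoming partitions into add vs. alter sets, as createPartitions() now does.
      public static void reconcile(IMetaStoreClient msc, ReplicationSpec spec,
          String dbName, String tblName, List<Partition> incoming) throws Exception {
        List<Partition> toAdd = new ArrayList<>();
        List<Partition> toAlter = new ArrayList<>();
        for (Partition p : incoming) {
          try {
            Partition existing = msc.getPartition(dbName, tblName, p.getValues());
            if (spec.allowReplacementInto(existing)) {
              toAlter.add(p); // exists but is older than our event: overwrite it
            }                 // exists and is newer: stale event, leave it alone
          } catch (NoSuchObjectException e) {
            toAdd.add(p);     // does not exist yet: plain add
          }
        }
        msc.add_partitions(toAdd, true, false); // ifNotExists=true, needResults=false
        msc.alter_partitions(dbName, tblName, toAlter, null);
      }
    }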
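Finally, the new ReplicationSpec.allowReplacementInto(Partition) compares the partition's last-replicated state, read from its parameters, against the dump's current state. The patch does not show allowReplacement() itself, so the following is only a paraphrase of the intended ordering rule; the strictly-greater comparison and the treatment of never-replicated objects are assumptions, not verified internals.

    public class ReplStateOrdering {
      // Replication states are notification event ids rendered as strings.
      public static boolean allowReplacement(String lastReplState, String currReplState) {
        if (lastReplState == null || lastReplState.isEmpty()) {
          return true; // never replicated into: safe to replace
        }
        // replace only when the incoming dump is newer than the object's state
        return Long.parseLong(currReplState) > Long.parseLong(lastReplState);
      }
    }

Under this rule the same event stream can be replayed any number of times: every CREATE_TABLE/ADD_PARTITION either moves the destination forward or is discarded as stale, which is the invariant the testIncrementalAdds scenario above exercises.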