http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
index 314ca48..b71cfa4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
@@ -17,13 +17,21 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
 import org.apache.hadoop.hive.shims.Utils;
-
+import org.apache.hadoop.hive.ql.parse.WarehouseInstance;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable;
+
+import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils;
 
 import org.junit.rules.TestName;
 
@@ -40,7 +48,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Lists;
 
 /**
@@ -52,14 +62,9 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosIncrementalLoadAcidTables.class);
   static WarehouseInstance primary;
-  private static WarehouseInstance replica, replicaNonAcid;
+  private static WarehouseInstance replica, replicaNonAcid, replicaMigration, primaryMigration;
   private static HiveConf conf;
   private String primaryDbName, replicatedDbName, primaryDbNameExtra;
-  private enum OperationType {
-    REPL_TEST_ACID_INSERT, REPL_TEST_ACID_INSERT_SELECT, REPL_TEST_ACID_CTAS,
-    REPL_TEST_ACID_INSERT_OVERWRITE, REPL_TEST_ACID_INSERT_IMPORT, REPL_TEST_ACID_INSERT_LOADLOCAL,
-    REPL_TEST_ACID_INSERT_UNION
-  }
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
@@ -67,7 +72,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
     overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
         GzipJSONMessageEncoder.class.getCanonicalName());
 
-    internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTables.class);
+    internalBeforeClassSetup(overrides, TestReplicationScenariosIncrementalLoadAcidTables.class);
   }
 
   static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz)
@@ -100,6 +105,36 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
         put("hive.metastore.client.capability.check", "false");
     }};
     replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
+
+    HashMap<String, String> overridesForHiveConfReplicaMigration = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.strict.managed.tables", "true");
+    }};
+    replicaMigration = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConfReplicaMigration);
+
+    HashMap<String, String> overridesForHiveConfPrimaryMigration = new HashMap<String, String>() {{
+      put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.support.concurrency", "false");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+      put("hive.strict.managed.tables", "false");
+    }};
+    primaryMigration = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConfPrimaryMigration);
   }
 
   @AfterClass
@@ -138,231 +173,70 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
     String tableName = testName.getMethodName() + "testInsert";
     String tableNameMM = tableName + "_MM";
 
-    appendInsert(tableName, tableNameMM, selectStmtList, expectedValues);
-    appendDelete(selectStmtList, expectedValues);
-    appendUpdate(selectStmtList, expectedValues);
-    appendTruncate(selectStmtList, expectedValues);
-    appendInsertIntoFromSelect(tableName, tableNameMM, selectStmtList, expectedValues);
-    appendMerge(selectStmtList, expectedValues);
-    appendCreateAsSelect(tableName, tableNameMM, selectStmtList, expectedValues);
-    appendImport(tableName, tableNameMM, selectStmtList, expectedValues);
-    appendInsertOverwrite(tableName, tableNameMM, selectStmtList, expectedValues);
-    appendLoadLocal(tableName, tableNameMM, selectStmtList, expectedValues);
-    appendInsertUnion(tableName, tableNameMM, selectStmtList, expectedValues);
-    appendMultiStatementTxn(selectStmtList, expectedValues);
-    appendMultiStatementTxnUpdateDelete(selectStmtList, expectedValues);
-    appendAlterTable(selectStmtList, expectedValues);
-
-    verifyIncrementalLoad(selectStmtList, expectedValues, bootStrapDump.lastReplicationId);
-  }
-
-  private void appendInsert(String tableName, String tableNameMM,
+    ReplicationTestUtils.appendInsert(primary, primaryDbName, primaryDbNameExtra, tableName,
+            tableNameMM, selectStmtList, expectedValues);
+    appendDelete(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
+    appendUpdate(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendTruncate(primary, primaryDbName, primaryDbNameExtra,
+            selectStmtList, expectedValues);
+    ReplicationTestUtils.appendInsertIntoFromSelect(primary, primaryDbName, primaryDbNameExtra,
+            tableName, tableNameMM, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendMerge(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendCreateAsSelect(primary, primaryDbName, primaryDbNameExtra, tableName,
+            tableNameMM, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendImport(primary, primaryDbName, primaryDbNameExtra, tableName,
+            tableNameMM, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendInsertOverwrite(primary, primaryDbName, primaryDbNameExtra, tableName,
+            tableNameMM, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendLoadLocal(primary, primaryDbName, primaryDbNameExtra, tableName,
+            tableNameMM, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendInsertUnion(primary, primaryDbName, primaryDbNameExtra, tableName,
+            tableNameMM, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendMultiStatementTxn(primary, primaryDbName, primaryDbNameExtra,
+            selectStmtList, expectedValues);
+    appendMultiStatementTxnUpdateDelete(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
+    ReplicationTestUtils.appendAlterTable(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues);
+
+    verifyIncrementalLoadInt(selectStmtList, expectedValues, bootStrapDump.lastReplicationId);
+  }
+
+  private void appendDelete(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra,
                             List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
-    selectStmtList.add("select key from " + tableName + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-    insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
-    selectStmtList.add("select key from " + tableNameMM + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-  }
-
-  private void appendDelete(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableName = testName.getMethodName() + "testDelete";
-    insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+    String tableName = "testDelete";
+    ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra,
+            tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
     deleteRecords(tableName);
     selectStmtList.add("select count(*) from " + tableName);
     expectedValues.add(new String[] {"0"});
   }
 
-  private void appendUpdate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableName = testName.getMethodName() + "testUpdate";
-    insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+  private void appendUpdate(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra,
+                            List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String tableName = "testUpdate";
+    ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra,
+            tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
     updateRecords(tableName);
     selectStmtList.add("select value from " + tableName + " order by value");
     expectedValues.add(new String[] {"1", "100", "100", "100", "100"});
   }
 
-  private void appendTruncate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableName = testName.getMethodName() + "testTruncate";
-    String tableNameMM = tableName + "_MM";
-
-    insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
-    truncateTable(primaryDbName, tableName);
-    selectStmtList.add("select count(*) from " + tableName);
-    expectedValues.add(new String[] {"0"});
-    selectStmtList.add("select count(*) from " + tableName + "_nopart");
-    expectedValues.add(new String[] {"0"});
-
-    insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
-    truncateTable(primaryDbName, tableNameMM);
-    selectStmtList.add("select count(*) from " + tableNameMM);
-    expectedValues.add(new String[] {"0"});
-    selectStmtList.add("select count(*) from " + tableNameMM + "_nopart");
-    expectedValues.add(new String[] {"0"});
-  }
-
-  private void appendAlterTable(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableName = testName.getMethodName() + "testAlterTable";
-    String tableNameMM = tableName + "_MM";
-
-    insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
-    primary.run("use " + primaryDbName)
-            .run("alter table " + tableName + " change value value1 int ")
-            .run("select value1 from " + tableName)
-            .verifyResults(new String[]{"1", "2", "3", "4", "5"})
-            .run("alter table " + tableName + "_nopart change value value1 int ")
-            .run("select value1 from " + tableName + "_nopart")
-            .verifyResults(new String[]{"1", "2", "3", "4", "5"});
-    selectStmtList.add("select value1 from " + tableName );
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-    selectStmtList.add("select value1 from " + tableName + "_nopart");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-
-    insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
-    primary.run("use " + primaryDbName)
-            .run("alter table " + tableNameMM + " change value value1 int ")
-            .run("select value1 from " + tableNameMM)
-            .verifyResults(new String[]{"1", "2", "3", "4", "5"})
-            .run("alter table " + tableNameMM + "_nopart change value value1 int ")
-            .run("select value1 from " + tableNameMM + "_nopart")
-            .verifyResults(new String[]{"1", "2", "3", "4", "5"});
-    selectStmtList.add("select value1 from " + tableNameMM );
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-    selectStmtList.add("select value1 from " + tableNameMM + "_nopart");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-  }
-
-  private void appendInsertIntoFromSelect(String tableName, String tableNameMM,
-                                          List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableNameSelect = testName.getMethodName() + "_Select";
-    String tableNameSelectMM = testName.getMethodName() + "_SelectMM";
-
-    insertRecords(tableName, tableNameSelect, false, OperationType.REPL_TEST_ACID_INSERT_SELECT);
-    selectStmtList.add("select key from " + tableNameSelect + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-
-    insertRecords(tableNameMM, tableNameSelectMM, true, OperationType.REPL_TEST_ACID_INSERT_SELECT);
-    selectStmtList.add("select key from " + tableNameSelectMM + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-  }
-
-  private void appendMerge(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableName = testName.getMethodName() + "testMerge";
-    String tableNameMerge = testName.getMethodName() + "_Merge";
-
-    insertForMerge(tableName, tableNameMerge, false);
-    selectStmtList.add("select last_update_user from " + tableName + " order by last_update_user");
-    expectedValues.add(new String[] {"creation", "creation", "creation", "creation", "creation",
-            "creation", "creation", "merge_update", "merge_insert", "merge_insert"});
-    selectStmtList.add("select ID from " + tableNameMerge + " order by ID");
-    expectedValues.add(new String[] {"1", "4", "7", "8", "8", "11"});
-  }
-
-  private void appendCreateAsSelect(String tableName, String tableNameMM,
-                                    List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-     String tableNameCTAS = testName.getMethodName() + "_CTAS";
-    String tableNameCTASMM = testName.getMethodName() + "_CTASMM";
-
-    insertRecords(tableName, tableNameCTAS, false, OperationType.REPL_TEST_ACID_CTAS);
-    selectStmtList.add("select key from " + tableNameCTAS + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-
-    insertRecords(tableNameMM, tableNameCTASMM, true, OperationType.REPL_TEST_ACID_CTAS);
-    selectStmtList.add("select key from " + tableNameCTASMM + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-  }
-
-  private void appendImport(String tableName, String tableNameMM,
-                            List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-     String tableNameImport = testName.getMethodName() + "_Import";
-    String tableNameImportMM = testName.getMethodName() + "_ImportMM";
-
-    insertRecords(tableName, tableNameImport, false, OperationType.REPL_TEST_ACID_INSERT_IMPORT);
-    selectStmtList.add("select key from " + tableNameImport + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-
-    insertRecords(tableNameMM, tableNameImportMM, true, OperationType.REPL_TEST_ACID_INSERT_IMPORT);
-    selectStmtList.add("select key from " + tableNameImportMM + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-  }
-
-  private void appendInsertOverwrite(String tableName, String tableNameMM,
-                                     List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableNameOW = tableName + "_OW";
-    String tableNameOWMM = testName.getMethodName() +"_OWMM";
-
-    insertRecords(tableName, tableNameOW, false, OperationType.REPL_TEST_ACID_INSERT_OVERWRITE);
-    selectStmtList.add("select key from " + tableNameOW + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-
-    insertRecords(tableNameMM, tableNameOWMM, true, OperationType.REPL_TEST_ACID_INSERT_OVERWRITE);
-    selectStmtList.add("select key from " + tableNameOWMM + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-  }
-
-  //TODO: need to check why its failing. Loading to acid table from local path is failing.
-  private void appendLoadLocal(String tableName, String tableNameMM,
-                               List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableNameLL = testName.getMethodName() +"_LL";
-    String tableNameLLMM = testName.getMethodName() +"_LLMM";
-
-    insertRecords(tableName, tableNameLL, false, OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL);
-    selectStmtList.add("select key from " + tableNameLL + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-
-    insertRecords(tableNameMM, tableNameLLMM, true, OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL);
-    selectStmtList.add("select key from " + tableNameLLMM + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-  }
-
-  private void appendInsertUnion(String tableName, String tableNameMM,
-                                 List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableNameUnion = testName.getMethodName() +"_UNION";
-    String tableNameUnionMM = testName.getMethodName() +"_UNIONMM";
-    String[] resultArrayUnion = new String[]{"1", "1", "2", "2", "3", "3", "4", "4", "5", "5"};
-
-    insertRecords(tableName, tableNameUnion, false, OperationType.REPL_TEST_ACID_INSERT_UNION);
-    selectStmtList.add( "select key from " + tableNameUnion + " order by key");
-    expectedValues.add(resultArrayUnion);
-    selectStmtList.add("select key from " + tableNameUnion + "_nopart" + " order by key");
-    expectedValues.add(resultArrayUnion);
-
-    insertRecords(tableNameMM, tableNameUnionMM, true, OperationType.REPL_TEST_ACID_INSERT_UNION);
-    selectStmtList.add( "select key from " + tableNameUnionMM + " order by key");
-    expectedValues.add(resultArrayUnion);
-    selectStmtList.add("select key from " + tableNameUnionMM + "_nopart" + " order by key");
-    expectedValues.add(resultArrayUnion);
-  }
-
-  private void appendMultiStatementTxn(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
-    String tableName = testName.getMethodName() + "testMultiStatementTxn";
-    String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
-    String tableNameMM = tableName + "_MM";
-    String tableProperty = "'transactional'='true'";
-
-    insertIntoDB(primaryDbName, tableName, tableProperty, resultArray, true);
-    selectStmtList.add("select key from " + tableName + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-
-    tableProperty = setMMtableProperty(tableProperty);
-    insertIntoDB(primaryDbName, tableNameMM, tableProperty, resultArray, true);
-    selectStmtList.add("select key from " + tableNameMM + " order by key");
-    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
-  }
-
-  private void appendMultiStatementTxnUpdateDelete(List<String> selectStmtList, List<String[]> expectedValues)
+  private void appendMultiStatementTxnUpdateDelete(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra,
+                                                   List<String> selectStmtList, List<String[]> expectedValues)
           throws Throwable {
-    String tableName = testName.getMethodName() + "testMultiStatementTxnUpdate";
-    String tableNameDelete = testName.getMethodName() + "testMultiStatementTxnDelete";
+    String tableName = "testMultiStatementTxnUpdate";
+    String tableNameDelete = "testMultiStatementTxnDelete";
     String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
     String tableProperty = "'transactional'='true'";
+    String tableStorage = "STORED AS ORC";
 
-    insertIntoDB(primaryDbName, tableName, tableProperty, resultArray, true);
+    ReplicationTestUtils.insertIntoDB(primary, primaryDbName, tableName, tableProperty,
+            tableStorage, resultArray, true);
     updateRecords(tableName);
     selectStmtList.add("select value from " + tableName + " order by value");
     expectedValues.add(new String[] {"1", "100", "100", "100", "100"});
 
-    insertIntoDB(primaryDbName, tableNameDelete, tableProperty, resultArray, true);
+    ReplicationTestUtils.insertIntoDB(primary, primaryDbName, tableNameDelete, tableProperty,
+            tableStorage, resultArray, true);
     deleteRecords(tableNameDelete);
     selectStmtList.add("select count(*) from " + tableNameDelete);
     expectedValues.add(new String[] {"0"});
@@ -370,8 +244,8 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
 
   @Test
   public void testReplCM() throws Throwable {
-    String tableName = testName.getMethodName();
-    String tableNameMM = testName.getMethodName() + "_MM";
+    String tableName = "testcm";
+    String tableNameMM = tableName + "_MM";
     String[] result = new String[]{"5"};
 
     WarehouseInstance.Tuple incrementalDump;
@@ -380,44 +254,37 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
 
-    insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+    ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra,
+            tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
     incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
     primary.run("drop table " + primaryDbName + "." + tableName);
     replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
-    verifyResultsInReplica(Lists.newArrayList("select count(*) from " + tableName,
+    verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableName,
                                               "select count(*) from " + tableName + "_nopart"),
                             Lists.newArrayList(result, result));
 
-    insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+    ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra,
+            tableNameMM, null, true, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
     incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
     primary.run("drop table " + primaryDbName + "." + tableNameMM);
     replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
-    verifyResultsInReplica(Lists.newArrayList("select count(*) from " + tableNameMM,
+    verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableNameMM,
             "select count(*) from " + tableNameMM + "_nopart"),
             Lists.newArrayList(result, result));
   }
 
-  private void verifyResultsInReplica(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable  {
-    for (int idx = 0; idx < selectStmtList.size(); idx++) {
-      replica.run("use " + replicatedDbName)
-              .run(selectStmtList.get(idx))
-              .verifyResults(expectedValues.get(idx));
-    }
+  private void verifyResultsInReplicaInt(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable  {
+    ReplicationTestUtils.verifyResultsInReplica(replica, replicatedDbName, selectStmtList, expectedValues);
   }
 
-  private WarehouseInstance.Tuple verifyIncrementalLoad(List<String> selectStmtList,
-                                                  List<String[]> expectedValues, String lastReplId) throws Throwable {
-    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, lastReplId);
-    replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
-            .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
-    verifyResultsInReplica(selectStmtList, expectedValues);
-
-    replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
-            .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
-    verifyResultsInReplica(selectStmtList, expectedValues);
-    return incrementalDump;
+  private WarehouseInstance.Tuple verifyIncrementalLoadInt(List<String> selectStmtList,
+                                                List<String[]> expectedValues, String lastReplId) throws Throwable {
+    return ReplicationTestUtils.verifyIncrementalLoad(primary, replica, primaryDbName,
+            replicatedDbName, selectStmtList, expectedValues, lastReplId);
+
   }
 
   private void deleteRecords(String tableName) throws Throwable {
@@ -434,217 +301,88 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
             .verifyResults(new String[] {"1", "100", "100", "100", "100"});
   }
 
-  private void truncateTable(String dbName, String tableName) throws Throwable {
-    primary.run("use " + dbName)
-            .run("truncate table " + tableName)
-            .run("select count(*) from " + tableName)
-            .verifyResult("0")
-            .run("truncate table " + tableName + "_nopart")
-            .run("select count(*) from " + tableName + "_nopart")
-            .verifyResult("0");
-  }
-
-  private WarehouseInstance.Tuple verifyLoad(String tableName, String tableNameOp, String lastReplId) throws Throwable {
-    String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
-    if (tableNameOp == null) {
-      return verifyIncrementalLoad(Lists.newArrayList("select key from " + tableName + " order by key",
-              "select key from " + tableName + "_nopart order by key"),
-              Lists.newArrayList(resultArray, resultArray), lastReplId);
-    }
-    return verifyIncrementalLoad(Lists.newArrayList("select key from " + tableName + " order by key",
-                                                    "select key from " + tableNameOp + " order by key",
-                                                    "select key from " + tableName + "_nopart" + " order by key",
-                                                    "select key from " + tableNameOp + "_nopart" + " order by key"),
-                    Lists.newArrayList(resultArray, resultArray, resultArray, resultArray), lastReplId);
-  }
-
-  private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray, boolean isTxn)
-          throws Throwable {
-    String txnStrStart = "START TRANSACTION";
-    String txnStrCommit = "COMMIT";
-    if (!isTxn) {
-      txnStrStart = "use " + dbName; //dummy
-      txnStrCommit = "use " + dbName; //dummy
-    }
-    primary.run("use " + dbName);
-    primary.run("CREATE TABLE " + tableName + " (key int, value int) PARTITIONED BY (load_date date) " +
-            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
-            .run("SHOW TABLES LIKE '" + tableName + "'")
-            .verifyResult(tableName)
-            .run("CREATE TABLE " + tableName + "_nopart (key int, value int) " +
-                    "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
-            .run("SHOW TABLES LIKE '" + tableName + "_nopart'")
-            .run("ALTER TABLE " + tableName + " ADD PARTITION (load_date='2016-03-03')")
-            .run(txnStrStart)
-            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)")
-            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)")
-            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)")
-            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)")
-            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)")
-            .run("select key from " + tableName + " order by key")
-            .verifyResults(resultArray)
-            .run("INSERT INTO " + tableName + "_nopart (key, value) select key, value from " + tableName)
-            .run("select key from " + tableName + "_nopart" + " order by key")
-            .verifyResults(resultArray)
-            .run(txnStrCommit);
-  }
-
-  private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray)
-          throws Throwable {
-    insertIntoDB(dbName, tableName, tableProperty, resultArray, false);
+  private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable {
+    WarehouseInstance.Tuple tuple =  primaryMigration.run("use " + primaryDbName)
+            .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ")
+            .run("insert into tacid values(1)")
+            .run("insert into tacid values(2)")
+            .run("insert into tacid values(3)")
+            .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " +
+                    "into 3 buckets stored as orc ")
+            .run("alter table tacidpart add partition(country='france')")
+            .run("insert into tacidpart partition(country='india') values('mumbai')")
+            .run("insert into tacidpart partition(country='us') values('sf')")
+            .run("insert into tacidpart partition(country='france') values('paris')")
+            .run("create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")")
+            .run("insert into tflat values(11)")
+            .run("insert into tflat values(22)")
+            .run("create table tflattext (id int) ")
+            .run("insert into tflattext values(111), (222)")
+            .run("create table tflattextpart (id int) partitioned by (country string) ")
+            .run("insert into tflattextpart partition(country='india') values(1111), (2222)")
+            .run("insert into tflattextpart partition(country='us') values(3333)")
+            .run("create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " +
+                    "stored as avro tblproperties ('avro.schema.url'='" + primaryMigration.avroSchemaFile.toUri().toString() + "')")
+            .run("insert into avro_table values('str1', 10)")
+            .dump(primaryDbName, fromReplId);
+    assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tacid")));
+    assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tacidpart")));
+    assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflat")));
+    assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflattext")));
+    assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflattextpart")));
+    Table avroTable = primaryMigration.getTable(replicatedDbName, "avro_table");
+    assertFalse(isTransactionalTable(avroTable));
+    assertFalse(MetaStoreUtils.isExternalTable(avroTable));
+    return tuple;
+  }
+
+  private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
+    replicaMigration.run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart",
+                    "avro_table"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(lastReplId)
+            .run("select id from tacid order by id")
+            .verifyResults(new String[]{"1", "2", "3"})
+            .run("select country from tacidpart order by country")
+            .verifyResults(new String[] {"france", "india", "us"})
+            .run("select rank from tflat order by rank")
+            .verifyResults(new String[] {"11", "22"})
+            .run("select id from tflattext order by id")
+            .verifyResults(new String[] {"111", "222"})
+            .run("select id from tflattextpart order by id")
+            .verifyResults(new String[] {"1111", "2222", "3333"})
+            .run("select col1 from avro_table")
+            .verifyResults(new String[] {"str1"});
+
+    assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tacid")));
+    assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tacidpart")));
+    assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflat")));
+    assertTrue(!isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflattext")));
+    assertTrue(!isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflattextpart")));
+    assertTrue(isTransactionalTable(replicaMigration.getTable(replicatedDbName, "tflattext")));
+    assertTrue(isTransactionalTable(replicaMigration.getTable(replicatedDbName, "tflattextpart")));
+
+    Table avroTable = replicaMigration.getTable(replicatedDbName, "avro_table");
+    assertTrue(MetaStoreUtils.isExternalTable(avroTable));
+    Path tablePath = new PathBuilder(replicaMigration.externalTableWarehouseRoot.toString()).addDescendant(replicatedDbName + ".db")
+            .addDescendant("avro_table")
+            .build();
+    assertEquals(avroTable.getSd().getLocation().toLowerCase(), tablePath.toUri().toString().toLowerCase());
   }
 
-  private void insertRecords(String tableName, String tableNameOp, boolean isMMTable,
-                             OperationType opType) throws Throwable {
-    insertRecordsIntoDB(primaryDbName, tableName, tableNameOp, isMMTable, opType);
-  }
-
-  private void insertRecordsIntoDB(String DbName, String tableName, String tableNameOp, boolean isMMTable,
-                             OperationType opType) throws Throwable {
-    String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
-    String tableProperty = "'transactional'='true'";
-    if (isMMTable) {
-      tableProperty = setMMtableProperty(tableProperty);
-    }
-    primary.run("use " + DbName);
-
-    switch (opType) {
-      case REPL_TEST_ACID_INSERT:
-        insertIntoDB(DbName, tableName, tableProperty, resultArray);
-        insertIntoDB(primaryDbNameExtra, tableName, tableProperty, resultArray);
-        return;
-      case REPL_TEST_ACID_INSERT_OVERWRITE:
-        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
-              "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )")
-        .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (2, 2)")
-        .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (10, 12)")
-        .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-02') VALUES (11, 1)")
-        .run("select key from " + tableNameOp + " order by key")
-        .verifyResults(new String[]{"2", "10", "11"})
-        .run("insert overwrite table " + tableNameOp + " select * from " + tableName)
-        .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
-                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )")
-        .run("INSERT INTO " + tableNameOp + "_nopart VALUES (2, 2)")
-        .run("INSERT INTO " + tableNameOp + "_nopart VALUES (10, 12)")
-        .run("INSERT INTO " + tableNameOp + "_nopart VALUES (11, 1)")
-        .run("select key from " + tableNameOp + "_nopart" + " order by key")
-        .verifyResults(new String[]{"2", "10", "11"})
-        .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName + "_nopart")
-        .run("select key from " + tableNameOp + "_nopart" + " order by key");
-        break;
-      case REPL_TEST_ACID_INSERT_SELECT:
-        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
-            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )")
-        .run("insert into " + tableNameOp + " partition (load_date) select * from " + tableName)
-        .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
-                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )")
-        .run("insert into " + tableNameOp + "_nopart select * from " + tableName + "_nopart");
-        break;
-      case REPL_TEST_ACID_INSERT_IMPORT:
-        String path = "hdfs:///tmp/" + DbName + "/";
-        String exportPath = "'" + path + tableName + "/'";
-        String exportPathNoPart = "'" + path + tableName + "_nopart/'";
-        primary.run("export table " + tableName + " to " + exportPath)
-        .run("import table " + tableNameOp + " from " + exportPath)
-        .run("export table " + tableName + "_nopart to " + exportPathNoPart)
-        .run("import table " + tableNameOp + "_nopart from " + exportPathNoPart);
-        break;
-      case REPL_TEST_ACID_CTAS:
-        primary.run("create table " + tableNameOp + " as select * from " + tableName)
-                .run("create table " + tableNameOp + "_nopart as select * from " + tableName + "_nopart");
-        break;
-      case REPL_TEST_ACID_INSERT_LOADLOCAL:
-        // For simplicity setting key and value as same value
-        StringBuilder buf = new StringBuilder();
-        boolean nextVal = false;
-        for (String key : resultArray) {
-          if (nextVal) {
-            buf.append(',');
-          }
-          buf.append('(');
-          buf.append(key);
-          buf.append(',');
-          buf.append(key);
-          buf.append(')');
-          nextVal = true;
-        }
-
-        primary.run("CREATE TABLE " + tableNameOp + "_temp (key int, value int) STORED AS ORC")
-        .run("INSERT INTO TABLE " + tableNameOp + "_temp VALUES " + buf.toString())
-        .run("SELECT key FROM " + tableNameOp + "_temp")
-        .verifyResults(resultArray)
-        .run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
-              "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
-        .run("SHOW TABLES LIKE '" + tableNameOp + "'")
-        .verifyResult(tableNameOp)
-        .run("INSERT OVERWRITE LOCAL DIRECTORY './test.dat' STORED AS ORC SELECT * FROM " + tableNameOp + "_temp")
-        .run("LOAD DATA LOCAL INPATH './test.dat/000000_0' OVERWRITE INTO TABLE " + tableNameOp +
-                " PARTITION (load_date='2008-08-15')")
-        .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
-                      "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
-        .run("SHOW TABLES LIKE '" + tableNameOp + "_nopart'")
-        .verifyResult(tableNameOp + "_nopart")
-        .run("LOAD DATA LOCAL INPATH './test.dat/000000_0' OVERWRITE INTO TABLE " + tableNameOp + "_nopart");
-        break;
-      case REPL_TEST_ACID_INSERT_UNION:
-        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
-                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
-                .run("SHOW TABLES LIKE '" + tableNameOp + "'")
-                .verifyResult(tableNameOp)
-                .run("insert overwrite table " + tableNameOp + " partition (load_date) select * from " + tableName +
-                    " union all select * from " + tableName)
-                .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
-                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
-                .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName +
-                        "_nopart union all select * from " + tableName + "_nopart");
-        resultArray = new String[]{"1", "2", "3", "4", "5", "1", "2", "3", "4", "5"};
-        break;
-      default:
-        return;
-    }
-    primary.run("select key from " + tableNameOp + " order by key").verifyResults(resultArray);
-    primary.run("select key from " + tableNameOp + "_nopart" + " order by key").verifyResults(resultArray);
-  }
-
-  private String setMMtableProperty(String tableProperty) throws Throwable  {
-    return tableProperty.concat(", 'transactional_properties' = 'insert_only'");
-  }
-
-  private void insertForMerge(String tableName, String tableNameMerge, boolean isMMTable) throws Throwable  {
-    String tableProperty = "'transactional'='true'";
-    if (isMMTable) {
-      tableProperty = setMMtableProperty(tableProperty);
-    }
-    primary.run("use " + primaryDbName)
-        .run("CREATE TABLE " + tableName + "( ID int, TranValue string, last_update_user string) PARTITIONED BY " +
-                "(tran_date string) CLUSTERED BY (ID) into 5 buckets STORED AS ORC TBLPROPERTIES " +
-                " ( "+ tableProperty + " )")
-        .run("SHOW TABLES LIKE '" + tableName + "'")
-        .verifyResult(tableName)
-        .run("CREATE TABLE " + tableNameMerge + " ( ID int, TranValue string, tran_date string) STORED AS ORC ")
-        .run("SHOW TABLES LIKE '" + tableNameMerge + "'")
-        .verifyResult(tableNameMerge)
-        .run("INSERT INTO " + tableName + " PARTITION (tran_date) VALUES (1, 'value_01', 'creation', '20170410')," +
-                " (2, 'value_02', 'creation', '20170410'), (3, 'value_03', 'creation', '20170410'), " +
-                " (4, 'value_04', 'creation', '20170410'), (5, 'value_05', 'creation', '20170413'), " +
-                " (6, 'value_06', 'creation', '20170413'), (7, 'value_07', 'creation', '20170413'),  " +
-                " (8, 'value_08', 'creation', '20170413'), (9, 'value_09', 'creation', '20170413'), " +
-                " (10, 'value_10','creation', '20170413')")
-        .run("select ID from " + tableName + " order by ID")
-        .verifyResults(new String[] {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"})
-        .run("INSERT INTO " + tableNameMerge + " VALUES (1, 'value_01', '20170410'), " +
-                " (4, NULL, '20170410'), (7, 'value_77777', '20170413'), " +
-                " (8, NULL, '20170413'), (8, 'value_08', '20170415'), " +
-                "(11, 'value_11', '20170415')")
-        .run("select ID from " + tableNameMerge + " order by ID")
-        .verifyResults(new String[] {"1", "4", "7", "8", "8", "11"})
-        .run("MERGE INTO " + tableName + " AS T USING " + tableNameMerge + " AS S ON T.ID = S.ID and" +
-                " T.tran_date = S.tran_date WHEN MATCHED AND (T.TranValue != S.TranValue AND S.TranValue " +
-                " IS NOT NULL) THEN UPDATE SET TranValue = S.TranValue, last_update_user = " +
-                " 'merge_update' WHEN MATCHED AND S.TranValue IS NULL THEN DELETE WHEN NOT MATCHED " +
-                " THEN INSERT VALUES (S.ID, S.TranValue,'merge_insert', S.tran_date)")
-        .run("select last_update_user from " + tableName + " order by last_update_user")
-        .verifyResults(new String[] {"creation", "creation", "creation", "creation", "creation",
-                "creation", "creation", "merge_update", "merge_insert", "merge_insert"});
+  @Test
+  public void testMigrationManagedToAcid() throws Throwable {
+    WarehouseInstance.Tuple tupleForBootStrap = primaryMigration.dump(primaryDbName, null);
+    WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null);
+    WarehouseInstance.Tuple tupleForIncremental = primaryMigration.dump(primaryDbName, tupleForBootStrap.lastReplicationId);
+    replicaMigration.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
+
+    replicaMigration.run("drop database if exists " + replicatedDbName + " cascade");
+    replicaMigration.loadWithoutExplain(replicatedDbName, tupleForBootStrap.dumpLocation);
+    replicaMigration.loadWithoutExplain(replicatedDbName, tupleForIncremental.dumpLocation);
+    verifyLoadExecution(replicatedDbName, tupleForIncremental.lastReplicationId);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 7900779..92f2456 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -55,8 +55,11 @@ import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
 import org.codehaus.plexus.util.ExceptionUtils;
 import org.slf4j.Logger;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 
 import java.io.Closeable;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -79,10 +82,13 @@ public class WarehouseInstance implements Closeable {
   MiniDFSCluster miniDFSCluster;
   private HiveMetaStoreClient client;
   public final Path warehouseRoot;
+  public final Path externalTableWarehouseRoot;
+  public Path avroSchemaFile;
 
   private static int uniqueIdentifier = 0;
 
   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
+  private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc";
 
   WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf,
       String keyNameForEncryptedZone) throws Exception {
@@ -93,12 +99,15 @@ public class WarehouseInstance implements Closeable {
     DistributedFileSystem fs = miniDFSCluster.getFileSystem();
 
     warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier);
+    externalTableWarehouseRoot = mkDir(fs, "/external" + uniqueIdentifier);
     if (StringUtils.isNotEmpty(keyNameForEncryptedZone)) {
       fs.createEncryptionZone(warehouseRoot, keyNameForEncryptedZone);
+      fs.createEncryptionZone(externalTableWarehouseRoot, keyNameForEncryptedZone);
     }
     Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
     this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
-    initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
+    initialize(cmRootPath.toString(), warehouseRoot.toString(), externalTableWarehouseRoot.toString(),
+            overridesForHiveConf);
   }
 
   WarehouseInstance(Logger logger, MiniDFSCluster cluster,
@@ -106,7 +115,7 @@ public class WarehouseInstance implements Closeable {
     this(logger, cluster, overridesForHiveConf, null);
   }
 
-  private void initialize(String cmRoot, String warehouseRoot,
+  private void initialize(String cmRoot, String warehouseRoot, String externalTableWarehouseRoot,
       Map<String, String> overridesForHiveConf) throws Exception {
     hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
     for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) {
@@ -126,6 +135,7 @@ public class WarehouseInstance implements Closeable {
     //    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest);
     // turn on db notification listener on meta store
     hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseRoot);
+    hiveConf.setVar(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL, externalTableWarehouseRoot);
     hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS);
     hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
     hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
@@ -138,7 +148,6 @@ public class WarehouseInstance implements Closeable {
     hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
     if (!hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER).equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")) {
       hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
     }
@@ -153,6 +162,7 @@ public class WarehouseInstance implements Closeable {
     FileSystem testPathFileSystem = FileSystem.get(testPath.toUri(), hiveConf);
     testPathFileSystem.mkdirs(testPath);
 
+    avroSchemaFile = createAvroSchemaFile(testPathFileSystem, testPath);
     driver = DriverFactory.newDriver(hiveConf);
     SessionState.start(new CliSessionState(hiveConf));
     client = new HiveMetaStoreClient(hiveConf);
@@ -171,6 +181,49 @@ public class WarehouseInstance implements Closeable {
     return PathBuilder.fullyQualifiedHDFSUri(path, fs);
   }
 
+  private Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
+    Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME);
+    String[] schemaVals = new String[] { "{",
+            "  \"type\" : \"record\",",
+            "  \"name\" : \"table1\",",
+            "  \"doc\" : \"Sqoop import of table1\",",
+            "  \"fields\" : [ {",
+            "    \"name\" : \"col1\",",
+            "    \"type\" : [ \"null\", \"string\" ],",
+            "    \"default\" : null,",
+            "    \"columnName\" : \"col1\",",
+            "    \"sqlType\" : \"12\"",
+            "  }, {",
+            "    \"name\" : \"col2\",",
+            "    \"type\" : [ \"null\", \"long\" ],",
+            "    \"default\" : null,",
+            "    \"columnName\" : \"col2\",",
+            "    \"sqlType\" : \"13\"",
+            "  } ],",
+            "  \"tableName\" : \"table1\"",
+            "}"
+    };
+    createTestDataFile(schemaFile.toUri().getPath(), schemaVals);
+    return schemaFile;
+  }
+
+  private void createTestDataFile(String filename, String[] lines) throws IOException {
+    FileWriter writer = null;
+    try {
+      File file = new File(filename);
+      file.deleteOnExit();
+      writer = new FileWriter(file);
+      int i=0;
+      for (String line : lines) {
+        writer.write(line + "\n");
+      }
+    } finally {
+      if (writer != null) {
+        writer.close();
+      }
+    }
+  }
+
   public HiveConf getConf() {
     return hiveConf;
   }
@@ -433,6 +486,10 @@ public class WarehouseInstance implements Closeable {
     assertEquals(expectedCount, client.getNotificationEventsCount(rqst).getEventsCount());
   }
 
+  public boolean isAcidEnabled() {
+    return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
+  }
+
   @Override
   public void close() throws IOException {
     if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) {

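A note on the avroSchemaFile fixture added above: it simply materializes a small Avro schema file on the test path so that a table's avro.schema.url property can point at it. A minimal standalone sketch of the same idea using plain java.nio instead of the Hadoop FileSystem API (the class and method names below are illustrative only, not part of the patch):

    // Sketch only: write a two-column Avro record schema to a local temp directory,
    // roughly mirroring what WarehouseInstance.createAvroSchemaFile() does above.
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class AvroSchemaFixture {
      // Returns the schema location, usable as tblproperties('avro.schema.url'=...).
      static Path writeSchema(Path dir) throws IOException {
        String schema = "{\n"
            + "  \"type\" : \"record\",\n"
            + "  \"name\" : \"table1\",\n"
            + "  \"fields\" : [\n"
            + "    { \"name\" : \"col1\", \"type\" : [ \"null\", \"string\" ], \"default\" : null },\n"
            + "    { \"name\" : \"col2\", \"type\" : [ \"null\", \"long\" ], \"default\" : null }\n"
            + "  ]\n"
            + "}\n";
        Path schemaFile = dir.resolve("avro_table.avsc");
        Files.write(schemaFile, schema.getBytes(StandardCharsets.UTF_8));
        return schemaFile;
      }

      public static void main(String[] args) throws IOException {
        System.out.println(writeSchema(Files.createTempDirectory("avro-schema")));
      }
    }
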
http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index 0372064..93e9d48 100644
--- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -23,6 +23,8 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.CmRecycleResponse;
+import org.apache.hadoop.hive.metastore.api.CmRecycleRequest;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
 import org.apache.hadoop.hive.metastore.api.FireEventResponse;
@@ -125,6 +127,10 @@ public final class SynchronizedMetaStoreClient {
     client.addWriteNotificationLog(rqst);
   }
 
+  public synchronized CmRecycleResponse recycleDirToCmPath(CmRecycleRequest request) throws MetaException, TException {
+    return client.recycleDirToCmPath(request);
+  }
+
   public synchronized void close() {
     client.close();
   }

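The new recycleDirToCmPath method follows the existing pattern in SynchronizedMetaStoreClient: every public method is synchronized on the wrapper and simply delegates to the underlying client. A generic sketch of that wrapping pattern, with a made-up Recycler delegate for illustration rather than the real IMetaStoreClient API:

    // Sketch of the synchronized-delegation pattern: every call is serialized
    // on the wrapper instance and forwarded to a non-thread-safe delegate.
    public final class SynchronizedRecycler {

      /** Hypothetical non-thread-safe delegate, standing in for the metastore client. */
      public interface Recycler {
        String recycleDir(String path) throws Exception;
        void close();
      }

      private final Recycler delegate;

      public SynchronizedRecycler(Recycler delegate) {
        this.delegate = delegate;
      }

      // Mirrors the shape of recycleDirToCmPath(): one monitor per wrapper, result passed through.
      public synchronized String recycleDir(String path) throws Exception {
        return delegate.recycleDir(path);
      }

      public synchronized void close() {
        delegate.close();
      }
    }
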
http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 57f71a8..608cbd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -2027,8 +2027,11 @@ public class Driver implements IDriver {
         }
         else if(plan.getOperation() == HiveOperation.ROLLBACK) {
           releaseLocksAndCommitOrRollback(false);
-        }
-        else {
+        } else if (!queryTxnMgr.isTxnOpen() && queryState.getHiveOperation() == HiveOperation.REPLLOAD) {
+          // Repl load during migration commits the explicit txn and starts some internal txns. Call
+          // releaseLocksAndCommitOrRollback to do the clean up.
+          releaseLocksAndCommitOrRollback(false);
+        } else {
           //txn (if there is one started) is not finished
         }
       } catch (LockException e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 6b28cca..a96d54d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -21,18 +21,15 @@ package org.apache.hadoop.hive.ql.exec;
 import static org.apache.commons.lang.StringUtils.join;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;
 
-import java.io.BufferedWriter;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
-import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.sql.SQLException;
-import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -53,7 +50,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -152,13 +148,11 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.CheckConstraint;
-import org.apache.hadoop.hive.metastore.CheckResult;
 import org.apache.hadoop.hive.ql.metadata.DefaultConstraint;
 import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreChecker;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.NotNullConstraint;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -251,6 +245,7 @@ import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.security.authorization.AuthorizationUtils;
 import 
org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationTranslator;
 import 
org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizationTranslator;
@@ -288,7 +283,6 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.hive.common.util.AnnotationUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.apache.hive.common.util.ReflectionUtil;
-import org.apache.hive.common.util.RetryUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.stringtemplate.v4.ST;
@@ -4701,7 +4695,20 @@ public class DDLTask extends Task<DDLWork> implements 
Serializable {
     // create the table
     if (crtTbl.getReplaceMode()) {
       ReplicationSpec replicationSpec = crtTbl.getReplicationSpec();
-      long writeId = replicationSpec != null && 
replicationSpec.isInReplicationScope() ? crtTbl.getReplWriteId() : 0L;
+      long writeId = 0;
+      if (replicationSpec != null && replicationSpec.isInReplicationScope()) {
+        if (replicationSpec.isMigratingToTxnTable()) {
+          // For migration, the txn is opened and the write id is allocated by the repl txn task.
+          String writeIdPara = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
+          if (writeIdPara == null) {
+            throw new HiveException("DDLTask : Write id is not set in the 
config by open txn task for migration");
+          }
+          writeId = Long.parseLong(writeIdPara);
+        } else {
+          writeId = crtTbl.getReplWriteId();
+        }
+      }
+
       // replace-mode creates are really alters using CreateTableDesc.
       db.alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), 
tbl, false, null,
               true, writeId);

http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index c1cc633..ca4391f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -375,6 +376,17 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
 
         checkFileFormats(db, tbd, table);
 
+        // For a transactional table, if the write id was not set during replication from a
+        // cluster with STRICT_MANAGED set to false, then set it now.
+        if (tbd.getWriteId() <= 0 && 
AcidUtils.isTransactionalTable(table.getParameters())) {
+          String writeId = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
+          if (writeId == null) {
+            throw new HiveException("MoveTask : Write id is not set in the 
config by open txn task for migration");
+          }
+          tbd.setWriteId(Long.parseLong(writeId));
+          
tbd.setStmtId(driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement());
+        }
+
         boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID
             && !tbd.isMmTable(); //it seems that LoadTableDesc has 
Operation.INSERT only for CTAS...
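
The write id that MoveTask (and the DDLTask hunk above) falls back to is handed over through the session configuration under ReplUtils.REPL_CURRENT_TBL_WRITE_ID by the migration open-txn task. A hedged sketch of that consumer-side lookup, pulled into a hypothetical helper purely for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    final class MigrationWriteIdLookup {
      private MigrationWriteIdLookup() {
      }

      // Returns the write id allocated by the migration open-txn task, or fails fast if that
      // task never ran and therefore never published the id in the configuration.
      static long currentTableWriteId(Configuration conf) throws HiveException {
        String writeId = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
        if (writeId == null) {
          throw new HiveException("Write id is not set in the config by the open txn task for migration");
        }
        return Long.parseLong(writeId);
      }
    }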
 

http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index c2953c5..c91b78e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -18,10 +18,12 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import java.util.List;
+import org.apache.hadoop.hive.common.ValidTxnList;
 
 /**
  * ReplTxnTask.
@@ -91,12 +94,38 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
         LOG.info("Replayed OpenTxn Event for policy " + replPolicy + " with 
srcTxn " +
                 work.getTxnIds().toString() + " and target txn id " + 
txnIds.toString());
         return 0;
+      case REPL_MIGRATION_OPEN_TXN:
+          // If a transaction is already open (usually opened by the REPL LOAD command), commit it first.
+          if (txnManager.isTxnOpen()) {
+            long txnId = txnManager.getCurrentTxnId();
+            txnManager.commitTxn();
+            LOG.info("Committed txn from REPL_MIGRATION_OPEN_TXN : " + txnId);
+          }
+          Long txnIdMigration = txnManager.openTxn(driverContext.getCtx(), 
user);
+          long writeId = txnManager.getTableWriteId(work.getDbName(), 
work.getTableName());
+          String validTxnList = txnManager.getValidTxns().toString();
+          conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList);
+          conf.set(ReplUtils.REPL_CURRENT_TBL_WRITE_ID, 
Long.toString(writeId));
+          LOG.info("Started open txn for migration : " + txnIdMigration + " 
with  valid txn list : " +
+                  validTxnList + " and write id " + writeId);
+          return 0;
       case REPL_ABORT_TXN:
         for (long txnId : work.getTxnIds()) {
           txnManager.replRollbackTxn(replPolicy, txnId);
           LOG.info("Replayed AbortTxn Event for policy " + replPolicy + " with 
srcTxn " + txnId);
         }
         return 0;
+      case REPL_MIGRATION_COMMIT_TXN:
+          assert (work.getReplLastIdInfo() != null);
+          long txnIdMigrationCommit = txnManager.getCurrentTxnId();
+          CommitTxnRequest commitTxnRequestMigr = new 
CommitTxnRequest(txnIdMigrationCommit);
+          commitTxnRequestMigr.setReplLastIdInfo(work.getReplLastIdInfo());
+          txnManager.replCommitTxn(commitTxnRequestMigr);
+          conf.unset(ValidTxnList.VALID_TXNS_KEY);
+          conf.unset(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
+          LOG.info("Committed Migration Txn with replLastIdInfo: " + 
work.getReplLastIdInfo() + " for txnId: " +
+                  txnIdMigrationCommit);
+          return 0;
       case REPL_COMMIT_TXN:
         // Currently only one commit txn per event is supported.
         assert (work.getTxnIds().size() == 1);
@@ -145,4 +174,8 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
   public String getName() {
     return "REPL_TRANSACTION";
   }
+
+  public ReplTxnWork.OperationType getOperationType() {
+    return work.getOperationType();
+  }
 }
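
On the producer side, the REPL_MIGRATION_OPEN_TXN branch opens a fresh transaction, allocates a table write id, and publishes both the valid-txn snapshot and the write id in the configuration for the downstream DDL/Move tasks. A condensed sketch of that sequence, assuming the same HiveTxnManager signatures used in the hunk above (the real task also first commits any txn left open by REPL LOAD):

    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
    import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
    import org.apache.hadoop.hive.ql.lockmgr.LockException;

    final class MigrationOpenTxnSketch {
      private MigrationOpenTxnSketch() {
      }

      // Open a txn for the migrated table, allocate its write id, and publish the txn snapshot
      // plus the write id so that later tasks in the DAG (DDLTask, MoveTask) can read them back
      // from the configuration.
      static long openMigrationTxn(HiveTxnManager txnManager, Context ctx, String user,
          String dbName, String tableName, HiveConf conf) throws LockException {
        long txnId = txnManager.openTxn(ctx, user);
        long writeId = txnManager.getTableWriteId(dbName, tableName);
        String validTxnList = txnManager.getValidTxns().toString();
        conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList);
        conf.set(ReplUtils.REPL_CURRENT_TBL_WRITE_ID, Long.toString(writeId));
        return txnId;
      }
    }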

http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index d203ae4..357c693 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -18,13 +18,17 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -33,14 +37,18 @@ import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration;
 
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater;
+
 public class FSTableEvent implements TableEvent {
   private final Path fromPath;
   private final MetaData metadata;
+  private final HiveConf hiveConf;
 
   FSTableEvent(HiveConf hiveConf, String metadataDir) {
     try {
@@ -48,6 +56,7 @@ public class FSTableEvent implements TableEvent {
       fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), 
fromURI.getPath());
       FileSystem fs = FileSystem.get(fromURI, hiveConf);
       metadata = EximUtil.readMetaData(fs, new Path(fromPath, 
EximUtil.METADATA_NAME));
+      this.hiveConf = hiveConf;
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -67,9 +76,30 @@ public class FSTableEvent implements TableEvent {
   public ImportTableDesc tableDesc(String dbName) throws SemanticException {
     try {
       Table table = new Table(metadata.getTable());
-      ImportTableDesc tableDesc =
-          new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() 
: dbName, table);
+      // The table can be non-ACID in case of replication from a 2.6 cluster.
+      if (!AcidUtils.isTransactionalTable(table)
+              && 
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES)
+              && (table.getTableType() == TableType.MANAGED_TABLE)) {
+        Hive hiveDb = Hive.get(hiveConf);
+        // TODO: the dump metadata should be read to make sure that migration is required.
+        HiveStrictManagedMigration.TableMigrationOption migrationOption
+                = 
HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(),
+                table.getTableType(), null, (Configuration) hiveConf,
+                hiveDb.getMSC(), true);
+        HiveStrictManagedMigration.migrateTable(table.getTTable(), 
table.getTableType(),
+                migrationOption, false,
+                getHiveUpdater(hiveConf), hiveDb.getMSC(), 
(Configuration)hiveConf);
+        // If the conversion produced a transactional table, mark the replication spec accordingly.
+        if (AcidUtils.isTransactionalTable(table)) {
+          replicationSpec().setMigratingToTxnTable();
+        }
+      }
+      ImportTableDesc tableDesc
+              = new ImportTableDesc(StringUtils.isBlank(dbName) ? 
table.getDbName() : dbName, table);
       tableDesc.setReplicationSpec(replicationSpec());
+      if (table.getTableType() == TableType.EXTERNAL_TABLE) {
+        tableDesc.setExternal(true);
+      }
       return tableDesc;
     } catch (Exception e) {
       throw new SemanticException(e);
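
The migration path in tableDesc() only triggers for managed, non-transactional tables landing on a target that enforces strict managed tables; everything else keeps its original ImportTableDesc. A hedged sketch of just that predicate, with a hypothetical helper name:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.TableType;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.metadata.Table;

    final class MigrationCheck {
      private MigrationCheck() {
      }

      // True when a table dumped from an older (non-strict) cluster has to be migrated
      // before it can be created on a strict-managed target.
      static boolean needsStrictManagedMigration(Table table, HiveConf hiveConf) {
        return !AcidUtils.isTransactionalTable(table)
            && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES)
            && table.getTableType() == TableType.MANAGED_TABLE;
      }
    }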

http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 172b4ac..8102997 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -229,12 +229,12 @@ public class LoadPartitions {
 
     // if move optimization is enabled, copy the files directly to the target 
path. No need to create the staging dir.
     LoadFileType loadFileType;
-    if (event.replicationSpec().isInReplicationScope() &&
+    if (event.replicationSpec().isInReplicationScope() && 
!event.replicationSpec().isMigratingToTxnTable() &&
             context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
       loadFileType = LoadFileType.IGNORE;
     } else {
-      loadFileType =
-              event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : 
LoadFileType.OVERWRITE_EXISTING;
+      loadFileType = event.replicationSpec().isReplace() ? 
LoadFileType.REPLACE_ALL :
+        event.replicationSpec().isMigratingToTxnTable() ? 
LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
       tmpPath = 
PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, 
context.pathInfo);
     }
 
@@ -283,11 +283,23 @@ public class LoadPartitions {
                                     LoadFileType loadFileType) {
     MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, 
null, false);
     if (AcidUtils.isTransactionalTable(table)) {
-      LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
-              Collections.singletonList(tmpPath),
-              Collections.singletonList(new Path(partSpec.getLocation())),
-              true, null, null);
-      moveWork.setMultiFilesDesc(loadFilesWork);
+      if (event.replicationSpec().isMigratingToTxnTable()) {
+        // The write-id is hardcoded to 1 so that, for migration, all the original files are
+        // simply moved under the delta_1_1 dir. It is used only for transactional tables
+        // created by migrating a non-ACID table. The ReplTxnTask added earlier in the DAG
+        // ensures that this write-id is made valid in the HMS metadata.
+        LoadTableDesc loadTableWork = new LoadTableDesc(
+                tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
+                loadFileType, 1L
+        );
+        loadTableWork.setInheritTableSpecs(false);
+        moveWork.setLoadTableWork(loadTableWork);
+      } else {
+        LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+                Collections.singletonList(tmpPath),
+                Collections.singletonList(new Path(partSpec.getLocation())),
+                true, null, null);
+        moveWork.setMultiFilesDesc(loadFilesWork);
+      }
     } else {
       LoadTableDesc loadTableWork = new LoadTableDesc(
               tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
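
LoadPartitions here and LoadTable below now choose the LoadFileType with the same three-way rule: move optimization applies only when the load is not migrating, a replace-scoped spec still replaces everything, and a migrating load keeps existing files so the originals land under the new delta directory. A hedged sketch of that selection as a hypothetical standalone helper (assuming the LoadFileType enum nested in LoadTableDesc):

    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
    import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;

    final class LoadFileTypeSelector {
      private LoadFileTypeSelector() {
      }

      // moveOptimizationEnabled corresponds to REPL_ENABLE_MOVE_OPTIMIZATION being set.
      static LoadFileType select(ReplicationSpec spec, boolean moveOptimizationEnabled) {
        if (spec.isInReplicationScope() && !spec.isMigratingToTxnTable() && moveOptimizationEnabled) {
          return LoadFileType.IGNORE;
        }
        if (spec.isReplace()) {
          return LoadFileType.REPLACE_ALL;
        }
        return spec.isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
      }
    }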

http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 8538463..1d454fd 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -54,6 +56,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -193,6 +196,17 @@ public class LoadTable {
       Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
       parentTask.addDependentTask(replTxnTask);
       parentTask = replTxnTask;
+    } else if (replicationSpec.isMigratingToTxnTable()) {
+      // A non-transactional table is being converted to a transactional table.
+      // Write-id 1 is used to copy the data for the given table, and no writes are aborted.
+      ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(
+              AcidUtils.getFullTableName(tblDesc.getDatabaseName(), 
tblDesc.getTableName()),
+              new long[0], new BitSet(), 1);
+      ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), 
tblDesc.getTableName(), null,
+              validWriteIdList.writeToString(), 
ReplTxnWork.OperationType.REPL_WRITEID_STATE);
+      Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
+      parentTask.addDependentTask(replTxnTask);
+      parentTask = replTxnTask;
     }
     if (!isPartitioned(tblDesc)) {
       LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
@@ -223,12 +237,12 @@ public class LoadTable {
 
     // if move optimization is enabled, copy the files directly to the target 
path. No need to create the staging dir.
     LoadFileType loadFileType;
-    if (replicationSpec.isInReplicationScope() &&
+    if (replicationSpec.isInReplicationScope() && 
!replicationSpec.isMigratingToTxnTable() &&
             context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
       loadFileType = LoadFileType.IGNORE;
     } else {
-      loadFileType =
-              replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : 
LoadFileType.OVERWRITE_EXISTING;
+      loadFileType = replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL :
+              replicationSpec.isMigratingToTxnTable() ? 
LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING;
       tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo);
     }
 
@@ -241,11 +255,22 @@ public class LoadTable {
 
     MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, 
null, false);
     if (AcidUtils.isTransactionalTable(table)) {
-      LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
-              Collections.singletonList(tmpPath),
-              Collections.singletonList(tgtPath),
-              true, null, null);
-      moveWork.setMultiFilesDesc(loadFilesWork);
+      if (replicationSpec.isMigratingToTxnTable()) {
+        // The write-id is hardcoded to 1 so that, for migration, all the original files are
+        // simply moved under the delta_1_1 dir. It is unused if the table is non-ACID.
+        // The ReplTxnTask added earlier in the DAG ensures that this write-id is made valid
+        // in the HMS metadata.
+        LoadTableDesc loadTableWork = new LoadTableDesc(
+                tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
+                loadFileType, 1L
+        );
+        moveWork.setLoadTableWork(loadTableWork);
+      } else {
+        LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+                Collections.singletonList(tmpPath),
+                Collections.singletonList(tgtPath),
+                true, null, null);
+        moveWork.setMultiFilesDesc(loadFilesWork);
+      }
     } else {
       LoadTableDesc loadTableWork = new LoadTableDesc(
               tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
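
Because a migrated table has no prior ACID history, the write-id snapshot registered by the extra ReplTxnTask can be built directly: high-water mark 1, no open or aborted write ids. A small sketch of constructing and serializing that list, mirroring the ValidReaderWriteIdList usage in the hunk above (the database and table names are placeholders):

    import java.util.BitSet;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    final class MigrationWriteIdListSketch {
      // "default" and "t1" are placeholder database/table names.
      static String buildSerializedList() {
        ValidWriteIdList writeIds = new ValidReaderWriteIdList(
            AcidUtils.getFullTableName("default", "t1"), // "db.table" form used by the write id list
            new long[0],                                 // no exception (open/aborted) write ids
            new BitSet(),                                // no aborted bits
            1);                                          // high-water mark: the hardcoded write id 1
        return writeIds.writeToString();                 // the string ReplTxnWork carries to the metastore
      }
    }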

http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 5f15998..ae6411d 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -21,7 +21,10 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -47,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 import org.slf4j.Logger;
 
@@ -233,17 +237,47 @@ public class IncrementalLoadTasksBuilder {
     return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), 
messageHandler.getUpdatedMetadata(), tasks);
   }
 
+  private Task<? extends Serializable> getMigrationCommitTxnTask(String 
dbName, String tableName,
+                                                    List<Map <String, String>> 
partSpec, String replState,
+                                                    boolean needUpdateDBReplId,
+                                                    Task<? extends 
Serializable> preCursor) throws SemanticException {
+    ReplLastIdInfo replLastIdInfo = new ReplLastIdInfo(dbName, 
Long.parseLong(replState));
+    replLastIdInfo.setTable(tableName);
+    replLastIdInfo.setNeedUpdateDBReplId(needUpdateDBReplId);
+    if (partSpec != null && !partSpec.isEmpty()) {
+      List<String> partitionList = new ArrayList<>();
+      for (Map <String, String> part : partSpec) {
+        try {
+          partitionList.add(Warehouse.makePartName(part, false));
+        } catch (MetaException e) {
+          throw new SemanticException(e.getMessage());
+        }
+      }
+      replLastIdInfo.setPartitionList(partitionList);
+    }
+
+    Task<? extends Serializable> updateReplIdTxnTask = TaskFactory.get(new 
ReplTxnWork(replLastIdInfo, ReplTxnWork
+            .OperationType.REPL_MIGRATION_COMMIT_TXN), conf);
+
+    if (preCursor != null) {
+      preCursor.addDependentTask(updateReplIdTxnTask);
+      log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), 
preCursor.getId(),
+              updateReplIdTxnTask.getClass(), updateReplIdTxnTask.getId());
+    }
+    return updateReplIdTxnTask;
+  }
+
   private Task<? extends Serializable> tableUpdateReplStateTask(String dbName, 
String tableName,
                                                     Map<String, String> 
partSpec, String replState,
                                                     Task<? extends 
Serializable> preCursor) throws SemanticException {
     HashMap<String, String> mapProp = new HashMap<>();
     mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
 
-    AlterTableDesc alterTblDesc =  new AlterTableDesc(
+    AlterTableDesc alterTblDesc = new AlterTableDesc(
             AlterTableDesc.AlterTableTypes.ADDPROPS, new 
ReplicationSpec(replState, replState));
     alterTblDesc.setProps(mapProp);
     alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, 
tableName));
-    alterTblDesc.setPartSpec((HashMap<String, String>)partSpec);
+    alterTblDesc.setPartSpec((HashMap<String, String>) partSpec);
 
     Task<? extends Serializable> updateReplIdTask = TaskFactory.get(new 
DDLWork(inputs, outputs, alterTblDesc), conf);
 
@@ -283,8 +317,17 @@ public class IncrementalLoadTasksBuilder {
       return importTasks;
     }
 
+    boolean needCommitTx = updatedMetaDataTracker.isNeedCommitTxn();
+    // In the migration flow, there should be only one table update per event.
+    if (needCommitTx) {
+      // Currently, only the commit txn event can update multiple tables, and a commit txn
+      // event does not start a txn, so needCommitTx must have been set to false for it.
+      assert updatedMetaDataTracker.getUpdateMetaDataList().size() <= 1;
+    }
+
     // Create a barrier task for dependency collection of import tasks
     Task<? extends Serializable> barrierTask = TaskFactory.get(new 
DependencyCollectionWork(), conf);
+
     List<Task<? extends Serializable>> tasks = new ArrayList<>();
     Task<? extends Serializable> updateReplIdTask;
 
@@ -292,21 +335,43 @@ public class IncrementalLoadTasksBuilder {
       String replState = updateMetaData.getReplState();
       String dbName = updateMetaData.getDbName();
       String tableName = updateMetaData.getTableName();
+
       // If any partition is updated, then update repl state in partition 
object
-      for (final Map<String, String> partSpec : 
updateMetaData.getPartitionsList()) {
-        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, 
partSpec, replState, barrierTask);
-        tasks.add(updateReplIdTask);
+      if (needCommitTx) {
+        if (updateMetaData.getPartitionsList().size() > 0) {
+          updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName,
+                  updateMetaData.getPartitionsList(), replState, 
isDatabaseLoad, barrierTask);
+          tasks.add(updateReplIdTask);
+          // The commit txn task will also update the repl id for the table and the database.
+          break;
+        }
+      } else {
+        for (final Map<String, String> partSpec : 
updateMetaData.getPartitionsList()) {
+          updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, 
partSpec, replState, barrierTask);
+          tasks.add(updateReplIdTask);
+        }
       }
 
+      // If any table/partition is updated, then update repl state in table 
object
       if (tableName != null) {
-        // If any table/partition is updated, then update repl state in table 
object
+        if (needCommitTx) {
+          updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName, null,
+                  replState, isDatabaseLoad, barrierTask);
+          tasks.add(updateReplIdTask);
+          // The commit txn task will also update the repl id for the database.
+          break;
+        }
         updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, 
replState, barrierTask);
         tasks.add(updateReplIdTask);
       }
 
-      // For table level load, need not update replication state for the 
database
-      if (isDatabaseLoad) {
-        // If any table/partition is updated, then update repl state in db 
object
+      // If any table/partition is updated, then update repl state in db object
+      if (needCommitTx) {
+        updateReplIdTask = getMigrationCommitTxnTask(dbName, null, null,
+                replState, isDatabaseLoad, barrierTask);
+        tasks.add(updateReplIdTask);
+      } else if (isDatabaseLoad) {
+        // For a table-level load, there is no need to update the replication state of the database.
         updateReplIdTask = dbUpdateReplStateTask(dbName, replState, 
barrierTask);
         tasks.add(updateReplIdTask);
       }
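
getMigrationCommitTxnTask bundles the replication state to be stamped at commit time into a ReplLastIdInfo: the last event id, optionally the table, and the touched partitions rendered with Warehouse.makePartName. A hedged sketch of just that partition-name conversion, in a hypothetical helper:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.hive.metastore.Warehouse;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    final class PartitionNameSketch {
      // Converts each partition spec map (e.g. {ds=2019-01-01, hr=10}) into the
      // "ds=2019-01-01/hr=10" form stored in ReplLastIdInfo's partition list.
      static List<String> toPartitionNames(List<Map<String, String>> partSpecs) throws SemanticException {
        List<String> partitionNames = new ArrayList<>();
        for (Map<String, String> part : partSpecs) {
          try {
            partitionNames.add(Warehouse.makePartName(part, false));
          } catch (MetaException e) {
            throw new SemanticException(e.getMessage());
          }
        }
        return partitionNames;
      }
    }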

Reply via email to