http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java index 314ca48..b71cfa4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java @@ -17,13 +17,21 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import org.apache.hadoop.hive.shims.Utils; - +import org.apache.hadoop.hive.ql.parse.WarehouseInstance; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; +import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable; +import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable; + +import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils; import org.junit.rules.TestName; @@ -40,7 +48,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.google.common.collect.Lists; /** @@ -52,14 +62,9 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenariosIncrementalLoadAcidTables.class); static WarehouseInstance primary; - private static WarehouseInstance replica, replicaNonAcid; + private static WarehouseInstance replica, replicaNonAcid, replicaMigration, primaryMigration; private static HiveConf conf; private String primaryDbName, replicatedDbName, primaryDbNameExtra; - private enum OperationType { - REPL_TEST_ACID_INSERT, REPL_TEST_ACID_INSERT_SELECT, REPL_TEST_ACID_CTAS, - REPL_TEST_ACID_INSERT_OVERWRITE, REPL_TEST_ACID_INSERT_IMPORT, REPL_TEST_ACID_INSERT_LOADLOCAL, - REPL_TEST_ACID_INSERT_UNION - } @BeforeClass public static void classLevelSetup() throws Exception { @@ -67,7 +72,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), GzipJSONMessageEncoder.class.getCanonicalName()); - internalBeforeClassSetup(overrides, TestReplicationScenariosAcidTables.class); + internalBeforeClassSetup(overrides, TestReplicationScenariosIncrementalLoadAcidTables.class); } static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) @@ -100,6 +105,36 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { put("hive.metastore.client.capability.check", "false"); }}; replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); + + HashMap<String, String> 
overridesForHiveConfReplicaMigration = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.strict.managed.tables", "true"); + }}; + replicaMigration = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConfReplicaMigration); + + HashMap<String, String> overridesForHiveConfPrimaryMigration = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.support.concurrency", "false"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); + put("hive.strict.managed.tables", "false"); + }}; + primaryMigration = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConfPrimaryMigration); } @AfterClass @@ -138,231 +173,70 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { String tableName = testName.getMethodName() + "testInsert"; String tableNameMM = tableName + "_MM"; - appendInsert(tableName, tableNameMM, selectStmtList, expectedValues); - appendDelete(selectStmtList, expectedValues); - appendUpdate(selectStmtList, expectedValues); - appendTruncate(selectStmtList, expectedValues); - appendInsertIntoFromSelect(tableName, tableNameMM, selectStmtList, expectedValues); - appendMerge(selectStmtList, expectedValues); - appendCreateAsSelect(tableName, tableNameMM, selectStmtList, expectedValues); - appendImport(tableName, tableNameMM, selectStmtList, expectedValues); - appendInsertOverwrite(tableName, tableNameMM, selectStmtList, expectedValues); - appendLoadLocal(tableName, tableNameMM, selectStmtList, expectedValues); - appendInsertUnion(tableName, tableNameMM, selectStmtList, expectedValues); - appendMultiStatementTxn(selectStmtList, expectedValues); - appendMultiStatementTxnUpdateDelete(selectStmtList, expectedValues); - appendAlterTable(selectStmtList, expectedValues); - - verifyIncrementalLoad(selectStmtList, expectedValues, bootStrapDump.lastReplicationId); - } - - private void appendInsert(String tableName, String tableNameMM, + ReplicationTestUtils.appendInsert(primary, primaryDbName, primaryDbNameExtra, tableName, + tableNameMM, selectStmtList, expectedValues); + appendDelete(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues); + appendUpdate(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues); + ReplicationTestUtils.appendTruncate(primary, primaryDbName, primaryDbNameExtra, + selectStmtList, expectedValues); + ReplicationTestUtils.appendInsertIntoFromSelect(primary, primaryDbName, primaryDbNameExtra, + tableName, tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendMerge(primary, 
primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues); + ReplicationTestUtils.appendCreateAsSelect(primary, primaryDbName, primaryDbNameExtra, tableName, + tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendImport(primary, primaryDbName, primaryDbNameExtra, tableName, + tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendInsertOverwrite(primary, primaryDbName, primaryDbNameExtra, tableName, + tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendLoadLocal(primary, primaryDbName, primaryDbNameExtra, tableName, + tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendInsertUnion(primary, primaryDbName, primaryDbNameExtra, tableName, + tableNameMM, selectStmtList, expectedValues); + ReplicationTestUtils.appendMultiStatementTxn(primary, primaryDbName, primaryDbNameExtra, + selectStmtList, expectedValues); + appendMultiStatementTxnUpdateDelete(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues); + ReplicationTestUtils.appendAlterTable(primary, primaryDbName, primaryDbNameExtra, selectStmtList, expectedValues); + + verifyIncrementalLoadInt(selectStmtList, expectedValues, bootStrapDump.lastReplicationId); + } + + private void appendDelete(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra, List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); - selectStmtList.add("select key from " + tableName + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); - selectStmtList.add("select key from " + tableNameMM + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - } - - private void appendDelete(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableName = testName.getMethodName() + "testDelete"; - insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + String tableName = "testDelete"; + ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra, + tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT); deleteRecords(tableName); selectStmtList.add("select count(*) from " + tableName); expectedValues.add(new String[] {"0"}); } - private void appendUpdate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableName = testName.getMethodName() + "testUpdate"; - insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + private void appendUpdate(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra, + List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + String tableName = "testUpdate"; + ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra, + tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT); updateRecords(tableName); selectStmtList.add("select value from " + tableName + " order by value"); expectedValues.add(new String[] {"1", "100", "100", "100", "100"}); } - private void appendTruncate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableName = testName.getMethodName() + "testTruncate"; - String tableNameMM = tableName + "_MM"; - - insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); - 
truncateTable(primaryDbName, tableName); - selectStmtList.add("select count(*) from " + tableName); - expectedValues.add(new String[] {"0"}); - selectStmtList.add("select count(*) from " + tableName + "_nopart"); - expectedValues.add(new String[] {"0"}); - - insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); - truncateTable(primaryDbName, tableNameMM); - selectStmtList.add("select count(*) from " + tableNameMM); - expectedValues.add(new String[] {"0"}); - selectStmtList.add("select count(*) from " + tableNameMM + "_nopart"); - expectedValues.add(new String[] {"0"}); - } - - private void appendAlterTable(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableName = testName.getMethodName() + "testAlterTable"; - String tableNameMM = tableName + "_MM"; - - insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); - primary.run("use " + primaryDbName) - .run("alter table " + tableName + " change value value1 int ") - .run("select value1 from " + tableName) - .verifyResults(new String[]{"1", "2", "3", "4", "5"}) - .run("alter table " + tableName + "_nopart change value value1 int ") - .run("select value1 from " + tableName + "_nopart") - .verifyResults(new String[]{"1", "2", "3", "4", "5"}); - selectStmtList.add("select value1 from " + tableName ); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - selectStmtList.add("select value1 from " + tableName + "_nopart"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - - insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); - primary.run("use " + primaryDbName) - .run("alter table " + tableNameMM + " change value value1 int ") - .run("select value1 from " + tableNameMM) - .verifyResults(new String[]{"1", "2", "3", "4", "5"}) - .run("alter table " + tableNameMM + "_nopart change value value1 int ") - .run("select value1 from " + tableNameMM + "_nopart") - .verifyResults(new String[]{"1", "2", "3", "4", "5"}); - selectStmtList.add("select value1 from " + tableNameMM ); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - selectStmtList.add("select value1 from " + tableNameMM + "_nopart"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - } - - private void appendInsertIntoFromSelect(String tableName, String tableNameMM, - List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableNameSelect = testName.getMethodName() + "_Select"; - String tableNameSelectMM = testName.getMethodName() + "_SelectMM"; - - insertRecords(tableName, tableNameSelect, false, OperationType.REPL_TEST_ACID_INSERT_SELECT); - selectStmtList.add("select key from " + tableNameSelect + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - - insertRecords(tableNameMM, tableNameSelectMM, true, OperationType.REPL_TEST_ACID_INSERT_SELECT); - selectStmtList.add("select key from " + tableNameSelectMM + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - } - - private void appendMerge(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableName = testName.getMethodName() + "testMerge"; - String tableNameMerge = testName.getMethodName() + "_Merge"; - - insertForMerge(tableName, tableNameMerge, false); - selectStmtList.add("select last_update_user from " + tableName + " order by last_update_user"); - expectedValues.add(new String[] {"creation", "creation", "creation", "creation", "creation", - "creation", "creation", 
"merge_update", "merge_insert", "merge_insert"}); - selectStmtList.add("select ID from " + tableNameMerge + " order by ID"); - expectedValues.add(new String[] {"1", "4", "7", "8", "8", "11"}); - } - - private void appendCreateAsSelect(String tableName, String tableNameMM, - List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableNameCTAS = testName.getMethodName() + "_CTAS"; - String tableNameCTASMM = testName.getMethodName() + "_CTASMM"; - - insertRecords(tableName, tableNameCTAS, false, OperationType.REPL_TEST_ACID_CTAS); - selectStmtList.add("select key from " + tableNameCTAS + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - - insertRecords(tableNameMM, tableNameCTASMM, true, OperationType.REPL_TEST_ACID_CTAS); - selectStmtList.add("select key from " + tableNameCTASMM + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - } - - private void appendImport(String tableName, String tableNameMM, - List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableNameImport = testName.getMethodName() + "_Import"; - String tableNameImportMM = testName.getMethodName() + "_ImportMM"; - - insertRecords(tableName, tableNameImport, false, OperationType.REPL_TEST_ACID_INSERT_IMPORT); - selectStmtList.add("select key from " + tableNameImport + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - - insertRecords(tableNameMM, tableNameImportMM, true, OperationType.REPL_TEST_ACID_INSERT_IMPORT); - selectStmtList.add("select key from " + tableNameImportMM + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - } - - private void appendInsertOverwrite(String tableName, String tableNameMM, - List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableNameOW = tableName + "_OW"; - String tableNameOWMM = testName.getMethodName() +"_OWMM"; - - insertRecords(tableName, tableNameOW, false, OperationType.REPL_TEST_ACID_INSERT_OVERWRITE); - selectStmtList.add("select key from " + tableNameOW + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - - insertRecords(tableNameMM, tableNameOWMM, true, OperationType.REPL_TEST_ACID_INSERT_OVERWRITE); - selectStmtList.add("select key from " + tableNameOWMM + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - } - - //TODO: need to check why its failing. Loading to acid table from local path is failing. 
- private void appendLoadLocal(String tableName, String tableNameMM, - List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableNameLL = testName.getMethodName() +"_LL"; - String tableNameLLMM = testName.getMethodName() +"_LLMM"; - - insertRecords(tableName, tableNameLL, false, OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL); - selectStmtList.add("select key from " + tableNameLL + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - - insertRecords(tableNameMM, tableNameLLMM, true, OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL); - selectStmtList.add("select key from " + tableNameLLMM + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - } - - private void appendInsertUnion(String tableName, String tableNameMM, - List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableNameUnion = testName.getMethodName() +"_UNION"; - String tableNameUnionMM = testName.getMethodName() +"_UNIONMM"; - String[] resultArrayUnion = new String[]{"1", "1", "2", "2", "3", "3", "4", "4", "5", "5"}; - - insertRecords(tableName, tableNameUnion, false, OperationType.REPL_TEST_ACID_INSERT_UNION); - selectStmtList.add( "select key from " + tableNameUnion + " order by key"); - expectedValues.add(resultArrayUnion); - selectStmtList.add("select key from " + tableNameUnion + "_nopart" + " order by key"); - expectedValues.add(resultArrayUnion); - - insertRecords(tableNameMM, tableNameUnionMM, true, OperationType.REPL_TEST_ACID_INSERT_UNION); - selectStmtList.add( "select key from " + tableNameUnionMM + " order by key"); - expectedValues.add(resultArrayUnion); - selectStmtList.add("select key from " + tableNameUnionMM + "_nopart" + " order by key"); - expectedValues.add(resultArrayUnion); - } - - private void appendMultiStatementTxn(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableName = testName.getMethodName() + "testMultiStatementTxn"; - String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; - String tableNameMM = tableName + "_MM"; - String tableProperty = "'transactional'='true'"; - - insertIntoDB(primaryDbName, tableName, tableProperty, resultArray, true); - selectStmtList.add("select key from " + tableName + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - - tableProperty = setMMtableProperty(tableProperty); - insertIntoDB(primaryDbName, tableNameMM, tableProperty, resultArray, true); - selectStmtList.add("select key from " + tableNameMM + " order by key"); - expectedValues.add(new String[]{"1", "2", "3", "4", "5"}); - } - - private void appendMultiStatementTxnUpdateDelete(List<String> selectStmtList, List<String[]> expectedValues) + private void appendMultiStatementTxnUpdateDelete(WarehouseInstance primary, String primaryDbName, String primaryDbNameExtra, + List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - String tableName = testName.getMethodName() + "testMultiStatementTxnUpdate"; - String tableNameDelete = testName.getMethodName() + "testMultiStatementTxnDelete"; + String tableName = "testMultiStatementTxnUpdate"; + String tableNameDelete = "testMultiStatementTxnDelete"; String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; String tableProperty = "'transactional'='true'"; + String tableStorage = "STORED AS ORC"; - insertIntoDB(primaryDbName, tableName, tableProperty, resultArray, true); + ReplicationTestUtils.insertIntoDB(primary, primaryDbName, tableName, tableProperty, 
+ tableStorage, resultArray, true); updateRecords(tableName); selectStmtList.add("select value from " + tableName + " order by value"); expectedValues.add(new String[] {"1", "100", "100", "100", "100"}); - insertIntoDB(primaryDbName, tableNameDelete, tableProperty, resultArray, true); + ReplicationTestUtils.insertIntoDB(primary, primaryDbName, tableNameDelete, tableProperty, + tableStorage, resultArray, true); deleteRecords(tableNameDelete); selectStmtList.add("select count(*) from " + tableNameDelete); expectedValues.add(new String[] {"0"}); @@ -370,8 +244,8 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { @Test public void testReplCM() throws Throwable { - String tableName = testName.getMethodName(); - String tableNameMM = testName.getMethodName() + "_MM"; + String tableName = "testcm"; + String tableNameMM = tableName + "_MM"; String[] result = new String[]{"5"}; WarehouseInstance.Tuple incrementalDump; @@ -380,44 +254,37 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { .run("REPL STATUS " + replicatedDbName) .verifyResult(bootStrapDump.lastReplicationId); - insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT); + ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra, + tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT); incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); primary.run("drop table " + primaryDbName + "." + tableName); replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId); - verifyResultsInReplica(Lists.newArrayList("select count(*) from " + tableName, + verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableName, "select count(*) from " + tableName + "_nopart"), Lists.newArrayList(result, result)); - insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT); + ReplicationTestUtils.insertRecords(primary, primaryDbName, primaryDbNameExtra, + tableNameMM, null, true, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT); incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId); primary.run("drop table " + primaryDbName + "." 
+ tableNameMM); replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation) .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId); - verifyResultsInReplica(Lists.newArrayList("select count(*) from " + tableNameMM, + verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableNameMM, "select count(*) from " + tableNameMM + "_nopart"), Lists.newArrayList(result, result)); } - private void verifyResultsInReplica(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { - for (int idx = 0; idx < selectStmtList.size(); idx++) { - replica.run("use " + replicatedDbName) - .run(selectStmtList.get(idx)) - .verifyResults(expectedValues.get(idx)); - } + private void verifyResultsInReplicaInt(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable { + ReplicationTestUtils.verifyResultsInReplica(replica, replicatedDbName, selectStmtList, expectedValues); } - private WarehouseInstance.Tuple verifyIncrementalLoad(List<String> selectStmtList, - List<String[]> expectedValues, String lastReplId) throws Throwable { - WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, lastReplId); - replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation) - .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId); - verifyResultsInReplica(selectStmtList, expectedValues); - replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation) - .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId); - verifyResultsInReplica(selectStmtList, expectedValues); - return incrementalDump; + private WarehouseInstance.Tuple verifyIncrementalLoadInt(List<String> selectStmtList, + List<String[]> expectedValues, String lastReplId) throws Throwable { + return ReplicationTestUtils.verifyIncrementalLoad(primary, replica, primaryDbName, + replicatedDbName, selectStmtList, expectedValues, lastReplId); + } private void deleteRecords(String tableName) throws Throwable { @@ -434,217 +301,88 @@ public class TestReplicationScenariosIncrementalLoadAcidTables { .verifyResults(new String[] {"1", "100", "100", "100", "100"}); } - private void truncateTable(String dbName, String tableName) throws Throwable { - primary.run("use " + dbName) - .run("truncate table " + tableName) - .run("select count(*) from " + tableName) - .verifyResult("0") - .run("truncate table " + tableName + "_nopart") - .run("select count(*) from " + tableName + "_nopart") - .verifyResult("0"); - } - - private WarehouseInstance.Tuple verifyLoad(String tableName, String tableNameOp, String lastReplId) throws Throwable { - String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; - if (tableNameOp == null) { - return verifyIncrementalLoad(Lists.newArrayList("select key from " + tableName + " order by key", - "select key from " + tableName + "_nopart order by key"), - Lists.newArrayList(resultArray, resultArray), lastReplId); - } - return verifyIncrementalLoad(Lists.newArrayList("select key from " + tableName + " order by key", - "select key from " + tableNameOp + " order by key", - "select key from " + tableName + "_nopart" + " order by key", - "select key from " + tableNameOp + "_nopart" + " order by key"), - Lists.newArrayList(resultArray, resultArray, resultArray, resultArray), lastReplId); - } - - private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray, boolean isTxn) - throws Throwable { - String txnStrStart = "START 
TRANSACTION"; - String txnStrCommit = "COMMIT"; - if (!isTxn) { - txnStrStart = "use " + dbName; //dummy - txnStrCommit = "use " + dbName; //dummy - } - primary.run("use " + dbName); - primary.run("CREATE TABLE " + tableName + " (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") - .run("SHOW TABLES LIKE '" + tableName + "'") - .verifyResult(tableName) - .run("CREATE TABLE " + tableName + "_nopart (key int, value int) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") - .run("SHOW TABLES LIKE '" + tableName + "_nopart'") - .run("ALTER TABLE " + tableName + " ADD PARTITION (load_date='2016-03-03')") - .run(txnStrStart) - .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)") - .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)") - .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)") - .run("INSERT INTO " + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)") - .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)") - .run("select key from " + tableName + " order by key") - .verifyResults(resultArray) - .run("INSERT INTO " + tableName + "_nopart (key, value) select key, value from " + tableName) - .run("select key from " + tableName + "_nopart" + " order by key") - .verifyResults(resultArray) - .run(txnStrCommit); - } - - private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray) - throws Throwable { - insertIntoDB(dbName, tableName, tableProperty, resultArray, false); + private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable { + WarehouseInstance.Tuple tuple = primaryMigration.run("use " + primaryDbName) + .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") + .run("insert into tacid values(1)") + .run("insert into tacid values(2)") + .run("insert into tacid values(3)") + .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " + + "into 3 buckets stored as orc ") + .run("alter table tacidpart add partition(country='france')") + .run("insert into tacidpart partition(country='india') values('mumbai')") + .run("insert into tacidpart partition(country='us') values('sf')") + .run("insert into tacidpart partition(country='france') values('paris')") + .run("create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")") + .run("insert into tflat values(11)") + .run("insert into tflat values(22)") + .run("create table tflattext (id int) ") + .run("insert into tflattext values(111), (222)") + .run("create table tflattextpart (id int) partitioned by (country string) ") + .run("insert into tflattextpart partition(country='india') values(1111), (2222)") + .run("insert into tflattextpart partition(country='us') values(3333)") + .run("create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " + + "stored as avro tblproperties ('avro.schema.url'='" + primaryMigration.avroSchemaFile.toUri().toString() + "')") + .run("insert into avro_table values('str1', 10)") + .dump(primaryDbName, fromReplId); + assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tacid"))); + assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tacidpart"))); + 
assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflat"))); + assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflattext"))); + assertFalse(isTransactionalTable(primaryMigration.getTable(primaryDbName, "tflattextpart"))); + Table avroTable = primaryMigration.getTable(replicatedDbName, "avro_table"); + assertFalse(isTransactionalTable(avroTable)); + assertFalse(MetaStoreUtils.isExternalTable(avroTable)); + return tuple; + } + + private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable { + replicaMigration.run("use " + replicatedDbName) + .run("show tables") + .verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart", + "avro_table"}) + .run("repl status " + replicatedDbName) + .verifyResult(lastReplId) + .run("select id from tacid order by id") + .verifyResults(new String[]{"1", "2", "3"}) + .run("select country from tacidpart order by country") + .verifyResults(new String[] {"france", "india", "us"}) + .run("select rank from tflat order by rank") + .verifyResults(new String[] {"11", "22"}) + .run("select id from tflattext order by id") + .verifyResults(new String[] {"111", "222"}) + .run("select id from tflattextpart order by id") + .verifyResults(new String[] {"1111", "2222", "3333"}) + .run("select col1 from avro_table") + .verifyResults(new String[] {"str1"}); + + assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tacid"))); + assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tacidpart"))); + assertTrue(isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflat"))); + assertTrue(!isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflattext"))); + assertTrue(!isFullAcidTable(replicaMigration.getTable(replicatedDbName, "tflattextpart"))); + assertTrue(isTransactionalTable(replicaMigration.getTable(replicatedDbName, "tflattext"))); + assertTrue(isTransactionalTable(replicaMigration.getTable(replicatedDbName, "tflattextpart"))); + + Table avroTable = replicaMigration.getTable(replicatedDbName, "avro_table"); + assertTrue(MetaStoreUtils.isExternalTable(avroTable)); + Path tablePath = new PathBuilder(replicaMigration.externalTableWarehouseRoot.toString()).addDescendant(replicatedDbName + ".db") + .addDescendant("avro_table") + .build(); + assertEquals(avroTable.getSd().getLocation().toLowerCase(), tablePath.toUri().toString().toLowerCase()); } - private void insertRecords(String tableName, String tableNameOp, boolean isMMTable, - OperationType opType) throws Throwable { - insertRecordsIntoDB(primaryDbName, tableName, tableNameOp, isMMTable, opType); - } - - private void insertRecordsIntoDB(String DbName, String tableName, String tableNameOp, boolean isMMTable, - OperationType opType) throws Throwable { - String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; - String tableProperty = "'transactional'='true'"; - if (isMMTable) { - tableProperty = setMMtableProperty(tableProperty); - } - primary.run("use " + DbName); - - switch (opType) { - case REPL_TEST_ACID_INSERT: - insertIntoDB(DbName, tableName, tableProperty, resultArray); - insertIntoDB(primaryDbNameExtra, tableName, tableProperty, resultArray); - return; - case REPL_TEST_ACID_INSERT_OVERWRITE: - primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )") - .run("INSERT INTO " + tableNameOp + " partition 
(load_date='2016-03-01') VALUES (2, 2)") - .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (10, 12)") - .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-02') VALUES (11, 1)") - .run("select key from " + tableNameOp + " order by key") - .verifyResults(new String[]{"2", "10", "11"}) - .run("insert overwrite table " + tableNameOp + " select * from " + tableName) - .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )") - .run("INSERT INTO " + tableNameOp + "_nopart VALUES (2, 2)") - .run("INSERT INTO " + tableNameOp + "_nopart VALUES (10, 12)") - .run("INSERT INTO " + tableNameOp + "_nopart VALUES (11, 1)") - .run("select key from " + tableNameOp + "_nopart" + " order by key") - .verifyResults(new String[]{"2", "10", "11"}) - .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName + "_nopart") - .run("select key from " + tableNameOp + "_nopart" + " order by key"); - break; - case REPL_TEST_ACID_INSERT_SELECT: - primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )") - .run("insert into " + tableNameOp + " partition (load_date) select * from " + tableName) - .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )") - .run("insert into " + tableNameOp + "_nopart select * from " + tableName + "_nopart"); - break; - case REPL_TEST_ACID_INSERT_IMPORT: - String path = "hdfs:///tmp/" + DbName + "/"; - String exportPath = "'" + path + tableName + "/'"; - String exportPathNoPart = "'" + path + tableName + "_nopart/'"; - primary.run("export table " + tableName + " to " + exportPath) - .run("import table " + tableNameOp + " from " + exportPath) - .run("export table " + tableName + "_nopart to " + exportPathNoPart) - .run("import table " + tableNameOp + "_nopart from " + exportPathNoPart); - break; - case REPL_TEST_ACID_CTAS: - primary.run("create table " + tableNameOp + " as select * from " + tableName) - .run("create table " + tableNameOp + "_nopart as select * from " + tableName + "_nopart"); - break; - case REPL_TEST_ACID_INSERT_LOADLOCAL: - // For simplicity setting key and value as same value - StringBuilder buf = new StringBuilder(); - boolean nextVal = false; - for (String key : resultArray) { - if (nextVal) { - buf.append(','); - } - buf.append('('); - buf.append(key); - buf.append(','); - buf.append(key); - buf.append(')'); - nextVal = true; - } - - primary.run("CREATE TABLE " + tableNameOp + "_temp (key int, value int) STORED AS ORC") - .run("INSERT INTO TABLE " + tableNameOp + "_temp VALUES " + buf.toString()) - .run("SELECT key FROM " + tableNameOp + "_temp") - .verifyResults(resultArray) - .run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") - .run("SHOW TABLES LIKE '" + tableNameOp + "'") - .verifyResult(tableNameOp) - .run("INSERT OVERWRITE LOCAL DIRECTORY './test.dat' STORED AS ORC SELECT * FROM " + tableNameOp + "_temp") - .run("LOAD DATA LOCAL INPATH './test.dat/000000_0' OVERWRITE INTO TABLE " + tableNameOp + - " PARTITION (load_date='2008-08-15')") - .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " + - 
"CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") - .run("SHOW TABLES LIKE '" + tableNameOp + "_nopart'") - .verifyResult(tableNameOp + "_nopart") - .run("LOAD DATA LOCAL INPATH './test.dat/000000_0' OVERWRITE INTO TABLE " + tableNameOp + "_nopart"); - break; - case REPL_TEST_ACID_INSERT_UNION: - primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") - .run("SHOW TABLES LIKE '" + tableNameOp + "'") - .verifyResult(tableNameOp) - .run("insert overwrite table " + tableNameOp + " partition (load_date) select * from " + tableName + - " union all select * from " + tableName) - .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " + - "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")") - .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName + - "_nopart union all select * from " + tableName + "_nopart"); - resultArray = new String[]{"1", "2", "3", "4", "5", "1", "2", "3", "4", "5"}; - break; - default: - return; - } - primary.run("select key from " + tableNameOp + " order by key").verifyResults(resultArray); - primary.run("select key from " + tableNameOp + "_nopart" + " order by key").verifyResults(resultArray); - } - - private String setMMtableProperty(String tableProperty) throws Throwable { - return tableProperty.concat(", 'transactional_properties' = 'insert_only'"); - } - - private void insertForMerge(String tableName, String tableNameMerge, boolean isMMTable) throws Throwable { - String tableProperty = "'transactional'='true'"; - if (isMMTable) { - tableProperty = setMMtableProperty(tableProperty); - } - primary.run("use " + primaryDbName) - .run("CREATE TABLE " + tableName + "( ID int, TranValue string, last_update_user string) PARTITIONED BY " + - "(tran_date string) CLUSTERED BY (ID) into 5 buckets STORED AS ORC TBLPROPERTIES " + - " ( "+ tableProperty + " )") - .run("SHOW TABLES LIKE '" + tableName + "'") - .verifyResult(tableName) - .run("CREATE TABLE " + tableNameMerge + " ( ID int, TranValue string, tran_date string) STORED AS ORC ") - .run("SHOW TABLES LIKE '" + tableNameMerge + "'") - .verifyResult(tableNameMerge) - .run("INSERT INTO " + tableName + " PARTITION (tran_date) VALUES (1, 'value_01', 'creation', '20170410')," + - " (2, 'value_02', 'creation', '20170410'), (3, 'value_03', 'creation', '20170410'), " + - " (4, 'value_04', 'creation', '20170410'), (5, 'value_05', 'creation', '20170413'), " + - " (6, 'value_06', 'creation', '20170413'), (7, 'value_07', 'creation', '20170413'), " + - " (8, 'value_08', 'creation', '20170413'), (9, 'value_09', 'creation', '20170413'), " + - " (10, 'value_10','creation', '20170413')") - .run("select ID from " + tableName + " order by ID") - .verifyResults(new String[] {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}) - .run("INSERT INTO " + tableNameMerge + " VALUES (1, 'value_01', '20170410'), " + - " (4, NULL, '20170410'), (7, 'value_77777', '20170413'), " + - " (8, NULL, '20170413'), (8, 'value_08', '20170415'), " + - "(11, 'value_11', '20170415')") - .run("select ID from " + tableNameMerge + " order by ID") - .verifyResults(new String[] {"1", "4", "7", "8", "8", "11"}) - .run("MERGE INTO " + tableName + " AS T USING " + tableNameMerge + " AS S ON T.ID = S.ID and" + - " T.tran_date = S.tran_date WHEN MATCHED AND (T.TranValue != S.TranValue AND S.TranValue " + - " IS NOT NULL) THEN 
UPDATE SET TranValue = S.TranValue, last_update_user = " + - " 'merge_update' WHEN MATCHED AND S.TranValue IS NULL THEN DELETE WHEN NOT MATCHED " + - " THEN INSERT VALUES (S.ID, S.TranValue,'merge_insert', S.tran_date)") - .run("select last_update_user from " + tableName + " order by last_update_user") - .verifyResults(new String[] {"creation", "creation", "creation", "creation", "creation", - "creation", "creation", "merge_update", "merge_insert", "merge_insert"}); + @Test + public void testMigrationManagedToAcid() throws Throwable { + WarehouseInstance.Tuple tupleForBootStrap = primaryMigration.dump(primaryDbName, null); + WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null); + WarehouseInstance.Tuple tupleForIncremental = primaryMigration.dump(primaryDbName, tupleForBootStrap.lastReplicationId); + replicaMigration.loadWithoutExplain(replicatedDbName, tuple.dumpLocation); + verifyLoadExecution(replicatedDbName, tuple.lastReplicationId); + + replicaMigration.run("drop database if exists " + replicatedDbName + " cascade"); + replicaMigration.loadWithoutExplain(replicatedDbName, tupleForBootStrap.dumpLocation); + replicaMigration.loadWithoutExplain(replicatedDbName, tupleForIncremental.dumpLocation); + verifyLoadExecution(replicatedDbName, tupleForIncremental.lastReplicationId); } }
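[Editorial aside, not part of the commit] The hunk above moves the per-operation append helpers into the shared ReplicationTestUtils class and adds testMigrationManagedToAcid, which dumps from a warehouse running with hive.strict.managed.tables=false and replays the dump on a strict-managed, ACID-enabled replica. Below is a minimal sketch of the bootstrap-plus-incremental replay pattern that test exercises, assuming only the WarehouseInstance API visible in this diff (dump, loadWithoutExplain, chained run/verifyResult); the helper name is hypothetical.

    // Hypothetical helper (sketch only): replay a bootstrap dump, then an incremental dump,
    // onto a clean target database and check the resulting replication id. This is the shape
    // of testMigrationManagedToAcid's second verification pass.
    private void replayBootstrapThenIncremental(WarehouseInstance src, WarehouseInstance dst,
        String srcDb, String dstDb) throws Throwable {
      // Bootstrap dump taken before the test data is created on the source.
      WarehouseInstance.Tuple bootstrap = src.dump(srcDb, null);
      // ... create tables and insert data on src here ...
      // The incremental dump covers every event after the bootstrap's last replication id.
      WarehouseInstance.Tuple incremental = src.dump(srcDb, bootstrap.lastReplicationId);
      dst.run("drop database if exists " + dstDb + " cascade");
      dst.loadWithoutExplain(dstDb, bootstrap.dumpLocation);
      dst.loadWithoutExplain(dstDb, incremental.dumpLocation);
      // The replayed state must report the incremental dump's replication id.
      dst.run("repl status " + dstDb).verifyResult(incremental.lastReplicationId);
    }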
http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java index 7900779..92f2456 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java @@ -55,8 +55,11 @@ import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule; import org.apache.hive.hcatalog.listener.DbNotificationListener; import org.codehaus.plexus.util.ExceptionUtils; import org.slf4j.Logger; +import org.apache.hadoop.hive.ql.exec.Utilities; import java.io.Closeable; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.net.URI; import java.util.ArrayList; @@ -79,10 +82,13 @@ public class WarehouseInstance implements Closeable { MiniDFSCluster miniDFSCluster; private HiveMetaStoreClient client; public final Path warehouseRoot; + public final Path externalTableWarehouseRoot; + public Path avroSchemaFile; private static int uniqueIdentifier = 0; private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName(); + private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc"; WarehouseInstance(Logger logger, MiniDFSCluster cluster, Map<String, String> overridesForHiveConf, String keyNameForEncryptedZone) throws Exception { @@ -93,12 +99,15 @@ public class WarehouseInstance implements Closeable { DistributedFileSystem fs = miniDFSCluster.getFileSystem(); warehouseRoot = mkDir(fs, "/warehouse" + uniqueIdentifier); + externalTableWarehouseRoot = mkDir(fs, "/external" + uniqueIdentifier); if (StringUtils.isNotEmpty(keyNameForEncryptedZone)) { fs.createEncryptionZone(warehouseRoot, keyNameForEncryptedZone); + fs.createEncryptionZone(externalTableWarehouseRoot, keyNameForEncryptedZone); } Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier); this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString(); - initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf); + initialize(cmRootPath.toString(), warehouseRoot.toString(), externalTableWarehouseRoot.toString(), + overridesForHiveConf); } WarehouseInstance(Logger logger, MiniDFSCluster cluster, @@ -106,7 +115,7 @@ public class WarehouseInstance implements Closeable { this(logger, cluster, overridesForHiveConf, null); } - private void initialize(String cmRoot, String warehouseRoot, + private void initialize(String cmRoot, String warehouseRoot, String externalTableWarehouseRoot, Map<String, String> overridesForHiveConf) throws Exception { hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class); for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) { @@ -126,6 +135,7 @@ public class WarehouseInstance implements Closeable { // hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, hiveInTest); // turn on db notification listener on meta store hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseRoot); + hiveConf.setVar(HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL, externalTableWarehouseRoot); hiveConf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS, LISTENER_CLASS); hiveConf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, 
true); hiveConf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true); @@ -138,7 +148,6 @@ public class WarehouseInstance implements Closeable { hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); if (!hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER).equals("org.apache.hadoop.hive.ql.lockmgr.DbTxnManager")) { hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); } @@ -153,6 +162,7 @@ public class WarehouseInstance implements Closeable { FileSystem testPathFileSystem = FileSystem.get(testPath.toUri(), hiveConf); testPathFileSystem.mkdirs(testPath); + avroSchemaFile = createAvroSchemaFile(testPathFileSystem, testPath); driver = DriverFactory.newDriver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); client = new HiveMetaStoreClient(hiveConf); @@ -171,6 +181,49 @@ public class WarehouseInstance implements Closeable { return PathBuilder.fullyQualifiedHDFSUri(path, fs); } + private Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException { + Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME); + String[] schemaVals = new String[] { "{", + " \"type\" : \"record\",", + " \"name\" : \"table1\",", + " \"doc\" : \"Sqoop import of table1\",", + " \"fields\" : [ {", + " \"name\" : \"col1\",", + " \"type\" : [ \"null\", \"string\" ],", + " \"default\" : null,", + " \"columnName\" : \"col1\",", + " \"sqlType\" : \"12\"", + " }, {", + " \"name\" : \"col2\",", + " \"type\" : [ \"null\", \"long\" ],", + " \"default\" : null,", + " \"columnName\" : \"col2\",", + " \"sqlType\" : \"13\"", + " } ],", + " \"tableName\" : \"table1\"", + "}" + }; + createTestDataFile(schemaFile.toUri().getPath(), schemaVals); + return schemaFile; + } + + private void createTestDataFile(String filename, String[] lines) throws IOException { + FileWriter writer = null; + try { + File file = new File(filename); + file.deleteOnExit(); + writer = new FileWriter(file); + int i=0; + for (String line : lines) { + writer.write(line + "\n"); + } + } finally { + if (writer != null) { + writer.close(); + } + } + } + public HiveConf getConf() { return hiveConf; } @@ -433,6 +486,10 @@ public class WarehouseInstance implements Closeable { assertEquals(expectedCount, client.getNotificationEventsCount(rqst).getEventsCount()); } + public boolean isAcidEnabled() { + return hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY); + } + @Override public void close() throws IOException { if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) { http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java index 0372064..93e9d48 100644 --- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java +++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java @@ -23,6 +23,8 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import 
org.apache.hadoop.hive.metastore.api.CmRecycleResponse; +import org.apache.hadoop.hive.metastore.api.CmRecycleRequest; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.FireEventRequest; import org.apache.hadoop.hive.metastore.api.FireEventResponse; @@ -125,6 +127,10 @@ public final class SynchronizedMetaStoreClient { client.addWriteNotificationLog(rqst); } + public synchronized CmRecycleResponse recycleDirToCmPath(CmRecycleRequest request) throws MetaException, TException { + return client.recycleDirToCmPath(request); + } + public synchronized void close() { client.close(); } http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 57f71a8..608cbd5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -2027,8 +2027,11 @@ public class Driver implements IDriver { } else if(plan.getOperation() == HiveOperation.ROLLBACK) { releaseLocksAndCommitOrRollback(false); - } - else { + } else if (!queryTxnMgr.isTxnOpen() && queryState.getHiveOperation() == HiveOperation.REPLLOAD) { + // repl load during migration, commits the explicit txn and start some internal txns. Call + // releaseLocksAndCommitOrRollback to do the clean up. + releaseLocksAndCommitOrRollback(false); + } else { //txn (if there is one started) is not finished } } catch (LockException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 6b28cca..a96d54d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -21,18 +21,15 @@ package org.apache.hadoop.hive.ql.exec; import static org.apache.commons.lang.StringUtils.join; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; -import java.io.BufferedWriter; import java.io.DataOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.Serializable; -import java.io.Writer; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.sql.SQLException; -import java.util.AbstractList; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -53,7 +50,6 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -152,13 +148,11 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObject.HiveLockObjectData; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.CheckConstraint; -import org.apache.hadoop.hive.metastore.CheckResult; import org.apache.hadoop.hive.ql.metadata.DefaultConstraint; import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo; import 
org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveMaterializedViewsRegistry; -import org.apache.hadoop.hive.metastore.HiveMetaStoreChecker; import org.apache.hadoop.hive.ql.metadata.InvalidTableException; import org.apache.hadoop.hive.ql.metadata.NotNullConstraint; import org.apache.hadoop.hive.ql.metadata.Partition; @@ -251,6 +245,7 @@ import org.apache.hadoop.hive.ql.plan.TruncateTableDesc; import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.UnlockTableDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.security.authorization.AuthorizationUtils; import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationTranslator; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizationTranslator; @@ -288,7 +283,6 @@ import org.apache.hadoop.util.ToolRunner; import org.apache.hive.common.util.AnnotationUtils; import org.apache.hive.common.util.HiveStringUtils; import org.apache.hive.common.util.ReflectionUtil; -import org.apache.hive.common.util.RetryUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.stringtemplate.v4.ST; @@ -4701,7 +4695,20 @@ public class DDLTask extends Task<DDLWork> implements Serializable { // create the table if (crtTbl.getReplaceMode()) { ReplicationSpec replicationSpec = crtTbl.getReplicationSpec(); - long writeId = replicationSpec != null && replicationSpec.isInReplicationScope() ? crtTbl.getReplWriteId() : 0L; + long writeId = 0; + if (replicationSpec != null && replicationSpec.isInReplicationScope()) { + if (replicationSpec.isMigratingToTxnTable()) { + // for migration we start the transaction and allocate write id in repl txn task for migration. + String writeIdPara = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID); + if (writeIdPara == null) { + throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration"); + } + writeId = Long.parseLong(writeIdPara); + } else { + writeId = crtTbl.getReplWriteId(); + } + } + // replace-mode creates are really alters using CreateTableDesc. 
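      // [editorial summary, not in the commit] Net effect of the branch above:
      //   - in replication scope while migrating to a txn table, the write id is parsed from
      //     ReplUtils.REPL_CURRENT_TBL_WRITE_ID, which the REPL_MIGRATION_OPEN_TXN case of
      //     ReplTxnTask (later in this commit) publishes into the conf;
      //   - in replication scope otherwise, it comes from crtTbl.getReplWriteId();
      //   - outside replication scope, writeId stays 0.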
db.alterTable(tbl.getCatName(), tbl.getDbName(), tbl.getTableName(), tbl, false, null, true, writeId); http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index c1cc633..ca4391f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj; import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -375,6 +376,17 @@ public class MoveTask extends Task<MoveWork> implements Serializable { checkFileFormats(db, tbd, table); + // for transactional table if write id is not set during replication from a cluster with STRICT_MANAGED set + // to false then set it now. + if (tbd.getWriteId() <= 0 && AcidUtils.isTransactionalTable(table.getParameters())) { + String writeId = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID); + if (writeId == null) { + throw new HiveException("MoveTask : Write id is not set in the config by open txn task for migration"); + } + tbd.setWriteId(Long.parseLong(writeId)); + tbd.setStmtId(driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement()); + } + boolean isFullAcidOp = work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && !tbd.isMmTable(); //it seems that LoadTableDesc has Operation.INSERT only for CTAS... http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java index c2953c5..c91b78e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hive.ql.exec; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.CommitTxnRequest; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; import org.apache.hadoop.hive.ql.DriverContext; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -33,6 +35,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import java.util.List; +import org.apache.hadoop.hive.common.ValidTxnList; /** * ReplTxnTask. 
@@ -91,12 +94,38 @@ public class ReplTxnTask extends Task<ReplTxnWork> { LOG.info("Replayed OpenTxn Event for policy " + replPolicy + " with srcTxn " + work.getTxnIds().toString() + " and target txn id " + txnIds.toString()); return 0; + case REPL_MIGRATION_OPEN_TXN: + // If a transaction is already open (mostly one opened by the repl load command), commit it first. + if (txnManager.isTxnOpen()) { + long txnId = txnManager.getCurrentTxnId(); + txnManager.commitTxn(); + LOG.info("Committed txn from REPL_MIGRATION_OPEN_TXN : " + txnId); + } + Long txnIdMigration = txnManager.openTxn(driverContext.getCtx(), user); + long writeId = txnManager.getTableWriteId(work.getDbName(), work.getTableName()); + String validTxnList = txnManager.getValidTxns().toString(); + conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList); + conf.set(ReplUtils.REPL_CURRENT_TBL_WRITE_ID, Long.toString(writeId)); + LOG.info("Started open txn for migration : " + txnIdMigration + " with valid txn list : " + + validTxnList + " and write id " + writeId); + return 0; case REPL_ABORT_TXN: for (long txnId : work.getTxnIds()) { txnManager.replRollbackTxn(replPolicy, txnId); LOG.info("Replayed AbortTxn Event for policy " + replPolicy + " with srcTxn " + txnId); } return 0; + case REPL_MIGRATION_COMMIT_TXN: + assert (work.getReplLastIdInfo() != null); + long txnIdMigrationCommit = txnManager.getCurrentTxnId(); + CommitTxnRequest commitTxnRequestMigr = new CommitTxnRequest(txnIdMigrationCommit); + commitTxnRequestMigr.setReplLastIdInfo(work.getReplLastIdInfo()); + txnManager.replCommitTxn(commitTxnRequestMigr); + conf.unset(ValidTxnList.VALID_TXNS_KEY); + conf.unset(ReplUtils.REPL_CURRENT_TBL_WRITE_ID); + LOG.info("Committed Migration Txn with replLastIdInfo: " + work.getReplLastIdInfo() + " for txnId: " + + txnIdMigrationCommit); + return 0; case REPL_COMMIT_TXN: // Currently only one commit txn per event is supported.
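
Stepping back from the hunk above: the two new operation types bracket every migrated write. REPL_MIGRATION_OPEN_TXN runs before the data movement and publishes the txn state into the conf; REPL_MIGRATION_COMMIT_TXN runs after it, commits, and clears that state. A rough plain-Java model of the pairing (hypothetical names and keys; the real code goes through HiveTxnManager):

import java.util.HashMap;
import java.util.Map;

public class MigrationTxnBracketSketch {
    private static final Map<String, String> conf = new HashMap<>();
    private static long nextTxnId = 1;
    private static long currentTxnId = 0;   // 0 means "no open txn"

    static void migrationOpenTxn(long writeId, String validTxnList) {
        if (currentTxnId != 0) {
            // A txn left open (e.g. by the repl load command) is committed first, as in the patch.
            System.out.println("committed leftover txn " + currentTxnId);
        }
        currentTxnId = nextTxnId++;
        conf.put("valid.txns", validTxnList);         // stand-in for ValidTxnList.VALID_TXNS_KEY
        conf.put("write.id", Long.toString(writeId)); // stand-in for REPL_CURRENT_TBL_WRITE_ID
    }

    static void migrationCommitTxn() {
        // The real commit also carries ReplLastIdInfo so HMS can update repl ids atomically.
        System.out.println("committed migration txn " + currentTxnId);
        conf.remove("valid.txns");
        conf.remove("write.id");
        currentTxnId = 0;
    }

    public static void main(String[] args) {
        migrationOpenTxn(1L, "txns:...");
        migrationCommitTxn();
    }
}
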
assert (work.getTxnIds().size() == 1); @@ -145,4 +174,8 @@ public class ReplTxnTask extends Task<ReplTxnWork> { public String getName() { return "REPL_TRANSACTION"; } + + public ReplTxnWork.OperationType getOperationType() { + return work.getOperationType(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index d203ae4..357c693 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -18,13 +18,17 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; @@ -33,14 +37,18 @@ import org.apache.hadoop.hive.ql.parse.repl.load.MetaData; import org.apache.hadoop.hive.ql.plan.AddPartitionDesc; import org.apache.hadoop.hive.ql.plan.ImportTableDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration; import java.net.URI; import java.util.ArrayList; import java.util.List; +import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater; + public class FSTableEvent implements TableEvent { private final Path fromPath; private final MetaData metadata; + private final HiveConf hiveConf; FSTableEvent(HiveConf hiveConf, String metadataDir) { try { @@ -48,6 +56,7 @@ public class FSTableEvent implements TableEvent { fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath()); FileSystem fs = FileSystem.get(fromURI, hiveConf); metadata = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME)); + this.hiveConf = hiveConf; } catch (Exception e) { throw new RuntimeException(e); } @@ -67,9 +76,30 @@ public class FSTableEvent implements TableEvent { public ImportTableDesc tableDesc(String dbName) throws SemanticException { try { Table table = new Table(metadata.getTable()); - ImportTableDesc tableDesc = - new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table); + // The table can be non-ACID in case of replication from a 2.6 cluster. + if (!AcidUtils.isTransactionalTable(table) + && hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) + && (table.getTableType() == TableType.MANAGED_TABLE)) { + Hive hiveDb = Hive.get(hiveConf); + // TODO: dump metadata should be read to make sure that migration is required.
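
The guard above only triggers migration for managed, non-transactional tables when the target enforces hive.strict.managed.tables. A tiny runnable restatement of that predicate, in plain Java with hypothetical names:

public class MigrationGuardSketch {
    enum TableType { MANAGED_TABLE, EXTERNAL_TABLE }

    // Mirrors the condition in FSTableEvent.tableDesc().
    static boolean needsMigration(boolean transactional, boolean strictManagedTables, TableType type) {
        return !transactional && strictManagedTables && type == TableType.MANAGED_TABLE;
    }

    public static void main(String[] args) {
        System.out.println(needsMigration(false, true, TableType.MANAGED_TABLE));  // true
        System.out.println(needsMigration(true, true, TableType.MANAGED_TABLE));   // false: already ACID
        System.out.println(needsMigration(false, true, TableType.EXTERNAL_TABLE)); // false: external
    }
}
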
+ HiveStrictManagedMigration.TableMigrationOption migrationOption + = HiveStrictManagedMigration.determineMigrationTypeAutomatically(table.getTTable(), + table.getTableType(),null, (Configuration)hiveConf, + hiveDb.getMSC(),true); + HiveStrictManagedMigration.migrateTable(table.getTTable(), table.getTableType(), + migrationOption, false, + getHiveUpdater(hiveConf), hiveDb.getMSC(), (Configuration)hiveConf); + // If the conversion is from a non-transactional to a transactional table + if (AcidUtils.isTransactionalTable(table)) { + replicationSpec().setMigratingToTxnTable(); + } + } + ImportTableDesc tableDesc + = new ImportTableDesc(StringUtils.isBlank(dbName) ? table.getDbName() : dbName, table); tableDesc.setReplicationSpec(replicationSpec()); + if (table.getTableType() == TableType.EXTERNAL_TABLE) { + tableDesc.setExternal(true); + } return tableDesc; } catch (Exception e) { throw new SemanticException(e); http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 172b4ac..8102997 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -229,12 +229,12 @@ public class LoadPartitions { // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. LoadFileType loadFileType; - if (event.replicationSpec().isInReplicationScope() && + if (event.replicationSpec().isInReplicationScope() && !event.replicationSpec().isMigratingToTxnTable() && context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { loadFileType = LoadFileType.IGNORE; } else { - loadFileType = - event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : + event.replicationSpec().isMigratingToTxnTable() ? LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING; tmpPath = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo); } @@ -283,11 +283,23 @@ public class LoadPartitions { LoadFileType loadFileType) { MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { + if (event.replicationSpec().isMigratingToTxnTable()) { + // Write-id is hardcoded to 1 so that, for migration, we just move all original files under the delta_1_1 dir. + // It is used only for transactional tables created after migrating from a non-ACID table. + // ReplTxnTask added earlier in the DAG ensures that the write-id is made valid in HMS metadata.
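
For context on the delta_1_1 naming used above: ACID delta directories are conventionally named delta_<minWriteId>_<maxWriteId> with zero-padded ids, so hardcoding write id 1 lands all original files in one deterministic directory. A small sketch of that naming, assuming the conventional seven-digit padding:

public class DeltaDirSketch {
    // Conventional ACID delta directory name for a write id range (assumed format).
    static String deltaSubdir(long minWriteId, long maxWriteId) {
        return String.format("delta_%07d_%07d", minWriteId, maxWriteId);
    }

    public static void main(String[] args) {
        // Migration hardcodes write id 1, so all original files end up under this one directory:
        System.out.println(deltaSubdir(1, 1)); // delta_0000001_0000001
    }
}
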
+ LoadTableDesc loadTableWork = new LoadTableDesc( + tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), + loadFileType, 1L + ); + loadTableWork.setInheritTableSpecs(false); + moveWork.setLoadTableWork(loadTableWork); + } else { + LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( + Collections.singletonList(tmpPath), + Collections.singletonList(new Path(partSpec.getLocation())), + true, null, null); + moveWork.setMultiFilesDesc(loadFilesWork); + } } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index 8538463..1d454fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -54,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.BitSet; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -193,6 +196,17 @@ public class LoadTable { Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf); parentTask.addDependentTask(replTxnTask); parentTask = replTxnTask; + } else if (replicationSpec.isMigratingToTxnTable()) { + // A non-transactional table is converted to a transactional table. + // Write-id 1 is used to copy data for the given table, and no writes are aborted. + ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList( + AcidUtils.getFullTableName(tblDesc.getDatabaseName(), tblDesc.getTableName()), + new long[0], new BitSet(), 1); + ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), null, + validWriteIdList.writeToString(), ReplTxnWork.OperationType.REPL_WRITEID_STATE); + Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf); + parentTask.addDependentTask(replTxnTask); + parentTask = replTxnTask; } if (!isPartitioned(tblDesc)) { LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table"); @@ -223,12 +237,12 @@ public class LoadTable { // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir. LoadFileType loadFileType; - if (replicationSpec.isInReplicationScope() && + if (replicationSpec.isInReplicationScope() && !replicationSpec.isMigratingToTxnTable() && context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) { loadFileType = LoadFileType.IGNORE; } else { - loadFileType = - replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING; + loadFileType = replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : + replicationSpec.isMigratingToTxnTable() ?
LoadFileType.KEEP_EXISTING : LoadFileType.OVERWRITE_EXISTING; tmpPath = PathUtils.getExternalTmpPath(tgtPath, context.pathInfo); } @@ -241,11 +255,22 @@ public class LoadTable { MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false); if (AcidUtils.isTransactionalTable(table)) { - LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( - Collections.singletonList(tmpPath), - Collections.singletonList(tgtPath), - true, null, null); - moveWork.setMultiFilesDesc(loadFilesWork); + if (replicationSpec.isMigratingToTxnTable()) { + // Write-id is hardcoded to 1 so that, for migration, we just move all original files under the delta_1_1 dir. + // However, it is unused if the table is non-ACID. + // ReplTxnTask added earlier in the DAG ensures that the write-id is made valid in HMS metadata. + LoadTableDesc loadTableWork = new LoadTableDesc( + tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), + loadFileType, 1L + ); + moveWork.setLoadTableWork(loadTableWork); + } else { + LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc( + Collections.singletonList(tmpPath), + Collections.singletonList(tgtPath), + true, null, null); + moveWork.setMultiFilesDesc(loadFilesWork); + } } else { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), http://git-wip-us.apache.org/repos/asf/hive/blob/10cfba20/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java index 5f15998..ae6411d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java @@ -21,7 +21,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -47,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.AlterDatabaseDesc; import org.apache.hadoop.hive.ql.plan.AlterTableDesc; import org.apache.hadoop.hive.ql.plan.DDLWork; import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; +import org.apache.hadoop.hive.ql.plan.ReplTxnWork; import org.apache.hadoop.hive.ql.stats.StatsUtils; import org.slf4j.Logger; @@ -233,17 +237,47 @@ public class IncrementalLoadTasksBuilder { return addUpdateReplStateTasks(StringUtils.isEmpty(context.tableName), messageHandler.getUpdatedMetadata(), tasks); } + private Task<? extends Serializable> getMigrationCommitTxnTask(String dbName, String tableName, + List<Map <String, String>> partSpec, String replState, + boolean needUpdateDBReplId, + Task<?
extends Serializable> preCursor) throws SemanticException { + ReplLastIdInfo replLastIdInfo = new ReplLastIdInfo(dbName, Long.parseLong(replState)); + replLastIdInfo.setTable(tableName); + replLastIdInfo.setNeedUpdateDBReplId(needUpdateDBReplId); + if (partSpec != null && !partSpec.isEmpty()) { + List<String> partitionList = new ArrayList<>(); + for (Map <String, String> part : partSpec) { + try { + partitionList.add(Warehouse.makePartName(part, false)); + } catch (MetaException e) { + throw new SemanticException(e.getMessage()); + } + } + replLastIdInfo.setPartitionList(partitionList); + } + + Task<? extends Serializable> updateReplIdTxnTask = TaskFactory.get(new ReplTxnWork(replLastIdInfo, ReplTxnWork + .OperationType.REPL_MIGRATION_COMMIT_TXN), conf); + + if (preCursor != null) { + preCursor.addDependentTask(updateReplIdTxnTask); + log.debug("Added {}:{} as a precursor of {}:{}", preCursor.getClass(), preCursor.getId(), + updateReplIdTxnTask.getClass(), updateReplIdTxnTask.getId()); + } + return updateReplIdTxnTask; + } + private Task<? extends Serializable> tableUpdateReplStateTask(String dbName, String tableName, Map<String, String> partSpec, String replState, Task<? extends Serializable> preCursor) throws SemanticException { HashMap<String, String> mapProp = new HashMap<>(); mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState); - AlterTableDesc alterTblDesc = new AlterTableDesc( + AlterTableDesc alterTblDesc = new AlterTableDesc( AlterTableDesc.AlterTableTypes.ADDPROPS, new ReplicationSpec(replState, replState)); alterTblDesc.setProps(mapProp); alterTblDesc.setOldName(StatsUtils.getFullyQualifiedTableName(dbName, tableName)); - alterTblDesc.setPartSpec((HashMap<String, String>)partSpec); + alterTblDesc.setPartSpec((HashMap<String, String>) partSpec); Task<? extends Serializable> updateReplIdTask = TaskFactory.get(new DDLWork(inputs, outputs, alterTblDesc), conf); @@ -283,8 +317,17 @@ public class IncrementalLoadTasksBuilder { return importTasks; } + boolean needCommitTx = updatedMetaDataTracker.isNeedCommitTxn(); + // In the migration flow, we should have only one table update per event. + if (needCommitTx) { + // Currently, only a commit txn event can have updates in multiple tables. A commit txn event does not start + a txn, and thus needCommitTx must have been set to false. + assert updatedMetaDataTracker.getUpdateMetaDataList().size() <= 1; + } + + // Create a barrier task for dependency collection of import tasks Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf); + List<Task<? extends Serializable>> tasks = new ArrayList<>(); Task<? extends Serializable> updateReplIdTask; @@ -292,21 +335,43 @@ public class IncrementalLoadTasksBuilder { String replState = updateMetaData.getReplState(); String dbName = updateMetaData.getDbName(); String tableName = updateMetaData.getTableName(); + // If any partition is updated, then update repl state in partition object - for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) { - updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask); - tasks.add(updateReplIdTask); + if (needCommitTx) { + if (updateMetaData.getPartitionsList().size() > 0) { + updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName, + updateMetaData.getPartitionsList(), replState, isDatabaseLoad, barrierTask); + tasks.add(updateReplIdTask); + // The commit txn task will update the repl id for the table and the database as well.
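
Warehouse.makePartName, used in getMigrationCommitTxnTask above to build the partition list for ReplLastIdInfo, flattens a partition spec map into the key=value/key=value path form. A simplified stand-in in plain Java (without the escaping and schema checks the real helper performs):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class PartNameSketch {
    // Simplified stand-in for Warehouse.makePartName(partSpec, false).
    static String makePartName(Map<String, String> partSpec) {
        return partSpec.entrySet().stream()
            .map(e -> e.getKey() + "=" + e.getValue())
            .collect(Collectors.joining("/"));
    }

    public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2019-01-01");
        spec.put("hr", "12");
        System.out.println(makePartName(spec)); // ds=2019-01-01/hr=12
    }
}
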
+ break; + } + } else { + for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) { + updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask); + tasks.add(updateReplIdTask); + } } + // If any table/partition is updated, then update repl state in table object if (tableName != null) { - // If any table/partition is updated, then update repl state in table object + if (needCommitTx) { + updateReplIdTask = getMigrationCommitTxnTask(dbName, tableName, null, + replState, isDatabaseLoad, barrierTask); + tasks.add(updateReplIdTask); + // The commit txn task will update the repl id for the database as well. + break; + } updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask); tasks.add(updateReplIdTask); } - // For table level load, need not update replication state for the database - if (isDatabaseLoad) { - // If any table/partition is updated, then update repl state in db object + // If any table/partition is updated, then update repl state in db object + if (needCommitTx) { + updateReplIdTask = getMigrationCommitTxnTask(dbName, null, null, + replState, isDatabaseLoad, barrierTask); + tasks.add(updateReplIdTask); + } else if (isDatabaseLoad) { + // For a table-level load, there is no need to update the replication state for the database updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask); tasks.add(updateReplIdTask); }
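
Putting the branching in addUpdateReplStateTasks together: in the migration flow (needCommitTx) a single REPL_MIGRATION_COMMIT_TXN task records the repl id for partitions, table, and database in one commit, while the non-migration flow keeps one update task per level. A compact plain-Java model of that planning decision (simplified: it ignores the tableName null-check and other event details):

import java.util.ArrayList;
import java.util.List;

public class UpdateReplStatePlanSketch {
    static List<String> planTasks(boolean needCommitTx, int updatedPartitions, boolean isDatabaseLoad) {
        List<String> tasks = new ArrayList<>();
        if (needCommitTx) {
            // One commit-txn task updates partition, table, and database repl ids together.
            tasks.add("REPL_MIGRATION_COMMIT_TXN");
            return tasks;
        }
        for (int i = 0; i < updatedPartitions; i++) {
            tasks.add("ALTER PARTITION SET repl.last.id");
        }
        tasks.add("ALTER TABLE SET repl.last.id");
        if (isDatabaseLoad) {
            tasks.add("ALTER DATABASE SET repl.last.id");
        }
        return tasks;
    }

    public static void main(String[] args) {
        System.out.println(planTasks(true, 3, true));  // [REPL_MIGRATION_COMMIT_TXN]
        System.out.println(planTasks(false, 2, true)); // per-partition, table, and database updates
    }
}
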