HIVE-19435: Incremental replication causes data loss if a table is dropped followed by create and insert-into with different partition type (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/91b66c5c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/91b66c5c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/91b66c5c Branch: refs/heads/branch-3.0.0 Commit: 91b66c5c5a6106d42903385b7a49e6b32400cc39 Parents: 11a7164 Author: Sankar Hariappan <sank...@apache.org> Authored: Sun May 13 13:02:25 2018 +0530 Committer: Sankar Hariappan <sank...@apache.org> Committed: Sun May 13 13:02:25 2018 +0530 ---------------------------------------------------------------------- ...TestReplicationScenariosAcrossInstances.java | 58 ++++++++++++++++++++ .../hive/ql/parse/ImportSemanticAnalyzer.java | 38 ++++++++++--- 2 files changed, 88 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/91b66c5c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index 70e1aa7..df9bde0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -675,4 +675,62 @@ public class TestReplicationScenariosAcrossInstances { .run("select id from table2 order by id") .verifyResults(new String[] {"2"}); } + + @Test + public void testIncrementalReplWithDropAndCreateTableDifferentPartitionTypeAndInsert() throws Throwable { + // Bootstrap dump with empty db + WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null); + + // Bootstrap load in 
replica + replica.load(replicatedDbName, bootstrapTuple.dumpLocation) + .status(replicatedDbName) + .verifyResult(bootstrapTuple.lastReplicationId); + + // First incremental dump + WarehouseInstance.Tuple firstIncremental = primary.run("use " + primaryDbName) + .run("create table table1 (id int) partitioned by (country string)") + .run("create table table2 (id int)") + .run("create table table3 (id int) partitioned by (country string)") + .run("insert into table1 partition(country='india') values(1)") + .run("insert into table2 values(2)") + .run("insert into table3 partition(country='india') values(3)") + .dump(primaryDbName, bootstrapTuple.lastReplicationId); + + // Second incremental dump + WarehouseInstance.Tuple secondIncremental = primary.run("use " + primaryDbName) + .run("drop table table1") + .run("drop table table2") + .run("drop table table3") + .run("create table table1 (id int)") + .run("insert into table1 values (10)") + .run("create table table2 (id int) partitioned by (country string)") + .run("insert into table2 partition(country='india') values(20)") + .run("create table table3 (id int) partitioned by (name string, rank int)") + .run("insert into table3 partition(name='adam', rank=100) values(30)") + .dump(primaryDbName, firstIncremental.lastReplicationId); + + // First incremental load + replica.load(replicatedDbName, firstIncremental.dumpLocation) + .status(replicatedDbName) + .verifyResult(firstIncremental.lastReplicationId) + .run("use " + replicatedDbName) + .run("select id from table1") + .verifyResults(new String[] {"1"}) + .run("select * from table2") + .verifyResults(new String[] {"2"}) + .run("select id from table3") + .verifyResults(new String[] {"3"}); + + // Second incremental load + replica.load(replicatedDbName, secondIncremental.dumpLocation) + .status(replicatedDbName) + .verifyResult(secondIncremental.lastReplicationId) + .run("use " + replicatedDbName) + .run("select * from table1") + .verifyResults(new String[] {"10"}) + 
.run("select id from table2") + .verifyResults(new String[] {"20"}) + .run("select id from table3") + .verifyResults(new String[] {"30"}); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/91b66c5c/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index e6a7012..eb6708b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -898,14 +898,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { } } - private static Table createNewTableMetadataObject(ImportTableDesc tblDesk) + private static Table createNewTableMetadataObject(ImportTableDesc tblDesc) throws SemanticException { - Table newTable = new Table(tblDesk.getDatabaseName(), tblDesk.getTableName()); + Table newTable = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName()); //so that we know the type of table we are creating: acid/MM to match what was exported - newTable.setParameters(tblDesk.getTblProps()); - if(tblDesk.isExternal() && AcidUtils.isTransactionalTable(newTable)) { + newTable.setParameters(tblDesc.getTblProps()); + if(tblDesc.isExternal() && AcidUtils.isTransactionalTable(newTable)) { throw new SemanticException("External tables may not be transactional: " + - Warehouse.getQualifiedName(tblDesk.getDatabaseName(), tblDesk.getTableName())); + Warehouse.getQualifiedName(tblDesc.getDatabaseName(), tblDesc.getTableName())); } return newTable; } @@ -1027,14 +1027,36 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { x.getTasks().add(t); } } else { + // If table of current event has partition flag different from existing table, it means, some + // of the previous events in same batch have drop and 
create table events with same name but + // different partition flag. In this case, should go with current event's table type and so + // create the dummy table object for adding repl tasks. + boolean isOldTableValid = true; + if (table.isPartitioned() != isPartitioned(tblDesc)) { + table = createNewTableMetadataObject(tblDesc); + isOldTableValid = false; + } + // Table existed, and is okay to replicate into, not dropping and re-creating. - if (table.isPartitioned()) { + if (isPartitioned(tblDesc)) { x.getLOG().debug("table partitioned"); for (AddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; - if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { + if (isOldTableValid) { + // If existing table is valid but the partition spec is different, then ignore partition + // validation and create new partition. + try { + ptn = x.getHive().getPartition(table, partSpec, false); + } catch (HiveException ex) { + ptn = null; + table = createNewTableMetadataObject(tblDesc); + isOldTableValid = false; + } + } + + if (ptn == null) { if (!replicationSpec.isMetadataOnly()){ x.getTasks().add(addSinglePartition( fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId, isSourceMm)); @@ -1079,7 +1101,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { x.getLOG().debug("table non-partitioned"); if (!replicationSpec.isMetadataOnly()) { // repl-imports are replace-into unless the event is insert-into - loadTable(fromURI, table, replicationSpec.isReplace(), table.getDataLocation(), + loadTable(fromURI, table, replicationSpec.isReplace(), new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId, isSourceMm); } else { x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));