HIVE-20911: External Table Replication for Hive (Anishek Agarwal, reviewed by Sankar Hariappan, Ashutosh Bapat)
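This patch adds support for replicating external tables along with their data files. A minimal usage sketch, assembled from the commands the new tests in this patch run (the database names, dump-location handling, and the /replica_external_base directory are illustrative placeholders; WarehouseInstance is the test harness used throughout the diff below):

    // Source: include external tables in the dump. The dump also writes an
    // _external_tables_file listing "tableName,base64encoded(data_location)" entries.
    List<String> dumpWithClause = Collections.singletonList(
        "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'");
    WarehouseInstance.Tuple tuple = primary.dump("srcdb", null, dumpWithClause);

    // Target: external table data is re-rooted under the new base dir option, so a
    // source path /data/t1 lands at /replica_external_base/data/t1 on the replica.
    List<String> loadWithClause = Collections.singletonList(
        "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='/replica_external_base'");
    replica.load("replicadb", tuple.dumpLocation, loadWithClause);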
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b3ef75ea Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b3ef75ea Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b3ef75ea Branch: refs/heads/master Commit: b3ef75eaa1e828f8c80d95ea7c32abcd1f000ef4 Parents: 0dbb896 Author: Anishek Agarwal <anis...@gmail.com> Authored: Tue Jan 8 13:56:02 2019 +0530 Committer: Anishek Agarwal <anis...@gmail.com> Committed: Tue Jan 8 13:56:02 2019 +0530 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/FileUtils.java | 2 + .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../parse/BaseReplicationAcrossInstances.java | 83 ++++ .../TestReplTableMigrationWithJsonFormat.java | 29 ++ .../hive/ql/parse/TestReplicationScenarios.java | 4 +- ...TestReplicationScenariosAcrossInstances.java | 200 ++------- .../TestReplicationScenariosExternalTables.java | 420 +++++++++++++++++++ ...ationScenariosIncrementalLoadAcidTables.java | 3 - .../TestReplicationScenariosMigration.java | 33 -- .../TestReplicationWithTableMigration.java | 235 +++++++---- .../hadoop/hive/ql/parse/WarehouseInstance.java | 81 +--- .../java/org/apache/hadoop/hive/ql/Context.java | 5 +- .../apache/hadoop/hive/ql/exec/TaskFactory.java | 5 + .../exec/repl/ExternalTableCopyTaskBuilder.java | 150 +++++++ .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 96 +++-- .../hive/ql/exec/repl/ReplExternalTables.java | 272 ++++++++++++ .../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 53 ++- .../hadoop/hive/ql/exec/repl/ReplLoadWork.java | 19 +- .../filesystem/BootstrapEventsIterator.java | 1 + .../filesystem/DatabaseEventsIterator.java | 6 + .../events/filesystem/FSTableEvent.java | 44 +- .../bootstrap/load/table/LoadPartitions.java | 91 ++-- .../repl/bootstrap/load/table/LoadTable.java | 73 +++- .../IncrementalLoadTasksBuilder.java | 15 +- .../hive/ql/exec/repl/util/ReplUtils.java | 3 +- .../apache/hadoop/hive/ql/metadata/Table.java | 2 +- .../hive/ql/parse/BaseSemanticAnalyzer.java | 3 +- .../hive/ql/parse/ImportSemanticAnalyzer.java | 116 +++-- .../ql/parse/ReplicationSemanticAnalyzer.java | 34 +- .../hadoop/hive/ql/parse/ReplicationSpec.java | 9 + .../hadoop/hive/ql/parse/repl/CopyUtils.java | 2 +- .../hadoop/hive/ql/parse/repl/dump/Utils.java | 7 +- .../repl/dump/events/DropTableHandler.java | 3 +- .../parse/repl/dump/events/InsertHandler.java | 4 + .../parse/repl/dump/io/PartitionSerializer.java | 4 - .../ql/parse/repl/dump/io/TableSerializer.java | 29 +- .../hive/ql/parse/repl/load/MetadataJson.java | 10 +- .../parse/repl/load/message/InsertHandler.java | 24 ++ .../parse/repl/load/message/MessageHandler.java | 4 + .../parse/repl/load/message/TableHandler.java | 103 ++++- .../hive/ql/exec/repl/TestReplDumpTask.java | 13 +- .../queries/clientpositive/repl_2_exim_basic.q | 1 - .../clientpositive/repl_2_exim_basic.q.out | 3 +- .../ptest2/conf/deployed/master-mr2.properties | 2 +- 44 files changed, 1711 insertions(+), 589 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/common/src/java/org/apache/hadoop/hive/common/FileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 56748fd..23a3a6b 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ 
b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -638,6 +638,8 @@ public final class FileUtils { public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst, boolean deleteSource, String doAsUser, HiveConf conf, HadoopShims shims) throws IOException { + LOG.debug("Copying srcPaths: {} to destPath: {} with doAs: {}", + StringUtils.join(",", srcPaths), dst.toString(), doAsUser); boolean copied = false; if (doAsUser == null){ copied = shims.runDistCp(srcPaths, dst, conf); http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 6a7c4ab..b213609 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -502,6 +502,10 @@ public class HiveConf extends Configuration { + " Schemes of the file system which does not support atomic move (rename) can be specified here to \n " + " speed up the repl load operation. In file system like HDFS where move operation is atomic, this \n" + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."), + REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", "/", + "This is the base directory on the target/replica warehouse under which data for " + + "external tables is stored. It is a relative base path that is prefixed to the source " + + "external table path on the target cluster."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java new file mode 100644 index 0000000..d321cca --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.shims.Utils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; + +public class BaseReplicationAcrossInstances { + @Rule + public final TestName testName = new TestName(); + + protected static final Logger LOG = LoggerFactory.getLogger(BaseReplicationAcrossInstances.class); + static WarehouseInstance primary; + static WarehouseInstance replica; + String primaryDbName, replicatedDbName; + static HiveConf conf; + + static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) + throws Exception { + conf = new HiveConf(clazz); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniDFSCluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + Map<String, String> localOverrides = new HashMap<String, String>() {{ + put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); + }}; + localOverrides.putAll(overrides); + primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); + replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); + } + + @AfterClass + public static void classLevelTearDown() throws IOException { + primary.close(); + replica.close(); + } + + @Before + public void setup() throws Throwable { + primaryDbName = testName.getMethodName() + "_" + System.currentTimeMillis(); + replicatedDbName = "replicated_" + primaryDbName; + primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + } + + @After + public void tearDown() throws Throwable { + primary.run("drop database if exists " + primaryDbName + " cascade"); + replica.run("drop database if exists " + replicatedDbName + " cascade"); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java new file mode 100644 index 0000000..0151ed0 --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplTableMigrationWithJsonFormat.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.junit.BeforeClass; + +import java.util.Collections; + +public class TestReplTableMigrationWithJsonFormat extends TestReplicationWithTableMigration { + @BeforeClass + public static void classLevelSetup() throws Exception { + internalBeforeClassSetup(Collections.emptyMap()); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java index 98cbd97..c85a2a4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java @@ -91,6 +91,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -393,7 +394,8 @@ public class TestReplicationScenarios { HiveConf confTemp = new HiveConf(); confTemp.set("hive.repl.enable.move.optimization", "true"); ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, tuple.dumpLocation, replicadb, - null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId)); + null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId), + Collections.emptyList()); Task replLoadTask = TaskFactory.get(replLoadWork, confTemp); replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null); replLoadTask.executeTask(null); http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java index b50f9a8..0df99b3 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java @@ -22,35 +22,29 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; -import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; -import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; -import org.apache.hadoop.hive.ql.util.DependencyResolver; -import 
org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; -import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; +import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; +import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; +import org.apache.hadoop.hive.ql.util.DependencyResolver; +import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import javax.annotation.Nullable; import java.io.IOException; -import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; @@ -59,8 +53,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import javax.annotation.Nullable; +import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -68,63 +62,17 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; -import org.apache.hadoop.hive.ql.ErrorMsg; - -public class TestReplicationScenariosAcrossInstances { - @Rule - public final TestName testName = new TestName(); - - protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class); - static WarehouseInstance primary; - private static WarehouseInstance replica; - private String primaryDbName, replicatedDbName; - private static HiveConf conf; +public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcrossInstances { @BeforeClass public static void classLevelSetup() throws Exception { HashMap<String, String> overrides = new HashMap<>(); overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); - internalBeforeClassSetup(overrides, TestReplicationScenarios.class); - } - - static void internalBeforeClassSetup(Map<String, String> overrides, Class 
clazz) - throws Exception { - conf = new HiveConf(clazz); - conf.set("dfs.client.use.datanode.hostname", "true"); - conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); - MiniDFSCluster miniDFSCluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - Map<String, String> localOverrides = new HashMap<String, String>() {{ - put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); - put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); - }}; - localOverrides.putAll(overrides); - primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); - replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); - } - - @AfterClass - public static void classLevelTearDown() throws IOException { - primary.close(); - replica.close(); - } - - @Before - public void setup() throws Throwable { - primaryDbName = testName.getMethodName() + "_" + +System.currentTimeMillis(); - replicatedDbName = "replicated_" + primaryDbName; - primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + - SOURCE_OF_REPLICATION + "' = '1,2,3')"); - } - - @After - public void tearDown() throws Throwable { - primary.run("drop database if exists " + primaryDbName + " cascade"); - replica.run("drop database if exists " + replicatedDbName + " cascade"); + internalBeforeClassSetup(overrides, TestReplicationScenariosAcrossInstances.class); } @Test @@ -365,8 +313,10 @@ public class TestReplicationScenariosAcrossInstances { .dump(primaryDbName, null); // each table creation itself takes more than one task; given we set a max of 1, we should hit multiple runs. - replica.hiveConf.setIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS, 1); - replica.load(replicatedDbName, tuple.dumpLocation) + List<String> withClause = Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'"); + + replica.load(replicatedDbName, tuple.dumpLocation, withClause) .run("use " + replicatedDbName) .run("show tables") .verifyResults(new String[] { "t1", "t2", "t3" }) @@ -433,7 +383,8 @@ public class TestReplicationScenariosAcrossInstances { .run("create table table2 (a int, city string) partitioned by (country string)") .run("create table table3 (i int, j int)") .run("insert into table1 values (1,2)") - .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'")); + .dump(primaryDbName, null, + Collections.singletonList("'hive.repl.dump.metadata.only'='true'")); replica.load(replicatedDbName, bootstrapTuple.dumpLocation) .run("use " + replicatedDbName) @@ -1181,7 +1132,7 @@ public class TestReplicationScenariosAcrossInstances { .run("use " + importDbFromReplica) .run("import table t1 from " + exportPath) .run("select country from t1") - .verifyResults(Arrays.asList("india")); + .verifyResults(Collections.singletonList("india")); // Check if table/partition in C doesn't have ckpt property t1 = replica.getTable(importDbFromReplica, "t1"); @@ -1568,107 +1519,6 @@ public class TestReplicationScenariosAcrossInstances { .run(" drop database if exists " + replicatedDbName_CM + " cascade"); } - @Test - public void testDumpExternalTableSetFalse() throws Throwable { - WarehouseInstance.Tuple tuple = primary - .run("use " + primaryDbName) - .run("create external table t1 (id int)") - .run("insert into table t1 values (1)") - .run("insert into table t1 values (2)") - .run("create external table t2 (place string) partitioned by (country string)") - .run("insert into table t2 partition(country='india') 
values ('bangalore')") - .run("insert into table t2 partition(country='us') values ('austin')") - .run("insert into table t2 partition(country='france') values ('paris')") - .dump(primaryDbName, null); - - replica.load(replicatedDbName, tuple.dumpLocation) - .run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("use " + replicatedDbName) - .run("show tables like 't1'") - .verifyFailure(new String[] {"t1"}) - .run("show tables like 't2'") - .verifyFailure(new String[] {"t2"}); - - tuple = primary.run("use " + primaryDbName) - .run("create external table t3 (id int)") - .run("insert into table t3 values (10)") - .run("insert into table t3 values (20)") - .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId - + " with ('hive.repl.dump.metadata.only'='true')"); - - replica.load(replicatedDbName, tuple.dumpLocation) - .run("use " + replicatedDbName) - .run("show tables like 't3'") - .verifyResult("t3") - .run("select id from t3 where id = 10") - .verifyFailure(new String[] {"10"}); - } - - @Test - public void testDumpExternalTableSetTrue() throws Throwable { - WarehouseInstance.Tuple tuple = primary - .run("use " + primaryDbName) - .run("create external table t1 (id int)") - .run("insert into table t1 values (1)") - .run("insert into table t1 values (2)") - .run("create external table t2 (place string) partitioned by (country string)") - .run("insert into table t2 partition(country='india') values ('bangalore')") - .run("insert into table t2 partition(country='us') values ('austin')") - .run("insert into table t2 partition(country='france') values ('paris')") - .dump("repl dump " + primaryDbName + " with ('hive.repl.include.external.tables'='true')"); - - replica.load(replicatedDbName, tuple.dumpLocation) - .run("use " + replicatedDbName) - .run("show tables like 't1'") - .verifyResult("t1") - .run("show tables like 't2'") - .verifyResult("t2") - .run("repl status " + replicatedDbName) - .verifyResult(tuple.lastReplicationId) - .run("select country from t2 where country = 'us'") - .verifyResult("us") - .run("select country from t2 where country = 'france'") - .verifyResult("france"); - - tuple = primary.run("use " + primaryDbName) - .run("create external table t3 (id int)") - .run("insert into table t3 values (10)") - .run("create external table t4 as select id from t3") - .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId - + " with ('hive.repl.include.external.tables'='true')"); - - replica.load(replicatedDbName, tuple.dumpLocation) - .run("use " + replicatedDbName) - .run("show tables like 't3'") - .verifyResult("t3") - .run("select id from t3") - .verifyResult("10") - .run("select id from t4") - .verifyResult(null); // Returns null as create table event doesn't list files - } - - @Test - public void testDumpExternalTableWithAddPartitionEvent() throws Throwable { - WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName); - - replica.load(replicatedDbName, tuple.dumpLocation); - - tuple = primary.run("use " + primaryDbName) - .run("create external table t1 (place string) partitioned by (country string)") - .run("alter table t1 add partition(country='india')") - .run("alter table t1 add partition(country='us')") - .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId - + " with ('hive.repl.include.external.tables'='true')"); - - replica.load(replicatedDbName, tuple.dumpLocation) - .run("use " + replicatedDbName) - .run("show tables like 't1'") - .verifyResult("t1") - .run("show 
partitions t1") - .verifyResults(new String[] { "country=india", "country=us" }); - } - // This requires the tables are loaded in a fixed sorted order. @Test public void testBootstrapLoadRetryAfterFailureForAlterTable() throws Throwable { http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java new file mode 100644 index 0000000..0e3cefc --- /dev/null +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java @@ -0,0 +1,420 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestReplicationScenariosExternalTables extends BaseReplicationAcrossInstances { + + private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; + + @BeforeClass + public static void classLevelSetup() throws Exception { + HashMap<String, String> overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + 
overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false"); + overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + + internalBeforeClassSetup(overrides, TestReplicationScenarios.class); + } + + @Test + public void replicationWithoutExternalTables() throws Throwable { + List<String> loadWithClause = externalTableBasePathWithClause(); + List<String> dumpWithClause = Collections.singletonList + ("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='false'"); + + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump(primaryDbName, null, dumpWithClause); + + // the _external_tables_file info should be created only if external tables are to be replicated + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyFailure(new String[] { "t1" }) + .run("show tables like 't2'") + .verifyFailure(new String[] { "t2" }); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("insert into table t3 values (20)") + .dump(primaryDbName, tuple.lastReplicationId, dumpWithClause); + + // the _external_tables_file info should be created only if external tables are to be replicated + assertFalse(primary.miniDFSCluster.getFileSystem() + .exists(new Path(tuple.dumpLocation, FILE_NAME))); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyFailure(new String[] { "t3" }); + } + + @Test + public void externalTableReplicationWithDefaultPaths() throws Throwable { + // creates external tables with partitions + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (1)") + .run("insert into table t1 values (2)") + .run("create external table t2 (place string) partitioned by (country string)") + .run("insert into table t2 partition(country='india') values ('bangalore')") + .run("insert into table t2 partition(country='us') values ('austin')") + .run("insert into table t2 partition(country='france') values ('paris')") + .dump("repl dump " + primaryDbName); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1", "t2"), + new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + List<String> withClauseOptions = externalTableBasePathWithClause(); + + replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + 
.verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("repl status " + replicatedDbName) + .verifyResult(tuple.lastReplicationId) + .run("select country from t2 where country = 'us'") + .verifyResult("us") + .run("select country from t2 where country = 'france'") + .verifyResult("france"); + + assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1"); + assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (10)") + .run("create external table t4 as select id from t3") + .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), + new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't3'") + .verifyResult("t3") + .run("select id from t3") + .verifyResult("10") + .run("select id from t4") + .verifyResult("10"); + + assertTablePartitionLocation(primaryDbName + ".t3", replicatedDbName + ".t3"); + + tuple = primary.run("use " + primaryDbName) + .run("drop table t1") + .dump("repl dump " + primaryDbName + " from " + tuple.lastReplicationId); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), + new Path(tuple.dumpLocation, FILE_NAME)); + } + + /** + * @param sourceTableName -- Provide the fully qualified table name + * @param replicaTableName -- Provide the fully qualified table name + */ + private void assertTablePartitionLocation(String sourceTableName, String replicaTableName) + throws HiveException { + Hive hiveForPrimary = Hive.get(primary.hiveConf); + Table sourceTable = hiveForPrimary.getTable(sourceTableName); + Path sourceLocation = sourceTable.getDataLocation(); + Hive hiveForReplica = Hive.get(replica.hiveConf); + Table replicaTable = hiveForReplica.getTable(replicaTableName); + Path dataLocation = replicaTable.getDataLocation(); + assertEquals(REPLICA_EXTERNAL_BASE + sourceLocation.toUri().getPath(), + dataLocation.toUri().getPath()); + if (sourceTable.isPartitioned()) { + Set<Partition> sourcePartitions = hiveForPrimary.getAllPartitionsOf(sourceTable); + Set<Partition> replicaPartitions = hiveForReplica.getAllPartitionsOf(replicaTable); + assertEquals(sourcePartitions.size(), replicaPartitions.size()); + List<String> expectedPaths = + sourcePartitions.stream() + .map(p -> REPLICA_EXTERNAL_BASE + p.getDataLocation().toUri().getPath()) + .collect(Collectors.toList()); + List<String> actualPaths = + replicaPartitions.stream() + .map(p -> p.getDataLocation().toUri().getPath()) + .collect(Collectors.toList()); + assertTrue(expectedPaths.containsAll(actualPaths)); + } + } + + @Test + public void externalTableReplicationWithCustomPaths() throws Throwable { + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + List<String> loadWithClause = externalTableBasePathWithClause(); + + WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName) + .run("create external table a (i int, j int) " + + "row format delimited fields terminated by 
',' " + + "location '" + externalTableLocation.toUri() + "'") + .dump(primaryDbName, null); + + replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 'a'") + .verifyResults(Collections.singletonList("a")) + .run("select * From a").verifyResults(Collections.emptyList()); + + assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a"); + + //externally add data to location + try (FSDataOutputStream outputStream = + fs.create(new Path(externalTableLocation, "file1.txt"))) { + outputStream.write("1,2\n".getBytes()); + outputStream.write("13,21\n".getBytes()); + } + + WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)") + .dump(primaryDbName, bootstrapTuple.lastReplicationId); + + replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause) + .run("select i From a") + .verifyResults(new String[] { "1", "13" }) + .run("select j from a") + .verifyResults(new String[] { "2", "21" }); + + // alter table location to something new. + externalTableLocation = + new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/"); + incrementalTuple = primary.run("use " + primaryDbName) + .run("alter table a set location '" + externalTableLocation + "'") + .dump(primaryDbName, incrementalTuple.lastReplicationId); + + replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select i From a") + .verifyResults(Collections.emptyList()); + assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a"); + } + + @Test + public void externalTableWithPartitions() throws Throwable { + Path externalTableLocation = + new Path("/" + testName.getMethodName() + "/t2/"); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + List<String> loadWithClause = externalTableBasePathWithClause(); + + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create external table t2 (place string) partitioned by (country string) row format " + + "delimited fields terminated by ',' location '" + externalTableLocation.toString() + + "'") + .run("insert into t2 partition(country='india') values ('bangalore')") + .dump("repl dump " + primaryDbName); + + assertExternalFileInfo(Collections.singletonList("t2"), + new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't2'") + .verifyResults(new String[] { "t2" }) + .run("select place from t2") + .verifyResults(new String[] { "bangalore" }); + + assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2"); + + // add new data externally, to a partition, but under the table level top directory + Path partitionDir = new Path(externalTableLocation, "country=india"); + try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) { + outputStream.write("pune\n".getBytes()); + outputStream.write("mumbai\n".getBytes()); + } + + tuple = primary.run("use " + primaryDbName) + .run("insert into t2 partition(country='australia') values ('sydney')") + .dump(primaryDbName, tuple.lastReplicationId); + + assertExternalFileInfo(Collections.singletonList("t2"), + new Path(tuple.dumpLocation, FILE_NAME)); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use 
" + replicatedDbName) + .run("select distinct(country) from t2") + .verifyResults(new String[] { "india", "australia" }) + .run("select place from t2 where country='india'") + .verifyResults(new String[] { "bangalore", "pune", "mumbai" }) + .run("select place from t2 where country='australia'") + .verifyResults(new String[] { "sydney" }); + + Path customPartitionLocation = + new Path("/" + testName.getMethodName() + "/partition_data/t2/country=france"); + fs.mkdirs(externalTableLocation, new FsPermission("777")); + + // add new partitions to the table, at an external location than the table level directory + try (FSDataOutputStream outputStream = fs + .create(new Path(customPartitionLocation, "file.txt"))) { + outputStream.write("paris".getBytes()); + } + + tuple = primary.run("use " + primaryDbName) + .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation + .toString() + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select place from t2 where country='france'") + .verifyResults(new String[] { "paris" }); + + // change the location of the partition via alter command + String tmpLocation = "/tmp/" + System.nanoTime(); + primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777")); + + tuple = primary.run("use " + primaryDbName) + .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("select place from t2 where country='france'") + .verifyResults(new String[] {}); + } + + @Test + public void externalTableIncrementalReplication() throws Throwable { + WarehouseInstance.Tuple tuple = primary.dump("repl dump " + primaryDbName); + replica.load(replicatedDbName, tuple.dumpLocation); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t1 (place string) partitioned by (country string)") + .run("alter table t1 add partition(country='india')") + .run("alter table t1 add partition(country='us')") + .dump(primaryDbName, tuple.lastReplicationId); + + List<String> loadWithClause = externalTableBasePathWithClause(); + replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show partitions t1") + .verifyResults(new String[] { "country=india", "country=us" }); + + Hive hive = Hive.get(replica.getConf()); + Set<Partition> partitions = + hive.getAllPartitionsOf(hive.getTable(replicatedDbName + ".t1")); + List<String> paths = partitions.stream().map(p -> p.getDataLocation().toUri().getPath()) + .collect(Collectors.toList()); + + tuple = primary + .run("alter table t1 drop partition (country='india')") + .run("alter table t1 drop partition (country='us')") + .dump(primaryDbName, tuple.lastReplicationId); + + replica.load(replicatedDbName, tuple.dumpLocation) + .run("select * From t1") + .verifyResults(new String[] {}); + + for (String path : paths) { + assertTrue(replica.miniDFSCluster.getFileSystem().exists(new Path(path))); + } + + } + + private List<String> externalTableBasePathWithClause() throws IOException, SemanticException { + Path externalTableLocation = new Path(REPLICA_EXTERNAL_BASE); + DistributedFileSystem fileSystem = replica.miniDFSCluster.getFileSystem(); + externalTableLocation = 
PathBuilder.fullyQualifiedHDFSUri(externalTableLocation, fileSystem); + fileSystem.mkdirs(externalTableLocation); + + // this is required since the same filesystem is used in both source and target + return Collections.singletonList( + "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + + externalTableLocation.toString() + "'" + ); + } + + private void assertExternalFileInfo(List<String> expected, Path externalTableInfoFile) + throws IOException { + DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); + assertTrue(fileSystem.exists(externalTableInfoFile)); + InputStream inputStream = fileSystem.open(externalTableInfoFile); + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + Set<String> tableNames = new HashSet<>(); + for (String line = reader.readLine(); line != null; line = reader.readLine()) { + String[] components = line.split(","); + assertEquals("The file should have tableName,base64encoded(data_location)", + 2, components.length); + tableNames.add(components[0]); + assertTrue(components[1].length() > 0); + } + assertTrue(expected.containsAll(tableNames)); + reader.close(); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java index 97775b3..5529d9e 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java @@ -22,10 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.ql.parse.WarehouseInstance; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils; - import org.junit.rules.TestName; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java deleted file mode 100644 index 5b8e424..0000000 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosMigration.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.parse; - -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; -import java.util.HashMap; -import org.junit.BeforeClass; - -public class TestReplicationScenariosMigration extends org.apache.hadoop.hive.ql.parse.TestReplicationScenarios { - @BeforeClass - public static void setUpBeforeClass() throws Exception { - HashMap<String, String> overrideProperties = new HashMap<>(); - overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), - GzipJSONMessageEncoder.class.getCanonicalName()); - internalBeforeClassSetup(overrideProperties, true); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java index ec64f4b..58561d4 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java @@ -17,19 +17,24 @@ */ package org.apache.hadoop.hive.ql.parse; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments; import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; import org.apache.hadoop.hive.shims.Utils; -import org.apache.hadoop.hive.ql.parse.WarehouseInstance; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.parse.repl.PathBuilder; import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION; -import org.apache.hadoop.hive.ql.parse.ReplicationTestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +52,6 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.junit.rules.TestName; -import com.google.common.collect.Lists; import static org.apache.hadoop.hive.ql.io.AcidUtils.isFullAcidTable; import static org.apache.hadoop.hive.ql.io.AcidUtils.isTransactionalTable; import static org.junit.Assert.assertEquals; @@ -58,38 +62,48 @@ import static 
org.junit.Assert.assertTrue; * TestReplicationWithTableMigration - test replication for Hive2 to Hive3 (Strict managed tables) */ public class TestReplicationWithTableMigration { + private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc"; + @Rule public final TestName testName = new TestName(); protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationWithTableMigration.class); private static WarehouseInstance primary, replica; private String primaryDbName, replicatedDbName; - private static HiveConf conf; + private Path avroSchemaFile = null; @BeforeClass public static void classLevelSetup() throws Exception { - conf = new HiveConf(TestReplicationWithTableMigration.class); + HashMap<String, String> overrideProperties = new HashMap<>(); + overrideProperties.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + internalBeforeClassSetup(overrideProperties); + } + + static void internalBeforeClassSetup(Map<String, String> overrideConfigs) throws Exception { + HiveConf conf = new HiveConf(TestReplicationWithTableMigration.class); conf.set("dfs.client.use.datanode.hostname", "true"); conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); MiniDFSCluster miniDFSCluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); - HashMap<String, String> overridesForHiveConf = new HashMap<String, String>() {{ - put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); - put("hive.support.concurrency", "true"); - put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); - put("hive.metastore.client.capability.check", "false"); - put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); - put("hive.exec.dynamic.partition.mode", "nonstrict"); - put("hive.strict.checks.bucketing", "false"); - put("hive.mapred.mode", "nonstrict"); - put("mapred.input.dir.recursive", "true"); - put("hive.metastore.disallow.incompatible.col.type.changes", "false"); - put("hive.strict.managed.tables", "true"); + new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + final DistributedFileSystem fs = miniDFSCluster.getFileSystem(); + HashMap<String, String> hiveConfigs = new HashMap<String, String>() {{ + put("fs.defaultFS", fs.getUri().toString()); + put("hive.support.concurrency", "true"); + put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"); + put("hive.metastore.client.capability.check", "false"); + put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); + put("hive.exec.dynamic.partition.mode", "nonstrict"); + put("hive.strict.checks.bucketing", "false"); + put("hive.mapred.mode", "nonstrict"); + put("mapred.input.dir.recursive", "true"); + put("hive.metastore.disallow.incompatible.col.type.changes", "false"); + put("hive.strict.managed.tables", "true"); }}; - replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf); + replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs); - HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{ - put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString()); + HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{ + put("fs.defaultFS", fs.getUri().toString()); put("hive.metastore.client.capability.check", "false"); put("hive.repl.bootstrap.dump.open.txn.timeout", "1s"); put("hive.exec.dynamic.partition.mode", "nonstrict"); @@ -101,7 +115,40 @@ public class TestReplicationWithTableMigration { 
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager"); put("hive.strict.managed.tables", "false"); }}; - primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1); + configsForPrimary.putAll(overrideConfigs); + primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary); + } + + private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException { + Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME); + String[] schemaVals = new String[] { "{", + " \"type\" : \"record\",", + " \"name\" : \"table1\",", + " \"doc\" : \"Sqoop import of table1\",", + " \"fields\" : [ {", + " \"name\" : \"col1\",", + " \"type\" : [ \"null\", \"string\" ],", + " \"default\" : null,", + " \"columnName\" : \"col1\",", + " \"sqlType\" : \"12\"", + " }, {", + " \"name\" : \"col2\",", + " \"type\" : [ \"null\", \"long\" ],", + " \"default\" : null,", + " \"columnName\" : \"col2\",", + " \"sqlType\" : \"13\"", + " } ],", + " \"tableName\" : \"table1\"", + "}" + }; + + try (FSDataOutputStream stream = fs.create(schemaFile)) { + for (String line : schemaVals) { + stream.write((line + "\n").getBytes()); + } + } + fs.deleteOnExit(schemaFile); + return schemaFile; } @AfterClass @@ -116,6 +163,12 @@ public class TestReplicationWithTableMigration { replicatedDbName = "replicated_" + primaryDbName; primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')"); + if (avroSchemaFile == null) { + Path testPath = new Path("/tmp/avro_schema/definition/" + System.nanoTime()); + DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem(); + fs.mkdirs(testPath, new FsPermission("777")); + avroSchemaFile = PathBuilder.fullyQualifiedHDFSUri(createAvroSchemaFile(fs, testPath), fs); + } } @After @@ -125,39 +178,52 @@ public class TestReplicationWithTableMigration { } private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable { - WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) - .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") - .run("insert into tacid values(1)") - .run("insert into tacid values(2)") - .run("insert into tacid values(3)") - .run("create table tacidpart (place string) partitioned by (country string) clustered by(place) " + - "into 3 buckets stored as orc ") - .run("alter table tacidpart add partition(country='france')") - .run("insert into tacidpart partition(country='india') values('mumbai')") - .run("insert into tacidpart partition(country='us') values('sf')") - .run("insert into tacidpart partition(country='france') values('paris')") - .run("create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")") - .run("insert into tflat values(11)") - .run("insert into tflat values(22)") - .run("create table tflattext (id int) ") - .run("insert into tflattext values(111), (222)") - .run("create table tflattextpart (id int) partitioned by (country string) ") - .run("insert into tflattextpart partition(country='india') values(1111), (2222)") - .run("insert into tflattextpart partition(country='us') values(3333)") - .run("create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp' ") - .run("insert into tacidloc values(1)") - .run("insert into tacidloc values(2)") - .run("insert into tacidloc values(3)") - .run("create table tacidpartloc (place string) partitioned by (country string) clustered by(place) " + - "into 3 buckets 
stored as orc ") - .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/part'") - .run("insert into tacidpartloc partition(country='india') values('mumbai')") - .run("insert into tacidpartloc partition(country='us') values('sf')") - .run("insert into tacidpartloc partition(country='france') values('paris')") - .run("create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " + - "stored as avro tblproperties ('avro.schema.url'='" + primary.avroSchemaFile.toUri().toString() + "')") - .run("insert into avro_table values('str1', 10)") - .dump(primaryDbName, fromReplId); + WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName) + .run("create table tacid (id int) clustered by(id) into 3 buckets stored as orc ") + .run("insert into tacid values(1)") + .run("insert into tacid values(2)") + .run("insert into tacid values(3)") + .run( + "create table tacidpart (place string) partitioned by (country string) clustered by(place) " + + + "into 3 buckets stored as orc ") + .run("alter table tacidpart add partition(country='france')") + .run("insert into tacidpart partition(country='india') values('mumbai')") + .run("insert into tacidpart partition(country='us') values('sf')") + .run("insert into tacidpart partition(country='france') values('paris')") + .run( + "create table tflat (rank int) stored as orc tblproperties(\"transactional\"=\"false\")") + .run("insert into tflat values(11)") + .run("insert into tflat values(22)") + .run("create table tflattext (id int) ") + .run("insert into tflattext values(111), (222)") + .run("create table tflattextpart (id int) partitioned by (country string) ") + .run("insert into tflattextpart partition(country='india') values(1111), (2222)") + .run("insert into tflattextpart partition(country='us') values(3333)") + .run( + "create table tacidloc (id int) clustered by(id) into 3 buckets stored as orc LOCATION '/tmp' ") + .run("insert into tacidloc values(1)") + .run("insert into tacidloc values(2)") + .run("insert into tacidloc values(3)") + .run( + "create table tacidpartloc (place string) partitioned by (country string) clustered by(place) " + + + "into 3 buckets stored as orc ") + .run("alter table tacidpartloc add partition(country='france') LOCATION '/tmp/part'") + .run("insert into tacidpartloc partition(country='india') values('mumbai')") + .run("insert into tacidpartloc partition(country='us') values('sf')") + .run("insert into tacidpartloc partition(country='france') values('paris')") + .run( + "create table avro_table ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " + + "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri() + .toString() + "')") + .run("insert into avro_table values ('str1', 10)") + .run( + "create table avro_table_part partitioned by (country string) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' " + + "stored as avro tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri() + .toString() + "')") + .run("insert into avro_table_part partition (country='india') values ('another', 13)") + .dump(primaryDbName, fromReplId); assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacid"))); assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpart"))); assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflat"))); @@ -165,17 +231,24 @@ public class TestReplicationWithTableMigration { assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tflattextpart"))); 
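    // The primary warehouse is configured above with hive.strict.managed.tables=false and
    // DummyTxnManager, so none of these tables are transactional before replication;
    // verifyLoadExecution() checks that the strict-managed replica converts them to
    // transactional/full ACID tables during load.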
    assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidloc")));
    assertFalse(isTransactionalTable(primary.getTable(primaryDbName, "tacidpartloc")));
-    Table avroTable = primary.getTable(primaryDbName, "avro_table");
-    assertFalse(isTransactionalTable(avroTable));
-    assertFalse(MetaStoreUtils.isExternalTable(avroTable));
+    assertAvroTableState(primaryDbName, "avro_table", "avro_table_part");
+    assertAvroTableState(primaryDbName, "avro_table_part");
    return tuple;
  }

+  private void assertAvroTableState(String primaryDbName, String... tableNames) throws Exception {
+    for (String tableName : tableNames) {
+      Table avroTable = primary.getTable(primaryDbName, tableName);
+      assertFalse(isTransactionalTable(avroTable));
+      assertFalse(MetaStoreUtils.isExternalTable(avroTable));
+    }
+  }
+
  private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
    replica.run("use " + replicatedDbName)
        .run("show tables")
        .verifyResults(new String[] {"tacid", "tacidpart", "tflat", "tflattext", "tflattextpart",
-            "tacidloc", "tacidpartloc", "avro_table"})
+            "tacidloc", "tacidpartloc", "avro_table", "avro_table_part" })
        .run("repl status " + replicatedDbName)
        .verifyResult(lastReplId)
        .run("select id from tacid order by id")
@@ -193,7 +266,9 @@ public class TestReplicationWithTableMigration {
        .run("select country from tacidpartloc order by country")
        .verifyResults(new String[] {"france", "india", "us"})
        .run("select col1 from avro_table")
-        .verifyResults(new String[] {"str1"});
+        .verifyResults(new String[] { "str1" })
+        .run("select col1 from avro_table_part")
+        .verifyResults(new String[] { "another" });

    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacid")));
    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpart")));
@@ -204,23 +279,29 @@ public class TestReplicationWithTableMigration {
    assertTrue(isTransactionalTable(replica.getTable(replicatedDbName, "tflattextpart")));
    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidloc")));
    assertTrue(isFullAcidTable(replica.getTable(replicatedDbName, "tacidpartloc")));
+    assertTablePath(replicatedDbName, "avro_table");
+    assertPartitionPath(replicatedDbName, "avro_table_part");
+  }

-    /*Path databasePath = new Path(replica.warehouseRoot, replica.getDatabase(replicatedDbName).getLocationUri());
-    assertEquals(replica.getTable(replicatedDbName, "tacidloc").getSd().getLocation(),
-        new Path(databasePath,"tacidloc").toUri().toString());
-
-    Path tablePath = new Path(databasePath, "tacidpartloc");
-    List<Partition> partitions = replica.getAllPartitions(replicatedDbName, "tacidpartloc");
-    for (Partition part : partitions) {
-      tablePath.equals(new Path(part.getSd().getLocation()).getParent());
-    }*/
+  private void assertPartitionPath(String replicatedDbName, String tableName) throws Exception {
+    Path tablePath = assertTablePath(replicatedDbName, tableName);
+    List<Partition> partitions = replica.getAllPartitions(replicatedDbName, tableName);
+    assertEquals(1, partitions.size());
+    String actualPartitionPath = partitions.iterator().next().getSd().getLocation().toLowerCase();
+    String expectedPartitionPath = new PathBuilder(tablePath.toString())
+        .addDescendant("country=india").build().toUri().toString().toLowerCase();
+    assertEquals(expectedPartitionPath, actualPartitionPath);
+  }

-    Table avroTable = replica.getTable(replicatedDbName, "avro_table");
+  private Path assertTablePath(String replicatedDbName, String tableName) throws Exception {
+    Table avroTable = replica.getTable(replicatedDbName, tableName);
    assertTrue(MetaStoreUtils.isExternalTable(avroTable));
-    Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString()).addDescendant(replicatedDbName + ".db")
-        .addDescendant("avro_table")
-        .build();
-    assertEquals(avroTable.getSd().getLocation().toLowerCase(), tablePath.toUri().toString().toLowerCase());
+    Path tablePath = new PathBuilder(replica.externalTableWarehouseRoot.toString())
+        .addDescendant(replicatedDbName + ".db").addDescendant(tableName).build();
+    String expectedTablePath = tablePath.toUri().toString().toLowerCase();
+    String actualTablePath = avroTable.getSd().getLocation().toLowerCase();
+    assertEquals(expectedTablePath, actualTablePath);
+    return tablePath;
  }

  private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable {
@@ -231,12 +312,12 @@ public class TestReplicationWithTableMigration {
      public Boolean apply(@Nullable InjectableBehaviourObjectStore.CallerArguments args) {
        injectionPathCalled = true;
        if (!args.dbName.equalsIgnoreCase(replicatedDbName) || (args.constraintTblName != null)) {
-          LOG.warn("Verifier - DB: " + String.valueOf(args.dbName)
-              + " Constraint Table: " + String.valueOf(args.constraintTblName));
+          LOG.warn("Verifier - DB: " + args.dbName
+              + " Constraint Table: " + args.constraintTblName);
          return false;
        }
        if (args.tblName != null) {
-          LOG.warn("Verifier - Table: " + String.valueOf(args.tblName));
+          LOG.warn("Verifier - Table: " + args.tblName);
          return args.tblName.equalsIgnoreCase(tbl);
        }
        return true;
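Note: assertTablePath and assertPartitionPath above encode the destination layout this patch expects for migrated external tables: <externalTableWarehouseRoot>/<dbName>.db/<tableName>, with partition directories such as country=india nested beneath. A small sketch of that derivation using plain Hadoop Path (the class and all names here are illustrative only, not the patch's code):

    import org.apache.hadoop.fs.Path;

    public final class ExpectedLayout {               // hypothetical helper
      // <externalTableWarehouseRoot>/<dbName>.db/<tableName>
      public static Path tableDir(Path externalWarehouseRoot, String db, String table) {
        return new Path(new Path(externalWarehouseRoot, db + ".db"), table);
      }

      // e.g. partSpec = "country=india"
      public static Path partitionDir(Path tableDir, String partSpec) {
        return new Path(tableDir, partSpec);
      }
    }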

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 92f2456..bf4154c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -55,11 +55,8 @@ import org.apache.hive.hcatalog.api.repl.ReplicationV1CompatRule;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
 import org.codehaus.plexus.util.ExceptionUtils;
 import org.slf4j.Logger;
-import org.apache.hadoop.hive.ql.exec.Utilities;

 import java.io.Closeable;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -75,20 +72,18 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

 public class WarehouseInstance implements Closeable {
-  final String functionsRoot;
+  final String functionsRoot, repldDir;
   private Logger logger;
   private IDriver driver;
   HiveConf hiveConf;
   MiniDFSCluster miniDFSCluster;
   private HiveMetaStoreClient client;
-  public final Path warehouseRoot;
-  public final Path externalTableWarehouseRoot;
-  public Path avroSchemaFile;
+  final Path warehouseRoot;
+  final Path externalTableWarehouseRoot;

   private static int uniqueIdentifier = 0;

   private final static String LISTENER_CLASS = DbNotificationListener.class.getCanonicalName();
-  private final static String AVRO_SCHEMA_FILE_NAME = "avro_table.avsc";

   WarehouseInstance(Logger logger, MiniDFSCluster cluster,
       Map<String, String> overridesForHiveConf, String keyNameForEncryptedZone) throws Exception {
@@ -106,8 +101,14 @@ public class WarehouseInstance implements Closeable {
     }
     Path cmRootPath = mkDir(fs, "/cmroot" + uniqueIdentifier);
     this.functionsRoot = mkDir(fs, "/functions" + uniqueIdentifier).toString();
-    initialize(cmRootPath.toString(), warehouseRoot.toString(), externalTableWarehouseRoot.toString(),
-        overridesForHiveConf);
+    String tmpDir = "/tmp/"
+        + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
+        + "_"
+        + System.nanoTime();
+
+    this.repldDir = mkDir(fs, tmpDir + "/hrepl" + uniqueIdentifier + "/").toString();
+    initialize(cmRootPath.toString(), externalTableWarehouseRoot.toString(),
+        warehouseRoot.toString(), overridesForHiveConf);
   }

   WarehouseInstance(Logger logger, MiniDFSCluster cluster,
@@ -115,18 +116,13 @@ public class WarehouseInstance implements Closeable {
     this(logger, cluster, overridesForHiveConf, null);
   }

-  private void initialize(String cmRoot, String warehouseRoot, String externalTableWarehouseRoot,
+  private void initialize(String cmRoot, String externalTableWarehouseRoot, String warehouseRoot,
       Map<String, String> overridesForHiveConf) throws Exception {
     hiveConf = new HiveConf(miniDFSCluster.getConfiguration(0), TestReplicationScenarios.class);
     for (Map.Entry<String, String> entry : overridesForHiveConf.entrySet()) {
       hiveConf.set(entry.getKey(), entry.getValue());
     }
     String metaStoreUri = System.getProperty("test." + HiveConf.ConfVars.METASTOREURIS.varname);
-    String hiveWarehouseLocation = System.getProperty("test.warehouse.dir", "/tmp")
-        + Path.SEPARATOR
-        + TestReplicationScenarios.class.getCanonicalName().replace('.', '_')
-        + "_"
-        + System.nanoTime();
     if (metaStoreUri != null) {
       hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, metaStoreUri);
       return;
@@ -143,8 +139,7 @@ public class WarehouseInstance implements Closeable {
     hiveConf.setVar(HiveConf.ConfVars.REPL_FUNCTIONS_ROOT_DIR, functionsRoot);
     hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY,
         "jdbc:derby:memory:${test.tmp.dir}/APP;create=true");
-    hiveConf.setVar(HiveConf.ConfVars.REPLDIR,
-        hiveWarehouseLocation + "/hrepl" + uniqueIdentifier + "/");
+    hiveConf.setVar(HiveConf.ConfVars.REPLDIR, this.repldDir);
     hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
@@ -158,11 +153,6 @@ public class WarehouseInstance implements Closeable {

     MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true);

-    Path testPath = new Path(hiveWarehouseLocation);
-    FileSystem testPathFileSystem = FileSystem.get(testPath.toUri(), hiveConf);
-    testPathFileSystem.mkdirs(testPath);
-
-    avroSchemaFile = createAvroSchemaFile(testPathFileSystem, testPath);
     driver = DriverFactory.newDriver(hiveConf);
     SessionState.start(new CliSessionState(hiveConf));
     client = new HiveMetaStoreClient(hiveConf);
@@ -177,53 +167,10 @@ public class WarehouseInstance implements Closeable {

   private Path mkDir(DistributedFileSystem fs, String pathString) throws IOException, SemanticException {
     Path path = new Path(pathString);
-    fs.mkdir(path, new FsPermission("777"));
+    fs.mkdirs(path, new FsPermission("777"));
     return PathBuilder.fullyQualifiedHDFSUri(path, fs);
   }

-  private Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
-    Path schemaFile = new Path(testPath, AVRO_SCHEMA_FILE_NAME);
-    String[] schemaVals = new String[] { "{",
-        " \"type\" : \"record\",",
-        " \"name\" : \"table1\",",
-        " \"doc\" : \"Sqoop import of table1\",",
-        " \"fields\" : [ {",
-        " \"name\" : \"col1\",",
: [ \"null\", \"string\" ],", - " \"default\" : null,", - " \"columnName\" : \"col1\",", - " \"sqlType\" : \"12\"", - " }, {", - " \"name\" : \"col2\",", - " \"type\" : [ \"null\", \"long\" ],", - " \"default\" : null,", - " \"columnName\" : \"col2\",", - " \"sqlType\" : \"13\"", - " } ],", - " \"tableName\" : \"table1\"", - "}" - }; - createTestDataFile(schemaFile.toUri().getPath(), schemaVals); - return schemaFile; - } - - private void createTestDataFile(String filename, String[] lines) throws IOException { - FileWriter writer = null; - try { - File file = new File(filename); - file.deleteOnExit(); - writer = new FileWriter(file); - int i=0; - for (String line : lines) { - writer.write(line + "\n"); - } - } finally { - if (writer != null) { - writer.close(); - } - } - } - public HiveConf getConf() { return hiveConf; } http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index aabc34d..18089d5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -702,14 +702,13 @@ public class Context { */ public Path getExternalTmpPath(Path path) { URI extURI = path.toUri(); - if (extURI.getScheme().equals("viewfs")) { + if ("viewfs".equals(extURI.getScheme())) { // if we are on viewfs we don't want to use /tmp as tmp dir since rename from /tmp/.. // to final /user/hive/warehouse/ will fail later, so instead pick tmp dir // on same namespace as tbl dir. return getExtTmpPathRelTo(path.getParent()); } - return new Path(getExternalScratchDir(extURI), EXT_PREFIX + - nextPathId()); + return new Path(getExternalScratchDir(extURI), EXT_PREFIX + nextPathId()); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java index 47a802f..40cc576 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java @@ -56,6 +56,10 @@ import org.apache.hadoop.hive.ql.plan.TezWork; import com.google.common.annotations.VisibleForTesting; +import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork; +import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyTask; + + /** * TaskFactory implementation. 

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 47a802f..40cc576 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -56,6 +56,10 @@ import org.apache.hadoop.hive.ql.plan.TezWork;

 import com.google.common.annotations.VisibleForTesting;

+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
+import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyTask;
+
+
 /**
  * TaskFactory implementation.
 **/
@@ -113,6 +117,7 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
     taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
     taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class));
+    taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class));
   }

   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
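Note: with the TaskTuple registered above, handing a DirCopyWork to TaskFactory.get(...) resolves to a DirCopyTask wired to that work object, which is how ExternalTableCopyTaskBuilder.tasks(...) in the next file builds its task list. A sketch of that resolution in isolation (the demo class and paths are made up; this is not the patch's code):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;

    public final class DirCopyTaskDemo {                // hypothetical demo
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        DirCopyWork work = new DirCopyWork(
            new Path("hdfs://source-ns/data/ext_tbl"),  // hypothetical source
            new Path("hdfs://target-ns/data/ext_tbl")); // hypothetical target
        // Resolves through the taskvec registration to a DirCopyTask
        Task<DirCopyWork> task = TaskFactory.get(work, conf);
        System.out.println(task.getName());             // "DIR_COPY_TASK"
      }
    }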

http://git-wip-us.apache.org/repos/asf/hive/blob/b3ef75ea/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
new file mode 100644
index 0000000..efecdb8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class ExternalTableCopyTaskBuilder {
+  private static final Logger LOG = LoggerFactory.getLogger(ExternalTableCopyTaskBuilder.class);
+  private final ReplLoadWork work;
+  private final HiveConf conf;
+
+  ExternalTableCopyTaskBuilder(ReplLoadWork work, HiveConf conf) {
+    this.work = work;
+    this.conf = conf;
+  }
+
+  List<Task<? extends Serializable>> tasks(TaskTracker tracker) {
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    Iterator<DirCopyWork> itr = work.getPathsToCopyIterator();
+    while (tracker.canAddMoreTasks() && itr.hasNext()) {
+      DirCopyWork dirCopyWork = itr.next();
+      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+      tasks.add(task);
+      tracker.addTask(task);
+      LOG.debug("added task for {}", dirCopyWork);
+    }
+    return tasks;
+  }
+
+  public static class DirCopyTask extends Task<DirCopyWork> implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
+    private static final int MAX_COPY_RETRY = 5;
+
+    @Override
+    protected int execute(DriverContext driverContext) {
+      String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+
+      Path sourcePath = work.fullyQualifiedSourcePath;
+      Path targetPath = work.fullyQualifiedTargetPath;
+      if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+        sourcePath = reservedRawPath(work.fullyQualifiedSourcePath.toUri());
+        targetPath = reservedRawPath(work.fullyQualifiedTargetPath.toUri());
+      }
+      int currentRetry = 0;
+      while (currentRetry < MAX_COPY_RETRY) {
+        try {
+          UserGroupInformation ugi = Utils.getUGI();
+          String currentUser = ugi.getShortUserName();
+          boolean usePrivilegedUser =
+              distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser);
+
+          // do we create a new conf and only here provide this additional option so that we get away from
+          // differences of data in two location for the same directories ?
+          // basically add distcp.options.delete to hiveconf new object ?
+          FileUtils.distCp(
+              sourcePath.getFileSystem(conf), // source file system
+              Collections.singletonList(sourcePath), // list of source paths
+              targetPath,
+              false,
+              usePrivilegedUser ? distCpDoAsUser : null,
+              conf,
+              ShimLoader.getHadoopShims());
+          return 0;
+        } catch (Exception e) {
+          if (++currentRetry < MAX_COPY_RETRY) {
+            LOG.warn("unable to copy", e);
+          } else {
+            LOG.error("unable to copy {} to {}", sourcePath, targetPath, e);
+            setException(e);
+            return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+          }
+        }
+      }
+      LOG.error("should never come here ");
+      return -1;
+    }
+
+    private static Path reservedRawPath(URI uri) {
+      return new Path(uri.getScheme(), uri.getAuthority(),
+          CopyUtils.RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
+    }
+
+    @Override
+    public StageType getType() {
+      return StageType.REPL_INCREMENTAL_LOAD;
+    }
+
+    @Override
+    public String getName() {
+      return "DIR_COPY_TASK";
+    }
+  }
+
+  @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
+      Explain.Level.DEFAULT,
+      Explain.Level.EXTENDED })
+  public static class DirCopyWork implements Serializable {
+    private final Path fullyQualifiedSourcePath, fullyQualifiedTargetPath;
+
+    public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) {
+      this.fullyQualifiedSourcePath = fullyQualifiedSourcePath;
+      this.fullyQualifiedTargetPath = fullyQualifiedTargetPath;
+    }
+
+    @Override
+    public String toString() {
+      return "DirCopyWork{"
+          + "fullyQualifiedSourcePath=" + fullyQualifiedSourcePath
+          + ", fullyQualifiedTargetPath=" + fullyQualifiedTargetPath
+          + '}';
+    }
+  }
+}
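Note: two behaviours of DirCopyTask are worth calling out. First, it retries the distCp up to MAX_COPY_RETRY (5) times, logging a warning on intermediate failures and only surfacing the exception from the final attempt. Second, when HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE is enabled, reservedRawPath() rewrites both endpoints into HDFS's reserved raw namespace so encryption-zone bytes are copied as-is. A standalone sketch of that rewrite (assuming CopyUtils.RAW_RESERVED_VIRTUAL_PATH is the standard "/.reserved/raw" HDFS prefix; the demo class is made up):

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    public final class RawPathDemo {                    // hypothetical demo
      public static void main(String[] args) {
        URI uri = URI.create("hdfs://ns1/data/ext_tbl");
        // Rebuild the path under the raw namespace, keeping scheme and authority
        Path raw = new Path(uri.getScheme(), uri.getAuthority(),
            "/.reserved/raw" + uri.getPath());
        System.out.println(raw); // hdfs://ns1/.reserved/raw/data/ext_tbl
      }
    }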