This is an automated email from the ASF dual-hosted git repository. anishek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new d5fb71f HIVE-23605: 'Wrong FS' error during _external_tables_info creation when staging location is remote (Pravin Kumar Sinha, reviewed by Aasha Medhi) d5fb71f is described below commit d5fb71fe4b549002302be99f078b3eec98fb037a Author: Anishek Agarwal <anis...@gmail.com> AuthorDate: Mon Jun 8 16:55:19 2020 +0530 HIVE-23605: 'Wrong FS' error during _external_tables_info creation when staging location is remote (Pravin Kumar Sinha, reviewed by Aasha Medhi) --- .../ql/parse/BaseReplicationAcrossInstances.java | 37 +++- .../hadoop/hive/ql/parse/ReplicationTestUtils.java | 7 +- .../TestReplicationScenariosExclusiveReplica.java | 205 +++++++++++++++++++++ .../hive/ql/exec/repl/ReplExternalTables.java | 2 +- 4 files changed, 245 insertions(+), 6 deletions(-) diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java index b805b19..6b96d2b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java @@ -30,6 +30,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.file.Files; import java.util.HashMap; import java.util.Map; @@ -43,7 +44,8 @@ public class BaseReplicationAcrossInstances { static WarehouseInstance primary; static WarehouseInstance replica; String primaryDbName, replicatedDbName; - static HiveConf conf; + static HiveConf conf; // for primary + static HiveConf replicaConf; static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz) throws Exception { @@ -60,6 +62,39 @@ public class BaseReplicationAcrossInstances { primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); 
localOverrides.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir); replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides); + replicaConf = conf; + } + + static void internalBeforeClassSetupExclusiveReplica(Map<String, String> primaryOverrides, + Map<String, String> replicaOverrides, Class clazz) + throws Exception { + conf = new HiveConf(clazz); + conf.set("dfs.client.use.datanode.hostname", "true"); + conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*"); + String primaryBaseDir = Files.createTempDirectory("base").toFile().getAbsolutePath(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir); + MiniDFSCluster miniPrimaryDFSCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build(); + Map<String, String> localOverrides = new HashMap<String, String>() { + { + put("fs.defaultFS", miniPrimaryDFSCluster.getFileSystem().getUri().toString()); + put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); + } + }; + localOverrides.putAll(primaryOverrides); + primary = new WarehouseInstance(LOG, miniPrimaryDFSCluster, localOverrides); + String replicaBaseDir = Files.createTempDirectory("replica").toFile().getAbsolutePath(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir); + replicaConf = new HiveConf(clazz); + replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir); + replicaConf.set("dfs.client.use.datanode.hostname", "true"); + replicaConf.set("hadoop.proxyuser." 
+ Utils.getUGI().getShortUserName() + ".hosts", "*"); + MiniDFSCluster miniReplicaDFSCluster = + new MiniDFSCluster.Builder(replicaConf).numDataNodes(1).format(true).build(); + localOverrides.clear(); + localOverrides.putAll(replicaOverrides); + localOverrides.put("fs.defaultFS", miniReplicaDFSCluster.getFileSystem().getUri().toString()); + localOverrides.put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true"); + replica = new WarehouseInstance(LOG, miniReplicaDFSCluster, localOverrides); } @AfterClass diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java index e0c3ed2..902c731 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java @@ -531,10 +531,9 @@ public class ReplicationTestUtils { return withClause; } - public static void assertExternalFileInfo(WarehouseInstance primary, - List<String> expected, - Path externalTableInfoFile) throws IOException { - DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem(); + public static void assertExternalFileInfo(WarehouseInstance warehouseInstance, + List<String> expected, Path externalTableInfoFile) throws IOException { + DistributedFileSystem fileSystem = warehouseInstance.miniDFSCluster.getFileSystem(); Assert.assertTrue(fileSystem.exists(externalTableInfoFile)); InputStream inputStream = fileSystem.open(externalTableInfoFile); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java new file mode 100644 index 0000000..371f90b --- /dev/null +++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.parse; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder; +import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME; + +/** + * Test replication scenarios with staging on replica. 
+ */ +public class TestReplicationScenariosExclusiveReplica extends BaseReplicationAcrossInstances { + + private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base"; + + @BeforeClass + public static void classLevelSetup() throws Exception { + Map<String, String> overrides = new HashMap<>(); + overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(), + GzipJSONMessageEncoder.class.getCanonicalName()); + overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false"); + overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname, "true"); + overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname, + UserGroupInformation.getCurrentUser().getUserName()); + internalBeforeClassSetupExclusiveReplica(overrides, overrides, TestReplicationScenarios.class); + } + + @Before + public void setup() throws Throwable { + super.setup(); + } + + @After + public void tearDown() throws Throwable { + super.tearDown(); + } + + @Test + public void externalTableReplicationWithRemoteStaging() throws Throwable { + List<String> withClauseOptions = getStagingLocationConfig(replica.repldDir); + withClauseOptions.addAll(externalTableBasePathWithClause()); + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (100)") + .run("create table t2 (id int)") + .run("insert into table t2 values (200)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, replica); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("select id from t1") + .verifyResult("100") + .run("show tables like 't2'") + .verifyResult("t2") + .run("select id from t2") + .verifyResult("200"); + + tuple = primary.run("use " + 
primaryDbName) + .run("create external table t3 (id int)") + .run("insert into table t3 values (300)") + .run("create table t4 (id int)") + .run("insert into table t4 values (400)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, replica); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .run("select id from t1") + .verifyResult("100") + .run("select id from t2") + .verifyResult("200") + .run("select id from t3") + .verifyResult("300") + .run("select id from t4") + .verifyResult("400"); + } + + @Test + public void externalTableReplicationWithLocalStaging() throws Throwable { + List<String> withClauseOptions = getStagingLocationConfig(primary.repldDir); + withClauseOptions.addAll(externalTableBasePathWithClause()); + WarehouseInstance.Tuple tuple = primary + .run("use " + primaryDbName) + .run("create external table t1 (id int)") + .run("insert into table t1 values (500)") + .run("create table t2 (id int)") + .run("insert into table t2 values (600)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for bootstrap + assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false, primary); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("select id from t1") + .verifyResult("500") + .run("show tables like 't2'") + .verifyResult("t2") + .run("select id from t2") + .verifyResult("600"); + + tuple = primary.run("use " + primaryDbName) + .run("create external table t3 (id int)") 
+ .run("insert into table t3 values (700)") + .run("create table t4 (id int)") + .run("insert into table t4 values (800)") + .dump(primaryDbName, withClauseOptions); + + // verify that the external table info is written correctly for incremental + assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation, true, primary); + + replica.load(replicatedDbName, primaryDbName, withClauseOptions) + .run("use " + replicatedDbName) + .run("show tables like 't1'") + .verifyResult("t1") + .run("show tables like 't2'") + .verifyResult("t2") + .run("show tables like 't3'") + .verifyResult("t3") + .run("show tables like 't4'") + .verifyResult("t4") + .run("select id from t1") + .verifyResult("500") + .run("select id from t2") + .verifyResult("600") + .run("select id from t3") + .verifyResult("700") + .run("select id from t4") + .verifyResult("800"); + } + + private List<String> getStagingLocationConfig(String stagingLoc) { + List<String> confList = new ArrayList<>(); + confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc + "'"); + return confList; + } + + private List<String> externalTableBasePathWithClause() throws IOException, SemanticException { + return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica); + } + + private void assertExternalFileInfo(List<String> expected, String dumplocation, boolean isIncremental, + WarehouseInstance warehouseInstance) + throws IOException { + Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR); + Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME); + Path externalTableInfoFile; + if (isIncremental) { + externalTableInfoFile = new Path(hivePath, FILE_NAME); + } else { + externalTableInfoFile = new Path(metadataPath, primaryDbName.toLowerCase() + File.separator + FILE_NAME); + } + ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected, externalTableInfoFile); + } +} diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java index fddee28..0fdd1bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java @@ -110,7 +110,7 @@ public final class ReplExternalTables { dumpMetadataOnly = hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) || hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE); if (shouldWrite()) { - this.writer = FileSystem.get(hiveConf).create(writePath); + this.writer = writePath.getFileSystem(hiveConf).create(writePath); } }