This is an automated email from the ASF dual-hosted git repository.
anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d5fb71f HIVE-23605: 'Wrong FS' error during _external_tables_info
creation when staging location is remote (Pravin Kumar Sinha, reviewed by Aasha
Medhi)
d5fb71f is described below
commit d5fb71fe4b549002302be99f078b3eec98fb037a
Author: Anishek Agarwal <[email protected]>
AuthorDate: Mon Jun 8 16:55:19 2020 +0530
HIVE-23605: 'Wrong FS' error during _external_tables_info creation when
staging location is remote (Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
.../ql/parse/BaseReplicationAcrossInstances.java | 37 +++-
.../hadoop/hive/ql/parse/ReplicationTestUtils.java | 7 +-
.../TestReplicationScenariosExclusiveReplica.java | 205 +++++++++++++++++++++
.../hive/ql/exec/repl/ReplExternalTables.java | 2 +-
4 files changed, 245 insertions(+), 6 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
index b805b19..6b96d2b 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
@@ -30,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
@@ -43,7 +44,8 @@ public class BaseReplicationAcrossInstances {
static WarehouseInstance primary;
static WarehouseInstance replica;
String primaryDbName, replicatedDbName;
- static HiveConf conf;
+ static HiveConf conf; // for primary
+ static HiveConf replicaConf;
static void internalBeforeClassSetup(Map<String, String> overrides, Class
clazz)
throws Exception {
@@ -60,6 +62,39 @@ public class BaseReplicationAcrossInstances {
primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
localOverrides.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(),
primary.repldDir);
replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+ replicaConf = conf;
+ }
+
+ static void internalBeforeClassSetupExclusiveReplica(Map<String, String>
primaryOverrides,
+ Map<String, String>
replicaOverrides, Class clazz)
+ throws Exception {
+ conf = new HiveConf(clazz);
+ conf.set("dfs.client.use.datanode.hostname", "true");
+ conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() +
".hosts", "*");
+ String primaryBaseDir =
Files.createTempDirectory("base").toFile().getAbsolutePath();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, primaryBaseDir);
+ MiniDFSCluster miniPrimaryDFSCluster = new
MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+ Map<String, String> localOverrides = new HashMap<String, String>() {
+ {
+ put("fs.defaultFS",
miniPrimaryDFSCluster.getFileSystem().getUri().toString());
+ put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+ }
+ };
+ localOverrides.putAll(primaryOverrides);
+ primary = new WarehouseInstance(LOG, miniPrimaryDFSCluster,
localOverrides);
+ String replicaBaseDir =
Files.createTempDirectory("replica").toFile().getAbsolutePath();
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir);
+ replicaConf = new HiveConf(clazz);
+ replicaConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, replicaBaseDir);
+ replicaConf.set("dfs.client.use.datanode.hostname", "true");
+ replicaConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() +
".hosts", "*");
+ MiniDFSCluster miniReplicaDFSCluster =
+ new
MiniDFSCluster.Builder(replicaConf).numDataNodes(1).format(true).build();
+ localOverrides.clear();
+ localOverrides.putAll(replicaOverrides);
+ localOverrides.put("fs.defaultFS",
miniReplicaDFSCluster.getFileSystem().getUri().toString());
+ localOverrides.put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
+ replica = new WarehouseInstance(LOG, miniReplicaDFSCluster,
localOverrides);
}
@AfterClass
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
index e0c3ed2..902c731 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
@@ -531,10 +531,9 @@ public class ReplicationTestUtils {
return withClause;
}
- public static void assertExternalFileInfo(WarehouseInstance primary,
- List<String> expected,
- Path externalTableInfoFile) throws
IOException {
- DistributedFileSystem fileSystem = primary.miniDFSCluster.getFileSystem();
+ public static void assertExternalFileInfo(WarehouseInstance
warehouseInstance,
+ List<String> expected, Path
externalTableInfoFile) throws IOException {
+ DistributedFileSystem fileSystem =
warehouseInstance.miniDFSCluster.getFileSystem();
Assert.assertTrue(fileSystem.exists(externalTableInfoFile));
InputStream inputStream = fileSystem.open(externalTableInfoFile);
BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream));
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
new file mode 100644
index 0000000..371f90b
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExclusiveReplica.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import
org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.FILE_NAME;
+
+/**
+ * Test replication scenarios with staging on replica.
+ */
+public class TestReplicationScenariosExclusiveReplica extends
BaseReplicationAcrossInstances {
+
+ private static final String REPLICA_EXTERNAL_BASE = "/replica_external_base";
+
+ @BeforeClass
+ public static void classLevelSetup() throws Exception {
+ Map<String, String> overrides = new HashMap<>();
+ overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+ GzipJSONMessageEncoder.class.getCanonicalName());
+ overrides.put(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY.varname, "false");
+ overrides.put(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname,
"true");
+ overrides.put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
+ UserGroupInformation.getCurrentUser().getUserName());
+ internalBeforeClassSetupExclusiveReplica(overrides, overrides,
TestReplicationScenarios.class);
+ }
+
+ @Before
+ public void setup() throws Throwable {
+ super.setup();
+ }
+
+ @After
+ public void tearDown() throws Throwable {
+ super.tearDown();
+ }
+
+ @Test
+ public void externalTableReplicationWithRemoteStaging() throws Throwable {
+ List<String> withClauseOptions =
getStagingLocationConfig(replica.repldDir);
+ withClauseOptions.addAll(externalTableBasePathWithClause());
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (100)")
+ .run("create table t2 (id int)")
+ .run("insert into table t2 values (200)")
+ .dump(primaryDbName, withClauseOptions);
+
+ // verify that the external table info is written correctly for bootstrap
+ assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false,
replica);
+
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("select id from t1")
+ .verifyResult("100")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("select id from t2")
+ .verifyResult("200");
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("create external table t3 (id int)")
+ .run("insert into table t3 values (300)")
+ .run("create table t4 (id int)")
+ .run("insert into table t4 values (400)")
+ .dump(primaryDbName, withClauseOptions);
+
+ // verify that the external table info is written correctly for incremental
+ assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation,
true, replica);
+
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("show tables like 't3'")
+ .verifyResult("t3")
+ .run("show tables like 't4'")
+ .verifyResult("t4")
+ .run("select id from t1")
+ .verifyResult("100")
+ .run("select id from t2")
+ .verifyResult("200")
+ .run("select id from t3")
+ .verifyResult("300")
+ .run("select id from t4")
+ .verifyResult("400");
+ }
+
+ @Test
+ public void externalTableReplicationWithLocalStaging() throws Throwable {
+ List<String> withClauseOptions =
getStagingLocationConfig(primary.repldDir);
+ withClauseOptions.addAll(externalTableBasePathWithClause());
+ WarehouseInstance.Tuple tuple = primary
+ .run("use " + primaryDbName)
+ .run("create external table t1 (id int)")
+ .run("insert into table t1 values (500)")
+ .run("create table t2 (id int)")
+ .run("insert into table t2 values (600)")
+ .dump(primaryDbName, withClauseOptions);
+
+ // verify that the external table info is written correctly for bootstrap
+ assertExternalFileInfo(Arrays.asList("t1"), tuple.dumpLocation, false,
primary);
+
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("select id from t1")
+ .verifyResult("500")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("select id from t2")
+ .verifyResult("600");
+
+ tuple = primary.run("use " + primaryDbName)
+ .run("create external table t3 (id int)")
+ .run("insert into table t3 values (700)")
+ .run("create table t4 (id int)")
+ .run("insert into table t4 values (800)")
+ .dump(primaryDbName, withClauseOptions);
+
+ // verify that the external table info is written correctly for incremental
+ assertExternalFileInfo(Arrays.asList("t1", "t3"), tuple.dumpLocation,
true, primary);
+
+ replica.load(replicatedDbName, primaryDbName, withClauseOptions)
+ .run("use " + replicatedDbName)
+ .run("show tables like 't1'")
+ .verifyResult("t1")
+ .run("show tables like 't2'")
+ .verifyResult("t2")
+ .run("show tables like 't3'")
+ .verifyResult("t3")
+ .run("show tables like 't4'")
+ .verifyResult("t4")
+ .run("select id from t1")
+ .verifyResult("500")
+ .run("select id from t2")
+ .verifyResult("600")
+ .run("select id from t3")
+ .verifyResult("700")
+ .run("select id from t4")
+ .verifyResult("800");
+ }
+
+ private List<String> getStagingLocationConfig(String stagingLoc) {
+ List<String> confList = new ArrayList<>();
+ confList.add("'" + HiveConf.ConfVars.REPLDIR.varname + "'='" + stagingLoc
+ "'");
+ return confList;
+ }
+
+ private List<String> externalTableBasePathWithClause() throws IOException,
SemanticException {
+ return
ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE,
replica);
+ }
+
+ private void assertExternalFileInfo(List<String> expected, String
dumplocation, boolean isIncremental,
+ WarehouseInstance warehouseInstance)
+ throws IOException {
+ Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
+ Path externalTableInfoFile;
+ if (isIncremental) {
+ externalTableInfoFile = new Path(hivePath, FILE_NAME);
+ } else {
+ externalTableInfoFile = new Path(metadataPath,
primaryDbName.toLowerCase() + File.separator + FILE_NAME);
+ }
+ ReplicationTestUtils.assertExternalFileInfo(warehouseInstance, expected,
externalTableInfoFile);
+ }
+}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
index fddee28..0fdd1bf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -110,7 +110,7 @@ public final class ReplExternalTables {
dumpMetadataOnly =
hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY) ||
hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE);
if (shouldWrite()) {
- this.writer = FileSystem.get(hiveConf).create(writePath);
+ this.writer = writePath.getFileSystem(hiveConf).create(writePath);
}
}