This is an automated email from the ASF dual-hosted git repository.
pravin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 52a0399 HIVE-25609: Preserve XAttrs in normal file copy case. (Haymant Mangla, reviewed by Ayush Saxena)
52a0399 is described below
commit 52a0399b146814e8a6a7c38b1c5f6e215b7851fb
Author: Haymant Mangla <[email protected]>
AuthorDate: Wed Dec 1 14:44:18 2021 +0530
HIVE-25609: Preserve XAttrs in normal file copy case. (Haymant Mangla, reviewed by Ayush Saxena)
---
.../org/apache/hadoop/hive/common/FileUtils.java | 145 ++++++++++++++++++++-
.../apache/hadoop/hive/common/TestFileUtils.java | 81 ++++++++++++
.../parse/TestReplicationScenariosAcidTables.java | 77 +++++++++++
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 11 +-
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 17 +--
.../java/org/apache/hadoop/hive/shims/Utils.java | 14 ++
6 files changed, 323 insertions(+), 22 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index fdd78cb..d5cf3d6 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -37,14 +37,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.GlobFilter;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -53,10 +52,14 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.PathIsDirectoryException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.ShutdownHookManager;
@@ -661,11 +664,145 @@ public final class FileUtils {
// is tried and it fails. We depend upon that behaviour in cases like replication,
// wherein if distcp fails, there is good reason to not plod along with a trivial
// implementation, and fail instead.
- copied = FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf);
+ copied = copy(srcFS, srcFS.getFileStatus(src), dstFS, dst, deleteSource, overwrite, shouldPreserveXAttrs(conf, srcFS, dstFS), conf);
}
return copied;
}
+ public static boolean copy(FileSystem srcFS, FileStatus srcStatus, FileSystem dstFS, Path dst, boolean deleteSource,
+     boolean overwrite, boolean preserveXAttrs, Configuration conf) throws IOException {
+ Path src = srcStatus.getPath();
+ dst = checkDest(src.getName(), dstFS, dst, overwrite);
+ if (srcStatus.isDirectory()) {
+ checkDependencies(srcFS, src, dstFS, dst);
+ if (!dstFS.mkdirs(dst)) {
+ return false;
+ }
+
+ RemoteIterator<FileStatus> fileIterator = srcFS.listStatusIterator(src);
+ while(fileIterator.hasNext()) {
+ FileStatus file = fileIterator.next();
+ copy(srcFS, file, dstFS, new Path(dst, file.getPath().getName()), deleteSource, overwrite, preserveXAttrs, conf);
+ }
+ if (preserveXAttrs) {
+ preserveXAttr(srcFS, src, dstFS, dst);
+ }
+ } else {
+ InputStream in = null;
+ FSDataOutputStream out = null;
+
+ try {
+ in = srcFS.open(src);
+ out = dstFS.create(dst, overwrite);
+ IOUtils.copyBytes(in, out, conf, true);
+ if (preserveXAttrs) {
+ preserveXAttr(srcFS, src, dstFS, dst);
+ }
+ } catch (IOException e) {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ throw e;
+ }
+ }
+
+ return deleteSource ? srcFS.delete(src, true) : true;
+ }
+
+ public static boolean copy(FileSystem srcFS, Path[] srcs, FileSystem dstFS, Path dst, boolean deleteSource,
+     boolean overwrite, boolean preserveXAttr, Configuration conf) throws IOException {
+ boolean gotException = false;
+ boolean returnVal = true;
+ StringBuilder exceptions = new StringBuilder();
+ if (srcs.length == 1) {
+ return copy(srcFS, srcFS.getFileStatus(srcs[0]), dstFS, dst, deleteSource, overwrite, preserveXAttr, conf);
+ } else {
+ try {
+ FileStatus sdst = dstFS.getFileStatus(dst);
+ if (!sdst.isDirectory()) {
+ throw new IOException("copying multiple files, but last argument `"
+ dst + "' is not a directory");
+ }
+ } catch (FileNotFoundException var16) {
+ throw new IOException("`" + dst + "': specified destination directory
does not exist", var16);
+ }
+
+ for (Path src : srcs) {
+ try {
+ if (!copy(srcFS, srcFS.getFileStatus(src), dstFS, dst, deleteSource, overwrite, preserveXAttr, conf)) {
+ returnVal = false;
+ }
+ } catch (IOException e) {
+ gotException = true;
+ exceptions.append(e.getMessage());
+ exceptions.append("\n");
+ }
+ }
+
+ if (gotException) {
+ throw new IOException(exceptions.toString());
+ } else {
+ return returnVal;
+ }
+ }
+ }
+
+ private static void preserveXAttr(FileSystem srcFS, Path src, FileSystem dstFS, Path dst) throws IOException {
+ for (Map.Entry<String, byte[]> attr : srcFS.getXAttrs(src).entrySet()) {
+ dstFS.setXAttr(dst, attr.getKey(), attr.getValue());
+ }
+ }
+
+ private static Path checkDest(String srcName, FileSystem dstFS, Path dst, boolean overwrite) throws IOException {
+ FileStatus sdst;
+ try {
+ sdst = dstFS.getFileStatus(dst);
+ } catch (FileNotFoundException e) {
+ sdst = null;
+ }
+ if (sdst != null) {
+ if (sdst.isDirectory()) {
+ if (srcName == null) {
+ throw new PathIsDirectoryException(dst.toString());
+ }
+
+ return checkDest(null, dstFS, new Path(dst, srcName), overwrite);
+ }
+
+ if (!overwrite) {
+ throw new PathExistsException(dst.toString(), "Target " + dst + " already exists");
+ }
+ }
+
+ return dst;
+ }
+
+ private static void checkDependencies(FileSystem srcFS, Path src, FileSystem dstFS, Path dst) throws IOException {
+ if (srcFS == dstFS) {
+ String srcq = srcFS.makeQualified(src).toString() + "/";
+ String dstq = dstFS.makeQualified(dst).toString() + "/";
+ if (dstq.startsWith(srcq)) {
+ throw new IOException((srcq.length() == dstq.length()) ?
+ "Cannot copy " + src + " to itself." : "Cannot copy " + src + " to its subdirectory " + dst);
+ }
+ }
+ }
+
+ public static boolean shouldPreserveXAttrs(HiveConf conf, FileSystem srcFS, FileSystem dstFS) throws IOException {
+ if (!Utils.checkFileSystemXAttrSupport(srcFS) || !Utils.checkFileSystemXAttrSupport(dstFS)) {
+ return false;
+ }
+ for (Map.Entry<String, String> entry : conf.getPropsWithPrefix(Utils.DISTCP_OPTIONS_PREFIX).entrySet()) {
+ String distCpOption = entry.getKey();
+ if (distCpOption.startsWith("p")) {
+ return distCpOption.contains("x");
+ }
+ }
+ return true;
+ }
+
public static boolean distCp(FileSystem srcFS, List<Path> srcPaths, Path dst,
boolean deleteSource, UserGroupInformation proxyUser,
HiveConf conf, HadoopShims shims) throws IOException {
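
The new copy() overloads make XAttr preservation an explicit, per-call decision. As a minimal illustration of how a caller might drive the new API (the paths below are hypothetical, not part of this commit), one would first consult shouldPreserveXAttrs() and then pass the result through:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class XAttrCopyExample {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        Path src = new Path("/warehouse/src");   // hypothetical source
        Path dst = new Path("/warehouse/dst");   // hypothetical destination
        FileSystem srcFS = src.getFileSystem(conf);
        FileSystem dstFS = dst.getFileSystem(conf);
        // Preserve XAttrs only when both filesystems support them and
        // distcp.options.p* does not opt out.
        boolean preserveXAttrs = FileUtils.shouldPreserveXAttrs(conf, srcFS, dstFS);
        FileUtils.copy(srcFS, srcFS.getFileStatus(src), dstFS, dst,
            false /* deleteSource */, true /* overwrite */, preserveXAttrs, conf);
      }
    }
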
diff --git a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java
index 47af79c..fa91c2c 100644
--- a/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java
+++ b/itests/hive-unit-hadoop2/src/test/java/org/apache/hadoop/hive/common/TestFileUtils.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.common;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.shims.HadoopShims;
@@ -27,6 +29,8 @@ import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -75,6 +79,83 @@ public class TestFileUtils {
}
@Test
+ public void testXAttrsPreserved() throws Exception {
+ //Case 1) src and dst are files.
+ Path src = new Path(basePath, "src.txt");
+ fs.create(src).close();
+ setXAttrsRecursive(src);
+ Path dst = new Path(basePath, "dst.txt");
+ Assert.assertFalse(fs.exists(dst));
+ Assert.assertTrue(FileUtils.copy(fs, fs.getFileStatus(src), fs, dst, false, true, true, conf));
+ Assert.assertTrue(fs.exists(dst));
+ verifyXAttrsPreserved(src, dst);
+ //Case 2) src is file and dst directory does not exist.
+ dst = new Path(basePath, "dummyDstDir");
+ Assert.assertFalse(fs.exists(dst));
+ Assert.assertTrue(FileUtils.copy(fs, fs.getFileStatus(src), fs, dst, false, true, true, conf));
+ Assert.assertTrue(fs.exists(dst));
+ Assert.assertTrue(fs.exists(new Path(dst, new Path(basePath, "src.txt"))));
+ verifyXAttrsPreserved(src, dst);
+ //Case 3) src is a file and dst directory exists.
+ dst = new Path(basePath, "dummyDstDir1");
+ fs.mkdirs(dst);
+ Assert.assertTrue(fs.exists(dst));
+ Assert.assertTrue(FileUtils.copy(fs, fs.getFileStatus(src), fs, dst, false, true, true, conf));
+ Assert.assertTrue(fs.exists(dst));
+ Assert.assertTrue(fs.exists(new Path(dst, "src.txt")));
+ verifyXAttrsPreserved(src, new Path(dst, "src.txt"));
+ //Case 4) src & dst are directories and dst does not exist.
+ src = new Path(basePath, "dummySrcDir2");
+ dst = new Path(basePath, "dummyDstDir2");
+ fs.create(new Path(src, "src.txt"));
+ setXAttrsRecursive(src);
+ Assert.assertFalse(fs.exists(dst));
+ Assert.assertTrue(FileUtils.copy(fs, fs.getFileStatus(src), fs, dst, false, true, true, conf));
+ Assert.assertTrue(fs.exists(dst));
+ Assert.assertTrue(fs.exists(new Path(dst, "src.txt")));
+ verifyXAttrsPreserved(src, dst);
+ //Case 5) src & dst are directories and dst directory exists
+ src = new Path(basePath, "dummySrcDir3");
+ dst = new Path(basePath, "dummyDstDir3");
+ fs.create(new Path(src, "src.txt"));
+ fs.mkdirs(dst);
+ setXAttrsRecursive(src);
+ Assert.assertTrue(fs.exists(dst));
+ Assert.assertTrue(FileUtils.copy(fs, fs.getFileStatus(src), fs, dst, false, true, true, conf));
+ Assert.assertTrue(fs.exists(new Path(dst, "dummySrcDir3/src.txt")));
+ verifyXAttrsPreserved(src, new Path(dst, src.getName()));
+ }
+
+ private void verifyXAttrsPreserved(Path src, Path dst) throws Exception {
+ FileStatus srcStatus = fs.getFileStatus(src);
+ FileStatus dstStatus = fs.getFileStatus(dst);
+ if (srcStatus.isDirectory()) {
+ Assert.assertTrue(dstStatus.isDirectory());
+ for(FileStatus srcContent: fs.listStatus(src)) {
+ Path dstContent = new Path(dst, srcContent.getPath().getName());
+ Assert.assertTrue(fs.exists(dstContent));
+ verifyXAttrsPreserved(srcContent.getPath(), dstContent);
+ }
+ } else {
+ Assert.assertFalse(dstStatus.isDirectory());
+ }
+ Map<String, byte[]> values = fs.getXAttrs(dst);
+ for(Map.Entry<String, byte[]> value : fs.getXAttrs(src).entrySet()) {
+ Assert.assertEquals(new String(value.getValue()), new String(values.get(value.getKey())));
+ }
+ }
+
+ private void setXAttrsRecursive(Path path) throws Exception {
+ if (fs.getFileStatus(path).isDirectory()) {
+ RemoteIterator<FileStatus> content = fs.listStatusIterator(path);
+ while(content.hasNext()) {
+ setXAttrsRecursive(content.next().getPath());
+ }
+ }
+ fs.setXAttr(path, "user.random", "value".getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Test
public void testCopyWithDistcp() throws IOException {
String file1Name = "file1.txt";
String file2Name = "file2.txt";
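
The tests above lean directly on the Hadoop FileSystem XAttr API. For reference, a self-contained round trip looks like this sketch (the file path and attribute value are hypothetical; XAttr names require a namespace prefix such as "user."):

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class XAttrRoundTrip {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path p = new Path("/tmp/xattr-demo.txt");  // hypothetical file
        fs.create(p).close();
        // Set an extended attribute, then read it back.
        fs.setXAttr(p, "user.random", "value".getBytes(StandardCharsets.UTF_8));
        byte[] back = fs.getXAttrs(p).get("user.random");
        System.out.println(new String(back, StandardCharsets.UTF_8));  // prints "value"
      }
    }
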
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index f997f84..d05ff85 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.repl.ReplConst;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -43,6 +45,7 @@ import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.FailoverMetaData;
@@ -65,6 +68,7 @@ import java.io.File;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -989,6 +993,79 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
}
@Test
+ public void testXAttrsPreserved() throws Throwable {
+ String nonTxnTable = "nonTxnTable";
+ String unptnedTable = "unptnedTable";
+ String ptnedTable = "ptnedTable";
+ String clusteredTable = "clusteredTable";
+ String clusteredAndPtnedTable = "clusteredAndPtnedTable";
+ primary.run("use " + primaryDbName)
+ .run("create table " + nonTxnTable + " (id int)")
+ .run("create table " + unptnedTable + " (id int) stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table " + ptnedTable + " (id int) partitioned by
(name string) stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table " + clusteredTable + " (id int) clustered by
(id) into 3 buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("create table " + clusteredAndPtnedTable + " (id int)
partitioned by (name string) clustered by(id)" +
+ "into 3 buckets stored as orc tblproperties
(\"transactional\"=\"true\")")
+ .run("insert into " + nonTxnTable + " values (2)").run("INSERT
into " + unptnedTable + " values(1)")
+ .run("INSERT into " + ptnedTable + " values(2,
'temp')").run("INSERT into " + clusteredTable + " values(1)")
+ .run("INSERT into " + clusteredAndPtnedTable + " values(1,
'temp')");
+ for (String table : primary.getAllTables(primaryDbName)) {
+ org.apache.hadoop.hive.ql.metadata.Table tb = Hive.get(primary.hiveConf).getTable(primaryDbName, table);
+ if (tb.isPartitioned()) {
+ List<Partition> partitions = primary.getAllPartitions(primaryDbName, table);
+ for (Partition partition: partitions) {
+ Path partitionLoc = new Path(partition.getSd().getLocation());
+ FileSystem fs = partitionLoc.getFileSystem(conf);
+ setXAttrsRecursive(fs, partitionLoc, true);
+ }
+ } else {
+ Path tablePath = tb.getDataLocation();
+ FileSystem fs = tablePath.getFileSystem(conf);
+ setXAttrsRecursive(fs, tablePath, true);
+ }
+ }
+ primary.dump(primaryDbName);
+ replica.load(replicatedDbName, primaryDbName);
+ Path srcDbPath = new Path(primary.getDatabase(primaryDbName).getLocationUri());
+ Path replicaDbPath = new Path(primary.getDatabase(replicatedDbName).getLocationUri());
+ verifyXAttrsPreserved(srcDbPath.getFileSystem(conf), srcDbPath, replicaDbPath);
+ }
+
+ private void setXAttrsRecursive(FileSystem fs, Path path, boolean isParent) throws Exception {
+ if (fs.getFileStatus(path).isDirectory()) {
+ RemoteIterator<FileStatus> content = fs.listStatusIterator(path);
+ while(content.hasNext()) {
+ setXAttrsRecursive(fs, content.next().getPath(), false);
+ }
+ }
+ if (!isParent) {
+ fs.setXAttr(path, "user.random",
"value".getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ private void verifyXAttrsPreserved(FileSystem fs, Path src, Path dst) throws Exception {
+ FileStatus srcStatus = fs.getFileStatus(src);
+ FileStatus dstStatus = fs.getFileStatus(dst);
+ if (srcStatus.isDirectory()) {
+ assertTrue(dstStatus.isDirectory());
+ for(FileStatus srcContent: fs.listStatus(src)) {
+ Path dstContent = new Path(dst, srcContent.getPath().getName());
+ assertTrue(fs.exists(dstContent));
+ verifyXAttrsPreserved(fs, srcContent.getPath(), dstContent);
+ }
+ } else {
+ assertFalse(dstStatus.isDirectory());
+ }
+ Map<String, byte[]> values = fs.getXAttrs(dst);
+ for(Map.Entry<String, byte[]> value : fs.getXAttrs(src).entrySet()) {
+ assertEquals(new String(value.getValue()), new String(values.get(value.getKey())));
+ }
+ }
+
+ @Test
public void testAcidTablesBootstrap() throws Throwable {
// Bootstrap
WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
null);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 43e4ecc..b7eb5b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -109,10 +109,13 @@ public class CopyUtils {
}
@VisibleForTesting
- void copyFilesBetweenFS(FileSystem sourceFs, Path[] paths, FileSystem destinationFs,
-                         Path finalDestination, boolean deleteSource, boolean overwrite) throws IOException {
- retryableFxn(() -> FileUtil
-     .copy(sourceFs, paths, destinationFs, finalDestination, deleteSource, overwrite, hiveConf));
+ void copyFilesBetweenFS(FileSystem srcFS, Path[] paths, FileSystem dstFS,
+                         Path dst, boolean deleteSource, boolean overwrite) throws IOException {
+ retryableFxn(() -> {
+ boolean preserveXAttrs = FileUtils.shouldPreserveXAttrs(hiveConf, srcFS, dstFS);
+ FileUtils.copy(srcFS, paths, dstFS, dst, deleteSource, overwrite, preserveXAttrs, hiveConf);
+ return null;
+ });
}
@VisibleForTesting
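
copyFilesBetweenFS now derives the preserveXAttrs flag from FileUtils.shouldPreserveXAttrs(), which gives an explicit distcp.options.p* property the final say. A hedged sketch of exercising that rule directly (the property values below are hypothetical examples):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hive.common.FileUtils;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class PreserveRuleDemo {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        FileSystem fs = FileSystem.get(conf);
        // Assumes the default filesystem supports XAttrs; if it does not,
        // shouldPreserveXAttrs() returns false before consulting options.
        // With no distcp.options.p* set, XAttrs are preserved by default.
        System.out.println(FileUtils.shouldPreserveXAttrs(conf, fs, fs));
        // A "p" option without 'x' (distcp -pb) opts out of XAttrs.
        conf.set("distcp.options.pb", "");
        System.out.println(FileUtils.shouldPreserveXAttrs(conf, fs, fs));  // false
        conf.unset("distcp.options.pb");
        // A "p" option containing 'x' (distcp -pbx) opts back in.
        conf.set("distcp.options.pbx", "");
        System.out.println(FileUtils.shouldPreserveXAttrs(conf, fs, fs));  // true
      }
    }
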
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index a611dda..a4972fd 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -277,16 +277,6 @@ public class Hadoop23Shims extends HadoopShimsSecure {
equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
}
- private boolean checkFileSystemXAttrSupport(FileSystem fs) throws IOException {
- try {
- fs.getXAttrs(new Path(Path.SEPARATOR));
- return true;
- } catch (UnsupportedOperationException e) {
- LOG.warn("XAttr won't be preserved since it is not supported for file system: " + fs.getUri());
- return false;
- }
- }
-
/**
* Returns a shim to wrap MiniMrCluster
*/
@@ -1110,8 +1100,6 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
}
- private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
-
List<String> constructDistCpParams(List<Path> srcPaths, Path dst,
Configuration conf) throws IOException {
// -update and -delete are mandatory options for directory copy to work.
List<String> params = constructDistCpDefaultParams(conf,
dst.getFileSystem(conf),
@@ -1130,7 +1118,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
FileSystem sourceFs)
throws IOException {
List<String> params = new ArrayList<String>();
boolean needToAddPreserveOption = true;
- for (Map.Entry<String,String> entry :
conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
+ for (Map.Entry<String,String> entry :
conf.getPropsWithPrefix(Utils.DISTCP_OPTIONS_PREFIX).entrySet()){
String distCpOption = entry.getKey();
String distCpVal = entry.getValue();
if (distCpOption.startsWith("p")) {
@@ -1142,7 +1130,8 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
}
if (needToAddPreserveOption) {
- params.add((checkFileSystemXAttrSupport(dstFs) && checkFileSystemXAttrSupport(sourceFs)) ? "-pbx" : "-pb");
+ params.add((Utils.checkFileSystemXAttrSupport(dstFs)
+     && Utils.checkFileSystemXAttrSupport(sourceFs)) ? "-pbx" : "-pb");
}
if (!params.contains("-update")) {
params.add("-update");
diff --git a/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java b/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java
index 0343ae2..7cf78dc 100644
--- a/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java
+++ b/shims/common/src/main/java/org/apache/hadoop/hive/shims/Utils.java
@@ -37,6 +37,8 @@ import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosUtil;
@@ -51,6 +53,8 @@ public class Utils {
private static final boolean IBM_JAVA = System.getProperty("java.vendor")
.contains("IBM");
+ public static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
+
public static UserGroupInformation getUGI() throws LoginException, IOException {
String doAs = System.getenv("HADOOP_USER_NAME");
if(doAs != null && doAs.length() > 0) {
@@ -156,6 +160,16 @@ public class Utils {
return Utils.constructXSRFFilter();
}
+ public static boolean checkFileSystemXAttrSupport(FileSystem fs) throws IOException {
+ try {
+ fs.getXAttrs(new Path(Path.SEPARATOR));
+ return true;
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("XAttr won't be preserved since it is not supported for file system: " + fs.getUri());
+ return false;
+ }
+ }
+
private static Filter constructXSRFFilter() {
// Note Hadoop 2.7.1 onwards includes a RestCsrfPreventionFilter class that is
// usable as-is. However, since we have to work on a multitude of hadoop versions
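
checkFileSystemXAttrSupport(), now shared from Utils, probes support by attempting getXAttrs on the filesystem root and treating UnsupportedOperationException as "not supported". A standalone sketch of the same probe (the filesystem obtained here is whatever the default configuration points at):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class XAttrSupportProbe {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        boolean supported;
        try {
          fs.getXAttrs(new Path(Path.SEPARATOR));  // probe the root path
          supported = true;
        } catch (UnsupportedOperationException e) {
          supported = false;  // e.g. filesystems without XAttr support
        }
        System.out.println(fs.getUri() + " XAttr support: " + supported);
      }
    }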