This is an automated email from the ASF dual-hosted git repository.
aasha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 83dc73d HIVE-25534: Error when executing DistCp on file system not supporting XAttrs (#2650)(Haymant Mangla, reviewed by Pravin Kumar Sinha)
83dc73d is described below
commit 83dc73d9d6bbbb8482327046d7071cddd01e23c9
Author: Haymant Mangla <[email protected]>
AuthorDate: Fri Oct 1 14:46:27 2021 +0530
HIVE-25534: Error when executing DistCp on file system not supporting XAttrs (#2650)(Haymant Mangla, reviewed by Pravin Kumar Sinha)
* HIVE-25534: Don't preserve FileAttribute.XATTR to initialise distcp
* Final Review
* new
* Minor correction
---
.../apache/hadoop/hive/shims/Hadoop23Shims.java | 47 ++++++------
.../hadoop/hive/shims/TestHadoop23Shims.java | 89 +++++++++++++++++++++-
2 files changed, 109 insertions(+), 27 deletions(-)
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 36fe5a0..a611dda 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -277,6 +277,16 @@ public class Hadoop23Shims extends HadoopShimsSecure {
equalsIgnoreCase(conf.get(YarnConfiguration.RM_SCHEDULER));
}
+ private boolean checkFileSystemXAttrSupport(FileSystem fs) throws
IOException {
+ try {
+ fs.getXAttrs(new Path(Path.SEPARATOR));
+ return true;
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("XAttr won't be preserved since it is not supported for file
system: " + fs.getUri());
+ return false;
+ }
+ }
+
/**
* Returns a shim to wrap MiniMrCluster
*/
@@ -1102,10 +1112,10 @@ public class Hadoop23Shims extends HadoopShimsSecure {
private static final String DISTCP_OPTIONS_PREFIX = "distcp.options.";
- List<String> constructDistCpParams(List<Path> srcPaths, Path dst,
Configuration conf) {
+ List<String> constructDistCpParams(List<Path> srcPaths, Path dst,
Configuration conf) throws IOException {
// -update and -delete are mandatory options for directory copy to work.
- // -pbx is default preserve options if user doesn't pass any.
- List<String> params = constructDistCpDefaultParams(conf);
+ List<String> params = constructDistCpDefaultParams(conf,
dst.getFileSystem(conf),
+ srcPaths.get(0).getFileSystem(conf));
if (!params.contains("-delete")) {
params.add("-delete");
}
@@ -1116,7 +1126,8 @@ public class Hadoop23Shims extends HadoopShimsSecure {
return params;
}
- private List<String> constructDistCpDefaultParams(Configuration conf) {
+ private List<String> constructDistCpDefaultParams(Configuration conf,
FileSystem dstFs,
+ FileSystem sourceFs)
throws IOException {
List<String> params = new ArrayList<String>();
boolean needToAddPreserveOption = true;
for (Map.Entry<String,String> entry :
conf.getPropsWithPrefix(DISTCP_OPTIONS_PREFIX).entrySet()){
@@ -1131,7 +1142,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
}
if (needToAddPreserveOption) {
- params.add("-pbx");
+ params.add((checkFileSystemXAttrSupport(dstFs) &&
checkFileSystemXAttrSupport(sourceFs)) ? "-pbx" : "-pb");
}
if (!params.contains("-update")) {
params.add("-update");
@@ -1150,9 +1161,10 @@ public class Hadoop23Shims extends HadoopShimsSecure {
* @return
*/
List<String> constructDistCpWithSnapshotParams(List<Path> srcPaths, Path
dst, String sourceSnap, String destSnap,
- Configuration conf, String diff) {
+ Configuration conf, String diff) throws IOException {
// Get the default distcp params
- List<String> params = constructDistCpDefaultParams(conf);
+ List<String> params = constructDistCpDefaultParams(conf,
dst.getFileSystem(conf),
+ srcPaths.get(0).getFileSystem(conf));
if (params.contains("-delete")) {
params.remove("-delete");
}
@@ -1192,18 +1204,11 @@ public class Hadoop23Shims extends HadoopShimsSecure {
@Override
public boolean runDistCp(List<Path> srcPaths, Path dst, Configuration conf)
throws IOException {
- DistCpOptions options = new DistCpOptions.Builder(srcPaths, dst)
- .withSyncFolder(true)
- .withDeleteMissing(true)
- .preserve(FileAttribute.BLOCKSIZE)
- .preserve(FileAttribute.XATTR)
- .build();
-
// Creates the command-line parameters for distcp
List<String> params = constructDistCpParams(srcPaths, dst, conf);
DistCp distcp = null;
try {
- distcp = new DistCp(conf, options);
+ distcp = new DistCp(conf, null);
distcp.getConf().setBoolean("mapred.mapper.new-api", true);
// HIVE-13704 states that we should use run() instead of execute() due
to a hadoop known issue
@@ -1230,14 +1235,10 @@ public class Hadoop23Shims extends HadoopShimsSecure {
public boolean runDistCpWithSnapshots(String oldSnapshot, String
newSnapshot, List<Path> srcPaths, Path dst,
boolean overwriteTarget, Configuration conf)
throws IOException {
- DistCpOptions options =
- new DistCpOptions.Builder(srcPaths,
dst).withSyncFolder(true).withUseDiff(oldSnapshot, newSnapshot)
-
.preserve(FileAttribute.BLOCKSIZE).preserve(FileAttribute.XATTR).build();
-
List<String> params = constructDistCpWithSnapshotParams(srcPaths, dst,
oldSnapshot, newSnapshot, conf, "-diff");
try {
- conf.setBoolean("mapred.mapper.new-api", true);
- DistCp distcp = new DistCp(conf, options);
+ DistCp distcp = new DistCp(conf, null);
+ distcp.getConf().setBoolean("mapred.mapper.new-api", true);
int returnCode = distcp.run(params.toArray(new String[0]));
if (returnCode == 0) {
return true;
@@ -1253,7 +1254,7 @@ public class Hadoop23Shims extends HadoopShimsSecure {
LOG.warn("Copy failed due to target modified. Attempting to restore
back the target. source: {} target: {} "
+ "snapshot: {}", srcPaths, dst, oldSnapshot);
List<String> rParams = constructDistCpWithSnapshotParams(srcPaths,
dst, ".", oldSnapshot, conf, "-rdiff");
- DistCp rDistcp = new DistCp(conf, options);
+ DistCp rDistcp = new DistCp(conf, null);
returnCode = rDistcp.run(rParams.toArray(new String[0]));
if (returnCode == 0) {
LOG.info("Target restored to previous state. source: {} target:
{} snapshot: {}. Reattempting to copy.",
@@ -1273,8 +1274,6 @@ public class Hadoop23Shims extends HadoopShimsSecure {
}
} catch (Exception e) {
throw new IOException("Cannot execute DistCp process: ", e);
- } finally {
- conf.setBoolean("mapred.mapper.new-api", false);
}
return false;
}
diff --git a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
index e82895a..885c2d5 100644
--- a/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
+++ b/shims/0.23/src/main/test/org/apache/hadoop/hive/shims/TestHadoop23Shims.java
@@ -19,26 +19,43 @@
package org.apache.hadoop.hive.shims;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
-
import org.junit.Test;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.any;
public class TestHadoop23Shims {
+ private Path getMockedPath(boolean supportXAttr) throws IOException {
+ FileSystem fs = mock(FileSystem.class);
+ if (supportXAttr) {
+ when(fs.getXAttrs(any())).thenReturn(new HashMap<>());
+ } else {
+ when(fs.getXAttrs(any())).thenThrow(
+ new UnsupportedOperationException("XAttr not supported for file
system."));
+ }
+ Path path = mock(Path.class);
+ when(path.getFileSystem(any())).thenReturn(fs);
+ return path;
+ }
+
@Test
- public void testConstructDistCpParams() {
+ public void testConstructDistCpParams() throws Exception {
Path copySrc = new Path("copySrc");
Path copyDst = new Path("copyDst");
Configuration conf = new Configuration();
@@ -47,7 +64,6 @@ public class TestHadoop23Shims {
List<String> paramsDefault =
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
assertEquals(5, paramsDefault.size());
- assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx"));
assertTrue("Distcp -update set by default",
paramsDefault.contains("-update"));
assertTrue("Distcp -delete set by default",
paramsDefault.contains("-delete"));
assertEquals(copySrc.toString(), paramsDefault.get(3));
@@ -94,6 +110,73 @@ public class TestHadoop23Shims {
}
+ @Test
+ public void testXAttrNotPreservedDueToDestFS() throws Exception {
+ Configuration conf = new Configuration();
+ Path copySrc = getMockedPath(true);
+ Path copyDst = getMockedPath(false);
+
+ Hadoop23Shims shims = new Hadoop23Shims();
+ List<String> paramsDefault =
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
+
+ assertEquals(5, paramsDefault.size());
+ assertTrue("Distcp -pb set by default", paramsDefault.contains("-pb"));
+ assertTrue("Distcp -update set by default",
paramsDefault.contains("-update"));
+ assertTrue("Distcp -delete set by default",
paramsDefault.contains("-delete"));
+ assertEquals(copySrc.toString(), paramsDefault.get(3));
+ assertEquals(copyDst.toString(), paramsDefault.get(4));
+ }
+
+ @Test
+ public void testXAttrNotPreservedDueToSrcFS() throws Exception {
+ Configuration conf = new Configuration();
+ Path copySrc = getMockedPath(false);
+ Path copyDst = getMockedPath(true);
+
+ Hadoop23Shims shims = new Hadoop23Shims();
+ List<String> paramsDefault =
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
+
+ assertEquals(5, paramsDefault.size());
+ assertTrue("Distcp -pb set by default", paramsDefault.contains("-pb"));
+ assertTrue("Distcp -update set by default",
paramsDefault.contains("-update"));
+ assertTrue("Distcp -delete set by default",
paramsDefault.contains("-delete"));
+ assertEquals(copySrc.toString(), paramsDefault.get(3));
+ assertEquals(copyDst.toString(), paramsDefault.get(4));
+ }
+
+ @Test
+ public void testXAttrPreserved() throws Exception {
+ Configuration conf = new Configuration();
+ Path copySrc = getMockedPath(true);
+ Path copyDst = getMockedPath(true);
+ Hadoop23Shims shims = new Hadoop23Shims();
+ List<String> paramsDefault =
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
+
+ assertEquals(5, paramsDefault.size());
+ assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx"));
+ assertTrue("Distcp -update set by default",
paramsDefault.contains("-update"));
+ assertTrue("Distcp -delete set by default",
paramsDefault.contains("-delete"));
+ assertEquals(copySrc.toString(), paramsDefault.get(3));
+ assertEquals(copyDst.toString(), paramsDefault.get(4));
+ }
+
+ @Test
+ public void testPreserveOptionsOverwritenByUser() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("distcp.options.pbx", "");
+ Path copySrc = getMockedPath(false);
+ Path copyDst = getMockedPath(false);
+ Hadoop23Shims shims = new Hadoop23Shims();
+ List<String> paramsDefault =
shims.constructDistCpParams(Collections.singletonList(copySrc), copyDst, conf);
+
+ assertEquals(5, paramsDefault.size());
+ assertTrue("Distcp -pbx set by default", paramsDefault.contains("-pbx"));
+ assertTrue("Distcp -update set by default",
paramsDefault.contains("-update"));
+ assertTrue("Distcp -delete set by default",
paramsDefault.contains("-delete"));
+ assertEquals(copySrc.toString(), paramsDefault.get(3));
+ assertEquals(copyDst.toString(), paramsDefault.get(4));
+ }
+
@Test(expected = FileNotFoundException.class)
public void testGetFileIdForNonexistingPath() throws Exception {
Hadoop23Shims shims = new Hadoop23Shims();