This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c9ddbd210c9 MAPREDUCE-7391. TestLocalDistributedCacheManager failing
after HADOOP-16202 (#4472)
c9ddbd210c9 is described below
commit c9ddbd210c979f5ae2841fb8fe72256f12a9b9ad
Author: Steve Loughran <[email protected]>
AuthorDate: Wed Jun 22 12:52:41 2022 +0100
MAPREDUCE-7391. TestLocalDistributedCacheManager failing after HADOOP-16202
(#4472)
Fixing a mockito-based test which broke when HADOOP-16202
changed the methods being invoked.
Contributed by Steve Loughran
---
.../mapred/TestLocalDistributedCacheManager.java | 204 ++++++++++++++-------
1 file changed, 134 insertions(+), 70 deletions(-)
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
index 5d1c669e500..50cc63094bd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestLocalDistributedCacheManager.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapred;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -31,9 +30,11 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -44,22 +45,31 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+/**
+ * Test the LocalDistributedCacheManager using mocking.
+ * This suite is brittle to changes in the class under test.
+ */
@SuppressWarnings("deprecation")
public class TestLocalDistributedCacheManager {
+ private static final byte[] TEST_DATA = "This is a test file\n".getBytes();
+
private static FileSystem mockfs;
public static class MockFileSystem extends FilterFileSystem {
@@ -70,6 +80,14 @@ public class TestLocalDistributedCacheManager {
private File localDir;
+ /**
+ * Recursive delete of a path.
+ * For safety, paths of length under 5 are rejected.
+ * @param file path to delete.
+ * @throws IOException never, it is just "a dummy in the method signature"
+ * @throws IllegalArgumentException path too short
+ * @throws RuntimeException File.delete() failed.
+ */
private static void delete(File file) throws IOException {
if (file.getAbsolutePath().length() < 5) {
throw new IllegalArgumentException(
@@ -109,9 +127,9 @@ public class TestLocalDistributedCacheManager {
* Mock input stream based on a byte array so that it can be used by a
* FSDataInputStream.
*/
- private static class MockInputStream extends ByteArrayInputStream
+ private static final class MockInputStream extends ByteArrayInputStream
implements Seekable, PositionedReadable {
- public MockInputStream(byte[] buf) {
+ private MockInputStream(byte[] buf) {
super(buf);
}
@@ -134,47 +152,45 @@ public class TestLocalDistributedCacheManager {
when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/");
when(mockfs.getWorkingDirectory()).thenReturn(working);
- when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
- @Override
- public Path answer(InvocationOnMock args) throws Throwable {
- return (Path) args.getArguments()[0];
- }
- });
+ when(mockfs.resolvePath(any(Path.class))).thenAnswer(
+ (Answer<Path>) args -> (Path) args.getArguments()[0]);
final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
final Path filePath = new Path(file);
File link = new File("link");
+ // return a filestatus for the file "*/file.txt"; raise FNFE for anything else
when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
@Override
public FileStatus answer(InvocationOnMock args) throws Throwable {
Path p = (Path)args.getArguments()[0];
if("file.txt".equals(p.getName())) {
- return new FileStatus(201, false, 1, 500, 101, 101,
- FsPermission.getDefault(), "me", "me", filePath);
+ return createMockTestFileStatus(filePath);
} else {
- throw new FileNotFoundException(p+" not supported by mocking");
+ throw notMocked(p);
}
}
});
when(mockfs.getConf()).thenReturn(conf);
final FSDataInputStream in =
- new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
- when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
- @Override
- public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
- Path src = (Path)args.getArguments()[0];
- if ("file.txt".equals(src.getName())) {
- return in;
- } else {
- throw new FileNotFoundException(src+" not supported by mocking");
- }
- }
- });
+ new FSDataInputStream(new MockInputStream(TEST_DATA));
+
+ // file.txt: return an openfile builder which will eventually return the data,
+ // anything else: FNFE
+ when(mockfs.openFile(any(Path.class))).thenAnswer(
+ (Answer<FutureDataInputStreamBuilder>) args -> {
+ Path src = (Path)args.getArguments()[0];
+ if ("file.txt".equals(src.getName())) {
+ return new MockOpenFileBuilder(mockfs, src,
+ () -> CompletableFuture.completedFuture(in));
+ } else {
+ throw notMocked(src);
+ }
+ });
Job.addCacheFile(file, conf);
- Map<String, Boolean> policies = new HashMap<String, Boolean>();
+ Map<String, Boolean> policies = new HashMap<>();
policies.put(file.toString(), true);
Job.setFileSharedCacheUploadPolicies(conf, policies);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101");
@@ -191,6 +207,12 @@ public class TestLocalDistributedCacheManager {
assertFalse(link.exists());
}
+ /**
+ * This test case sets the mock FS to raise FNFE
+ * on any getFileStatus/openFile calls.
+ * If the manager successfully starts up, it means that
+ * no files were probed for/opened.
+ */
@Test
public void testEmptyDownload() throws Exception {
JobID jobId = new JobID();
@@ -201,30 +223,21 @@ public class TestLocalDistributedCacheManager {
when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/");
when(mockfs.getWorkingDirectory()).thenReturn(working);
- when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
- @Override
- public Path answer(InvocationOnMock args) throws Throwable {
- return (Path) args.getArguments()[0];
- }
- });
+ when(mockfs.resolvePath(any(Path.class))).thenAnswer(
+ (Answer<Path>) args -> (Path) args.getArguments()[0]);
- when(mockfs.getFileStatus(any(Path.class))).thenAnswer(new Answer<FileStatus>() {
- @Override
- public FileStatus answer(InvocationOnMock args) throws Throwable {
- Path p = (Path)args.getArguments()[0];
- throw new FileNotFoundException(p+" not supported by mocking");
- }
- });
+ when(mockfs.getFileStatus(any(Path.class))).thenAnswer(
+ (Answer<FileStatus>) args -> {
+ Path p = (Path)args.getArguments()[0];
+ throw notMocked(p);
+ });
when(mockfs.getConf()).thenReturn(conf);
- when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
- @Override
- public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
- Path src = (Path)args.getArguments()[0];
- throw new FileNotFoundException(src+" not supported by mocking");
- }
- });
-
+ when(mockfs.openFile(any(Path.class))).thenAnswer(
+ (Answer<FutureDataInputStreamBuilder>) args -> {
+ Path src = (Path)args.getArguments()[0];
+ throw notMocked(src);
+ });
conf.set(MRJobConfig.CACHE_FILES, "");
conf.set(MRConfig.LOCAL_DIR, localDir.getAbsolutePath());
LocalDistributedCacheManager manager = new LocalDistributedCacheManager();
@@ -236,6 +249,9 @@ public class TestLocalDistributedCacheManager {
}
+ /**
+ * The same file can be added to the cache twice.
+ */
@Test
public void testDuplicateDownload() throws Exception {
JobID jobId = new JobID();
@@ -246,12 +262,8 @@ public class TestLocalDistributedCacheManager {
when(mockfs.getUri()).thenReturn(mockBase);
Path working = new Path("mock://test-nn1/user/me/");
when(mockfs.getWorkingDirectory()).thenReturn(working);
- when(mockfs.resolvePath(any(Path.class))).thenAnswer(new Answer<Path>() {
- @Override
- public Path answer(InvocationOnMock args) throws Throwable {
- return (Path) args.getArguments()[0];
- }
- });
+ when(mockfs.resolvePath(any(Path.class))).thenAnswer(
+ (Answer<Path>) args -> (Path) args.getArguments()[0]);
final URI file = new URI("mock://test-nn1/user/me/file.txt#link");
final Path filePath = new Path(file);
@@ -262,32 +274,30 @@ public class TestLocalDistributedCacheManager {
public FileStatus answer(InvocationOnMock args) throws Throwable {
Path p = (Path)args.getArguments()[0];
if("file.txt".equals(p.getName())) {
- return new FileStatus(201, false, 1, 500, 101, 101,
- FsPermission.getDefault(), "me", "me", filePath);
+ return createMockTestFileStatus(filePath);
} else {
- throw new FileNotFoundException(p+" not supported by mocking");
+ throw notMocked(p);
}
}
});
when(mockfs.getConf()).thenReturn(conf);
final FSDataInputStream in =
- new FSDataInputStream(new MockInputStream("This is a test file\n".getBytes()));
- when(mockfs.open(any(Path.class), anyInt())).thenAnswer(new Answer<FSDataInputStream>() {
- @Override
- public FSDataInputStream answer(InvocationOnMock args) throws Throwable {
- Path src = (Path)args.getArguments()[0];
- if ("file.txt".equals(src.getName())) {
- return in;
- } else {
- throw new FileNotFoundException(src+" not supported by mocking");
- }
- }
- });
+ new FSDataInputStream(new MockInputStream(TEST_DATA));
+ when(mockfs.openFile(any(Path.class))).thenAnswer(
+ (Answer<FutureDataInputStreamBuilder>) args -> {
+ Path src = (Path)args.getArguments()[0];
+ if ("file.txt".equals(src.getName())) {
+ return new MockOpenFileBuilder(mockfs, src,
+ () -> CompletableFuture.completedFuture(in));
+ } else {
+ throw notMocked(src);
+ }
+ });
Job.addCacheFile(file, conf);
Job.addCacheFile(file, conf);
- Map<String, Boolean> policies = new HashMap<String, Boolean>();
+ Map<String, Boolean> policies = new HashMap<>();
policies.put(file.toString(), true);
Job.setFileSharedCacheUploadPolicies(conf, policies);
conf.set(MRJobConfig.CACHE_FILE_TIMESTAMPS, "101,101");
@@ -306,7 +316,7 @@ public class TestLocalDistributedCacheManager {
/**
* This test tries to replicate the issue with the previous version of
- * {@ref LocalDistributedCacheManager} when the resulting timestamp is
+ * {@link LocalDistributedCacheManager} when the resulting timestamp is
* identical as that in another process. Unfortunately, it is difficult
* to mimic such behavior in a single process unit test. And mocking
* the unique id (timestamp previously, UUID otherwise) won't prove the
@@ -321,7 +331,7 @@ public class TestLocalDistributedCacheManager {
final int threadCount = 10;
final CyclicBarrier barrier = new CyclicBarrier(threadCount);
- ArrayList<Callable<Void>> setupCallable = new ArrayList<>();
+ List<Callable<Void>> setupCallable = new ArrayList<>();
for (int i = 0; i < threadCount; ++i) {
setupCallable.add(() -> {
barrier.await();
@@ -340,4 +350,58 @@ public class TestLocalDistributedCacheManager {
manager.close();
}
}
+
+ /**
+ * Create test file status using test data as the length.
+ * @param filePath path to the file
+ * @return a file status.
+ */
+ private FileStatus createMockTestFileStatus(final Path filePath) {
+ return new FileStatus(TEST_DATA.length, false, 1, 500, 101, 101,
+ FsPermission.getDefault(), "me", "me", filePath);
+ }
+
+ /**
+ * Exception to throw on a not mocked path.
+ * @return a FileNotFoundException
+ */
+ private FileNotFoundException notMocked(final Path p) {
+ return new FileNotFoundException(p + " not supported by mocking");
+ }
+
+ /**
+ * Openfile builder where the build operation is a lambda-expression
+ * supplied in the constructor.
+ */
+ private static final class MockOpenFileBuilder extends
+ FutureDataInputStreamBuilderImpl {
+
+ /**
+ * Operation to invoke to build the result.
+ */
+ private final CallableRaisingIOE<CompletableFuture<FSDataInputStream>>
+ buildTheResult;
+
+ /**
+ * Create the builder. The FS and path must be non-null.
+ * FileSystem.getConf() is the only method invoked of the FS by
+ * the superclass.
+ * @param fileSystem fs
+ * @param path path to open
+ * @param buildTheResult builder operation.
+ */
+ private MockOpenFileBuilder(final FileSystem fileSystem, Path path,
+ final CallableRaisingIOE<CompletableFuture<FSDataInputStream>> buildTheResult) {
+ super(fileSystem, path);
+ this.buildTheResult = buildTheResult;
+ }
+
+ @Override
+ public CompletableFuture<FSDataInputStream> build()
+ throws IllegalArgumentException, UnsupportedOperationException,
+ IOException {
+ return buildTheResult.apply();
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]