Repository: tez
Updated Branches:
  refs/heads/master cfede26b0 -> 4c378b443
TEZ-3876. Bug in local mode distributed cache files (Jacob Tolar via jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4c378b44 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4c378b44 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4c378b44 Branch: refs/heads/master Commit: 4c378b443b20e1f643e894e81ec41271d0356b3f Parents: cfede26 Author: Jonathan Eagles <[email protected]> Authored: Mon Dec 11 14:40:04 2017 -0600 Committer: Jonathan Eagles <[email protected]> Committed: Mon Dec 11 14:40:04 2017 -0600 ---------------------------------------------------------------------- .../dag/app/launcher/TezLocalCacheManager.java | 60 ++++++----- .../app/launcher/TestTezLocalCacheManager.java | 107 +++++++++++++++++++ 2 files changed, 142 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/4c378b44/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java index 80f73aa..45e5540 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/TezLocalCacheManager.java @@ -31,7 +31,9 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -90,15 +92,20 @@ public class TezLocalCacheManager { throw new IllegalArgumentException("Resource type PATTERN not supported."); } - // submit task to 
download the object - java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName); - Path dest = new Path(downloadDir.toAbsolutePath().toString()); - FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource); - Future<Path> downloadedPath = threadPool.submit(downloader); - // linkPath is the path we want to symlink the file/directory into Path linkPath = new Path(cwd, entry.getKey()); - resourceInfo.put(resource, new ResourceInfo(downloadedPath, linkPath)); + + if (resourceInfo.containsKey(resource)) { + // We've already downloaded this resource and just need to add another link. + resourceInfo.get(resource).linkPaths.add(linkPath); + } else { + // submit task to download the object + java.nio.file.Path downloadDir = Files.createTempDirectory(tempDir, resourceName); + Path dest = new Path(downloadDir.toAbsolutePath().toString()); + FSDownload downloader = new FSDownload(fileContext, ugi, conf, dest, resource); + Future<Path> downloadedPath = threadPool.submit(downloader); + resourceInfo.put(resource, new ResourceInfo(downloadedPath, linkPath)); + } } // Link each file @@ -106,20 +113,21 @@ public class TezLocalCacheManager { LocalResource resource = entry.getKey(); ResourceInfo resourceMeta = entry.getValue(); - Path linkPath = resourceMeta.linkPath; - Path targetPath; - - try { - // this blocks on the download completing - targetPath = resourceMeta.downloadPath.get(); - } catch (InterruptedException | ExecutionException e) { - throw new IOException(e); - } - - if (createSymlink(targetPath, linkPath)) { - LOG.info("Localized file: {} as {}", resource, linkPath); - } else { - LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath); + for (Path linkPath : resourceMeta.linkPaths) { + Path targetPath; + + try { + // this blocks on the download completing + targetPath = resourceMeta.downloadPath.get(); + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); + } + + if 
(createSymlink(targetPath, linkPath)) { + LOG.info("Localized file: {} as {}", resource, linkPath); + } else { + LOG.warn("Failed to create symlink: {} <- {}", targetPath, linkPath); + } } } } finally { @@ -136,8 +144,10 @@ public class TezLocalCacheManager { */ public void cleanup() throws IOException { for (ResourceInfo info : resourceInfo.values()) { - if (fileContext.util().exists(info.linkPath)) { - fileContext.delete(info.linkPath, true); + for (Path linkPath : info.linkPaths) { + if (fileContext.util().exists(linkPath)) { + fileContext.delete(linkPath, true); + } } } @@ -174,11 +184,11 @@ public class TezLocalCacheManager { */ private static class ResourceInfo { final Future<Path> downloadPath; - final Path linkPath; + final Set<Path> linkPaths = new HashSet<>(); public ResourceInfo(Future<Path> downloadPath, Path linkPath) { this.downloadPath = downloadPath; - this.linkPath = linkPath; + this.linkPaths.add(linkPath); } } } http://git-wip-us.apache.org/repos/asf/tez/blob/4c378b44/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java new file mode 100644 index 0000000..fb23a1d --- /dev/null +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestTezLocalCacheManager.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.dag.app.launcher; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +public class TestTezLocalCacheManager { + + @Test + public void testManager() throws URISyntaxException, IOException { + Map<String, LocalResource> resources = new HashMap<>(); + + // Test that localization works for regular files and verify that if multiple symlinks are created, + // they all work + LocalResource resourceOne = createFile("content-one"); + LocalResource resourceTwo = createFile("content-two"); + + resources.put("file-one", resourceOne); + resources.put("file-two", resourceTwo); + resources.put("file-three", resourceTwo); + + TezLocalCacheManager manager = new TezLocalCacheManager(resources, new Configuration()); + + try { + manager.localize(); + + Assert.assertEquals( + "content-one", + new 
String(Files.readAllBytes(Paths.get("./file-one"))) + ); + + Assert.assertEquals( + "content-two", + new String(Files.readAllBytes(Paths.get("./file-two"))) + ); + + Assert.assertEquals( + "content-two", + new String(Files.readAllBytes(Paths.get("./file-three"))) + ); + } finally { + manager.cleanup(); + } + + // verify that symlinks were removed + Assert.assertFalse(Files.exists(Paths.get("./file-one"))); + Assert.assertFalse(Files.exists(Paths.get("./file-two"))); + Assert.assertFalse(Files.exists(Paths.get("./file-three"))); + } + + // create a temporary file with the given content and return a LocalResource + private static LocalResource createFile(String content) throws IOException { + FileContext fs = FileContext.getLocalFSFileContext(); + + java.nio.file.Path tempFile = Files.createTempFile("test-cache-manager", ".txt"); + File temp = tempFile.toFile(); + temp.deleteOnExit(); + Path p = new Path("file:///" + tempFile.toAbsolutePath().toString()); + + Files.write(tempFile, content.getBytes()); + + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + LocalResource ret = recordFactory.newRecordInstance(LocalResource.class); + URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(p); + ret.setResource(yarnUrlFromPath); + ret.setSize(content.getBytes().length); + ret.setType(LocalResourceType.FILE); + ret.setVisibility(LocalResourceVisibility.PRIVATE); + ret.setTimestamp(fs.getFileStatus(p).getModificationTime()); + return ret; + } +}
