Repository: crunch Updated Branches: refs/heads/master c14acfab0 -> 901d0644d
Quick and Dirty Workaround for Crunch DistCache Signed-off-by: Micah Whitacre <[email protected]> Signed-off-by: Josh Wills <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/901d0644 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/901d0644 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/901d0644 Branch: refs/heads/master Commit: 901d0644dcaec5309670b7a2eeff228cea2c7767 Parents: c14acfa Author: Dimitry Goldin <[email protected]> Authored: Fri Oct 14 18:39:41 2016 +0200 Committer: Josh Wills <[email protected]> Committed: Mon Dec 19 16:44:08 2016 -0800 ---------------------------------------------------------------------- .../crunch/impl/mr/run/CrunchTaskContext.java | 3 +- .../java/org/apache/crunch/util/DistCache.java | 42 +++++++++++--------- 2 files changed, 24 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/901d0644/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java index b81df05..0eb246a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java @@ -40,8 +40,7 @@ class CrunchTaskContext { this.taskContext = taskContext; this.nodeContext = nodeContext; Configuration conf = taskContext.getConfiguration(); - Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), - nodeContext.toString()); + Path path = new Path(nodeContext.toString()); try { this.nodes = (List<RTNode>) DistCache.read(conf, path); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/crunch/blob/901d0644/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java b/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java index 046f038..ad26a67 100644 --- a/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java +++ b/crunch-core/src/main/java/org/apache/crunch/util/DistCache.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.net.URI; import java.net.URL; import java.net.URLDecoder; import java.util.Enumeration; @@ -67,27 +66,32 @@ public class DistCache { DistributedCache.addCacheFile(path.toUri(), conf); } - public static Object read(Configuration conf, Path path) throws IOException { - URI target = null; - for (URI uri : DistributedCache.getCacheFiles(conf)) { - if (uri.toString().equals(path.toString())) { - target = uri; - break; - } + public static Object read(Configuration conf, Path requestedFile) throws IOException { + FileSystem localFs = FileSystem.getLocal(conf); + + Path cachedPath = null; + + try { + cachedPath = getPathToCacheFile(requestedFile, conf); + } catch (CrunchRuntimeException cre) { + throw new IOException("Can not determine cached location for " + requestedFile.toString(), cre); } - Object value = null; - if (target != null) { - Path targetPath = new Path(target.toString()); - ObjectInputStream ois = new ClassloaderFallbackObjectInputStream( - targetPath.getFileSystem(conf).open(targetPath)); - try { - value = ois.readObject(); - } catch (ClassNotFoundException e) { - throw new CrunchRuntimeException(e); + + if(cachedPath == null || !localFs.exists(cachedPath)) { + throw new IOException("Expected file with path: " + requestedFile.toString() + " to be cached"); + } + + ObjectInputStream ois = null; + try { + ois = new ObjectInputStream(localFs.open(cachedPath)); + return ois.readObject(); + } catch (ClassNotFoundException e) { + throw new CrunchRuntimeException(e); + } finally { + if (ois != null) { + ois.close(); } - ois.close(); } - return value; } public static void addCacheFile(Path path, Configuration conf) {
