APEX-35 #resolve Attempt to create directory before opening the meta file to write
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45c7685a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45c7685a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45c7685a Branch: refs/heads/master Commit: 45c7685a0ca7a6cebcebaf0f3ab8b89788adc4b2 Parents: 19d6658 Author: David Yan <[email protected]> Authored: Thu Aug 6 13:31:05 2015 -0700 Committer: David Yan <[email protected]> Committed: Thu Aug 6 17:51:52 2015 -0700 ---------------------------------------------------------------------- .../stram/StreamingContainerManager.java | 48 +++++++++----------- 1 file changed, 21 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45c7685a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 0847f3c..6840288 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -19,6 +19,7 @@ import java.io.*; import java.lang.management.ManagementFactory; import java.lang.reflect.Field; import java.net.InetSocketAddress; +import java.net.URI; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; +import com.datatorrent.netlet.util.DTThrowable; import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; @@ -173,6 +175,7 @@ public class StreamingContainerManager implements PlanContext private final Cache<Long, Object> commandResponse = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build(); private long lastLatencyWarningTime; private transient ExecutorService poolExecutor; + private FileContext fileContext; //logic operator name to a queue of logical metrics. this gets cleared periodically private final Map<String, Queue<Pair<Long, Map<String, Object>>>> logicalMetrics = Maps.newConcurrentMap(); @@ -329,22 +332,8 @@ public class StreamingContainerManager implements PlanContext this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1)); } this.plan = new PhysicalPlan(dag, this); - setupWsClient(); - setupRecording(enableEventRecording); - setupStringCodecs(); this.journal = new Journal(this); - try { - saveMetaInfo(); - } catch (IOException ex) { - LOG.error("Error saving meta info to DFS", ex); - } - - try { - this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), new FsPermission((short)0644)); - this.containerFile.append(getAppMasterContainerInfo()); - } catch (IOException ex) { - LOG.warn("Caught exception when instantiating for container info file. Ignoring", ex); - } + init(enableEventRecording); } private StreamingContainerManager(CheckpointState checkpointedState, boolean enableEventRecording) @@ -354,20 +343,26 @@ public class StreamingContainerManager implements PlanContext poolExecutor = Executors.newFixedThreadPool(4); this.plan = checkpointedState.physicalPlan; this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 1)); + this.journal = new Journal(this); + init(enableEventRecording); + } + + private void init(boolean enableEventRecording) + { setupWsClient(); setupRecording(enableEventRecording); setupStringCodecs(); - this.journal = new Journal(this); + try { + Path file = new Path(this.vars.appPath); + URI uri = file.toUri(); + Configuration config = new YarnConfiguration(); + fileContext = uri.getScheme() == null ? FileContext.getFileContext(config) : FileContext.getFileContext(uri, config); saveMetaInfo(); - } catch (IOException ex) { - LOG.error("Error saving meta info to DFS", ex); - } - try { - this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), new FsPermission((short) 0644)); + this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + "/containers"), FsPermission.getDefault()); this.containerFile.append(getAppMasterContainerInfo()); } catch (IOException ex) { - LOG.error("Caught exception when instantiating for container info file", ex); + DTThrowable.rethrow(ex); } } @@ -858,9 +853,8 @@ public class StreamingContainerManager implements PlanContext */ private void saveMetaInfo() throws IOException { - Path path = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime()); - FileContext fc = FileContext.getFileContext(path.toUri()); - try (FSDataOutputStream os = fc.create(path, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE))) { + Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + System.nanoTime()); + try (FSDataOutputStream os = fileContext.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent())) { JSONObject top = new JSONObject(); JSONObject attributes = new JSONObject(); for (Map.Entry<Attribute<?>, Object> entry : this.plan.getLogicalPlan().getAttributes().entrySet()) { @@ -877,7 +871,7 @@ public class StreamingContainerManager implements PlanContext throw new RuntimeException(ex); } Path origPath = new Path(this.vars.appPath, APP_META_FILENAME); - fc.rename(path, origPath, Options.Rename.OVERWRITE); + fileContext.rename(file, origPath, Options.Rename.OVERWRITE); } public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String operatorName) @@ -1416,7 +1410,7 @@ public class StreamingContainerManager implements PlanContext try { FSJsonLineFile operatorFile = operatorFiles.get(ptOp.getId()); if (operatorFile == null) { - operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), new FsPermission((short)0644))); + operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new Path(this.vars.appPath + "/operators/" + ptOp.getId()), FsPermission.getDefault())); operatorFile = operatorFiles.get(ptOp.getId()); } JSONObject operatorInfo = new JSONObject();
