APEX-35 #resolve Attempt to create directory before opening the meta file to write


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/45c7685a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/45c7685a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/45c7685a

Branch: refs/heads/master
Commit: 45c7685a0ca7a6cebcebaf0f3ab8b89788adc4b2
Parents: 19d6658
Author: David Yan <[email protected]>
Authored: Thu Aug 6 13:31:05 2015 -0700
Committer: David Yan <[email protected]>
Committed: Thu Aug 6 17:51:52 2015 -0700

----------------------------------------------------------------------
 .../stram/StreamingContainerManager.java        | 48 +++++++++-----------
 1 file changed, 21 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/45c7685a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 0847f3c..6840288 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -19,6 +19,7 @@ import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
+import com.datatorrent.netlet.util.DTThrowable;
 import com.esotericsoftware.kryo.KryoException;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
@@ -173,6 +175,7 @@ public class StreamingContainerManager implements 
PlanContext
   private final Cache<Long, Object> commandResponse = 
CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();
   private long lastLatencyWarningTime;
   private transient ExecutorService poolExecutor;
+  private FileContext fileContext;
 
   //logic operator name to a queue of logical metrics. this gets cleared 
periodically
   private final Map<String, Queue<Pair<Long, Map<String, Object>>>> 
logicalMetrics = Maps.newConcurrentMap();
@@ -329,22 +332,8 @@ public class StreamingContainerManager implements 
PlanContext
       this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 
1));
     }
     this.plan = new PhysicalPlan(dag, this);
-    setupWsClient();
-    setupRecording(enableEventRecording);
-    setupStringCodecs();
     this.journal = new Journal(this);
-    try {
-      saveMetaInfo();
-    } catch (IOException ex) {
-      LOG.error("Error saving meta info to DFS", ex);
-    }
-
-    try {
-      this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + 
"/containers"), new FsPermission((short)0644));
-      this.containerFile.append(getAppMasterContainerInfo());
-    } catch (IOException ex) {
-      LOG.warn("Caught exception when instantiating for container info file. 
Ignoring", ex);
-    }
+    init(enableEventRecording);
   }
 
   private StreamingContainerManager(CheckpointState checkpointedState, boolean 
enableEventRecording)
@@ -354,20 +343,26 @@ public class StreamingContainerManager implements 
PlanContext
     poolExecutor = Executors.newFixedThreadPool(4);
     this.plan = checkpointedState.physicalPlan;
     this.eventBus = new MBassador<StramEvent>(BusConfiguration.Default(1, 1, 
1));
+    this.journal = new Journal(this);
+    init(enableEventRecording);
+  }
+
+  private void init(boolean enableEventRecording)
+  {
     setupWsClient();
     setupRecording(enableEventRecording);
     setupStringCodecs();
-    this.journal = new Journal(this);
+
     try {
+      Path file = new Path(this.vars.appPath);
+      URI uri = file.toUri();
+      Configuration config = new YarnConfiguration();
+      fileContext = uri.getScheme() == null ? 
FileContext.getFileContext(config) : FileContext.getFileContext(uri, config);
       saveMetaInfo();
-    } catch (IOException ex) {
-      LOG.error("Error saving meta info to DFS", ex);
-    }
-    try {
-      this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + 
"/containers"), new FsPermission((short) 0644));
+      this.containerFile = new FSJsonLineFile(new Path(this.vars.appPath + 
"/containers"), FsPermission.getDefault());
       this.containerFile.append(getAppMasterContainerInfo());
     } catch (IOException ex) {
-      LOG.error("Caught exception when instantiating for container info file", 
ex);
+      DTThrowable.rethrow(ex);
     }
   }
 
@@ -858,9 +853,8 @@ public class StreamingContainerManager implements 
PlanContext
    */
   private void saveMetaInfo() throws IOException
   {
-    Path path = new Path(this.vars.appPath, APP_META_FILENAME + "." + 
System.nanoTime());
-    FileContext fc = FileContext.getFileContext(path.toUri());
-    try (FSDataOutputStream os = fc.create(path, EnumSet.of(CreateFlag.CREATE, 
CreateFlag.OVERWRITE))) {
+    Path file = new Path(this.vars.appPath, APP_META_FILENAME + "." + 
System.nanoTime());
+    try (FSDataOutputStream os = fileContext.create(file, 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), 
Options.CreateOpts.CreateParent.createParent())) {
       JSONObject top = new JSONObject();
       JSONObject attributes = new JSONObject();
       for (Map.Entry<Attribute<?>, Object> entry : 
this.plan.getLogicalPlan().getAttributes().entrySet()) {
@@ -877,7 +871,7 @@ public class StreamingContainerManager implements 
PlanContext
       throw new RuntimeException(ex);
     }
     Path origPath = new Path(this.vars.appPath, APP_META_FILENAME);
-    fc.rename(path, origPath, Options.Rename.OVERWRITE);
+    fileContext.rename(file, origPath, Options.Rename.OVERWRITE);
   }
 
   public Queue<Pair<Long, Map<String, Object>>> getWindowMetrics(String 
operatorName)
@@ -1416,7 +1410,7 @@ public class StreamingContainerManager implements 
PlanContext
         try {
           FSJsonLineFile operatorFile = operatorFiles.get(ptOp.getId());
           if (operatorFile == null) {
-            operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new 
Path(this.vars.appPath + "/operators/" + ptOp.getId()), new 
FsPermission((short)0644)));
+            operatorFiles.putIfAbsent(ptOp.getId(), new FSJsonLineFile(new 
Path(this.vars.appPath + "/operators/" + ptOp.getId()), 
FsPermission.getDefault()));
             operatorFile = operatorFiles.get(ptOp.getId());
           }
           JSONObject operatorInfo = new JSONObject();

Reply via email to