Repository: twill
Updated Branches:
  refs/heads/site c852eb67b -> b99b26278


Use TwillRuntimeSpecification to carry runtime information instead of using
environment variables

This closes #13 on GitHub.

Signed-off-by: Terence Yim <cht...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6c7d32aa
Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6c7d32aa
Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6c7d32aa

Branch: refs/heads/site
Commit: 6c7d32aa70facec92482a28334877057319d5263
Parents: 983a14a
Author: yaojiefeng <yao...@cask.co>
Authored: Thu Sep 22 14:35:04 2016 -0700
Committer: Terence Yim <cht...@apache.org>
Committed: Wed Oct 5 14:42:46 2016 -0700

----------------------------------------------------------------------
 .../org/apache/twill/internal/Constants.java    |   4 +
 .../java/org/apache/twill/internal/EnvKeys.java |  13 +-
 .../twill/internal/TwillContainerLauncher.java  |   2 +-
 .../internal/TwillRuntimeSpecification.java     |  96 +++++++++++
 .../json/TwillRuntimeSpecificationAdapter.java  | 163 +++++++++++++++++++
 .../json/TwillRuntimeSpecificationCodec.java    |  91 +++++++++++
 .../json/TwillSpecificationAdapter.java         | 161 ------------------
 .../org/apache/twill/internal/ServiceMain.java  |   9 +-
 .../appmaster/ApplicationMasterMain.java        |  21 +--
 .../appmaster/ApplicationMasterService.java     |  40 ++---
 .../internal/appmaster/RunningContainers.java   |  10 +-
 .../internal/container/TwillContainerMain.java  |  31 ++--
 .../apache/twill/yarn/YarnTwillPreparer.java    |  27 ++-
 13 files changed, 415 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-common/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git 
a/twill-common/src/main/java/org/apache/twill/internal/Constants.java 
b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
index dd04eb1..8a33962 100644
--- a/twill-common/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java
@@ -57,6 +57,10 @@ public final class Constants {
   public static final String DISCOVERY_PATH_PREFIX = "/discoverable";
   public static final String INSTANCES_PATH_PREFIX = "/instances";
 
+  /**
+   * Constants for twill variable names.
+   */
+  public static final String TWILL_APP_NAME = "TWILL_APP_NAME";
 
   /**
    * Constants for names of internal files that are shared between client, AM 
and containers.

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java 
b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
index 6ee6ac8..6948f80 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java
@@ -22,29 +22,17 @@ package org.apache.twill.internal;
  */
 public final class EnvKeys {
 
-  public static final String TWILL_ZK_CONNECT = "TWILL_ZK_CONNECT";
-  public static final String TWILL_APP_RUN_ID = "TWILL_APP_RUN_ID";
   public static final String TWILL_RUN_ID = "TWILL_RUN_ID";
   public static final String TWILL_INSTANCE_ID = "TWILL_INSTANCE_ID";
   public static final String TWILL_INSTANCE_COUNT = "TWILL_INSTANCE_COUNT";
-  public static final String TWILL_RESERVED_MEMORY_MB = 
"TWILL_RESERVED_MEMORY_MB";
-
-  public static final String TWILL_FS_USER = "TWILL_FS_USER";
 
   /**
    * Cluster filesystem directory for storing twill app related files.
    */
-  public static final String TWILL_APP_DIR = "TWILL_APP_DIR";
-
-  public static final String TWILL_APP_NAME = "TWILL_APP_NAME";
-
-  public static final String TWILL_APP_LOG_LEVEL = "TWILL_APP_LOG_LEVEL";
-
   public static final String TWILL_RUNNABLE_NAME = "TWILL_RUNNABLE_NAME";
 
   public static final String TWILL_LOG_KAFKA_ZK = "TWILL_LOG_KAFKA_ZK";
 
-  public static final String YARN_RM_SCHEDULER_ADDRESS = 
"YARN_RM_SCHEDULER_ADDRESS";
   public static final String YARN_APP_ID = "YARN_APP_ID";
   public static final String YARN_APP_ID_CLUSTER_TIME = 
"YARN_APP_ID_CLUSTER_TIME";
   public static final String YARN_APP_ID_STR = "YARN_APP_ID_STR";
@@ -52,6 +40,7 @@ public final class EnvKeys {
   public static final String YARN_CONTAINER_ID = "YARN_CONTAINER_ID";
   public static final String YARN_CONTAINER_HOST = "YARN_CONTAINER_HOST";
   public static final String YARN_CONTAINER_PORT = "YARN_CONTAINER_PORT";
+
   /**
    * Used to inform runnables of their resource usage.
    */

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
index 3eefb8c..53c378a 100644
--- 
a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java
@@ -136,7 +136,7 @@ public final class TwillContainerLauncher {
     int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), 
reservedMemory, Constants.HEAP_MIN_RATIO);
     commandBuilder.add("-Djava.io.tmpdir=tmp",
                        "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID,
-                       "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + 
EnvKeys.TWILL_RUNNABLE_NAME,
+                       "-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" 
+ EnvKeys.TWILL_RUNNABLE_NAME,
                        "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath,
                        "-Xmx" + memory + "m");
     if (jvmOpts.getExtraOptions() != null) {

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
 
b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
new file mode 100644
index 0000000..a48133f
--- /dev/null
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal;
+
+import org.apache.twill.api.RunId;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogEntry;
+
+import java.net.URI;
+import javax.annotation.Nullable;
+
+/**
+ * Represents runtime specification of a {@link TwillApplication}.
+ */
+public class TwillRuntimeSpecification {
+
+  private final TwillSpecification twillSpecification;
+
+  private final String fsUser;
+  private final URI twillAppDir;
+  private final String zkConnectStr;
+  private final RunId twillRunId;
+  private final String twillAppName;
+  private final int reservedMemory;
+  private final String rmSchedulerAddr;
+  private final LogEntry.Level logLevel;
+
+  public TwillRuntimeSpecification(TwillSpecification twillSpecification, 
String fsUser, URI twillAppDir,
+                                   String zkConnectStr, RunId twillRunId, 
String twillAppName,
+                                   int reservedMemory, @Nullable String 
rmSchedulerAddr,
+                                   @Nullable LogEntry.Level logLevel) {
+    this.twillSpecification = twillSpecification;
+    this.fsUser = fsUser;
+    this.twillAppDir = twillAppDir;
+    this.zkConnectStr = zkConnectStr;
+    this.twillRunId = twillRunId;
+    this.twillAppName = twillAppName;
+    this.reservedMemory = reservedMemory;
+    this.rmSchedulerAddr = rmSchedulerAddr;
+    this.logLevel = logLevel;
+  }
+
+  public TwillSpecification getTwillSpecification() {
+    return twillSpecification;
+  }
+
+  public String getFsUser() {
+    return fsUser;
+  }
+
+  public URI getTwillAppDir() {
+    return twillAppDir;
+  }
+
+  public String getZkConnectStr() {
+    return zkConnectStr;
+  }
+
+  public RunId getTwillRunId() {
+    return twillRunId;
+  }
+
+  public String getTwillAppName() {
+    return twillAppName;
+  }
+
+  public int getReservedMemory() {
+    return reservedMemory;
+  }
+
+  @Nullable
+  public String getRmSchedulerAddr() {
+    return rmSchedulerAddr;
+  }
+
+  @Nullable
+  public LogEntry.Level getLogLevel() {
+    return logLevel;
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
new file mode 100644
index 0000000..ae9747d
--- /dev/null
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.TypeAdapterFactory;
+import com.google.gson.reflect.TypeToken;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.LocalFile;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.RuntimeSpecification;
+import org.apache.twill.api.TwillRunnableSpecification;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.internal.TwillRuntimeSpecification;
+import 
org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
+import 
org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
+import 
org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationPlacementPolicyCoder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * Converts {@link TwillRuntimeSpecification} objects to and from JSON using Gson.
+ */
+public final class TwillRuntimeSpecificationAdapter {
+
+  private final Gson gson;
+
+  public static TwillRuntimeSpecificationAdapter create() {
+    return new TwillRuntimeSpecificationAdapter();
+  }
+
+  private TwillRuntimeSpecificationAdapter() {
+    gson = new GsonBuilder()
+              .serializeNulls()
+              .registerTypeAdapter(TwillRuntimeSpecification.class, new 
TwillRuntimeSpecificationCodec())
+              .registerTypeAdapter(TwillSpecification.class, new 
TwillSpecificationCodec())
+              .registerTypeAdapter(TwillSpecification.Order.class, new 
TwillSpecificationOrderCoder())
+              .registerTypeAdapter(TwillSpecification.PlacementPolicy.class,
+                                   new 
TwillSpecificationPlacementPolicyCoder())
+              .registerTypeAdapter(EventHandlerSpecification.class, new 
EventHandlerSpecificationCoder())
+              .registerTypeAdapter(RuntimeSpecification.class, new 
RuntimeSpecificationCodec())
+              .registerTypeAdapter(TwillRunnableSpecification.class, new 
TwillRunnableSpecificationCodec())
+              .registerTypeAdapter(ResourceSpecification.class, new 
ResourceSpecificationCodec())
+              .registerTypeAdapter(LocalFile.class, new LocalFileCodec())
+              .registerTypeAdapterFactory(new 
TwillSpecificationTypeAdapterFactory())
+              .create();
+  }
+
+  public String toJson(TwillRuntimeSpecification spec) {
+    return gson.toJson(spec, TwillRuntimeSpecification.class);
+  }
+
+  public void toJson(TwillRuntimeSpecification spec, Writer writer) {
+    gson.toJson(spec, TwillRuntimeSpecification.class, writer);
+  }
+
+  public void toJson(TwillRuntimeSpecification spec, File file) throws 
IOException {
+    try (Writer writer = Files.newWriter(file, Charsets.UTF_8)) {
+      toJson(spec, writer);
+    }
+  }
+
+  public TwillRuntimeSpecification fromJson(String json) {
+    return gson.fromJson(json, TwillRuntimeSpecification.class);
+  }
+
+  public TwillRuntimeSpecification fromJson(Reader reader) {
+    return gson.fromJson(reader, TwillRuntimeSpecification.class);
+  }
+
+  public TwillRuntimeSpecification fromJson(File file) throws IOException {
+    try (Reader reader = Files.newReader(file, Charsets.UTF_8)) {
+      return fromJson(reader);
+    }
+  }
+
+  // This works around Gson ignoring inner classes
+  private static final class TwillSpecificationTypeAdapterFactory implements 
TypeAdapterFactory {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+      Class<?> rawType = type.getRawType();
+      if (!Map.class.isAssignableFrom(rawType)) {
+        return null;
+      }
+      Type[] typeArgs = ((ParameterizedType) 
type.getType()).getActualTypeArguments();
+      TypeToken<?> keyType = TypeToken.get(typeArgs[0]);
+      TypeToken<?> valueType = TypeToken.get(typeArgs[1]);
+      if (keyType.getRawType() != String.class) {
+        return null;
+      }
+      return (TypeAdapter<T>) mapAdapter(gson, valueType);
+    }
+
+    private <V> TypeAdapter<Map<String, V>> mapAdapter(Gson gson, TypeToken<V> 
valueType) {
+      final TypeAdapter<V> valueAdapter = gson.getAdapter(valueType);
+
+      return new TypeAdapter<Map<String, V>>() {
+        @Override
+        public void write(JsonWriter writer, Map<String, V> map) throws 
IOException {
+          if (map == null) {
+            writer.nullValue();
+            return;
+          }
+          writer.beginObject();
+          for (Map.Entry<String, V> entry : map.entrySet()) {
+            writer.name(entry.getKey());
+            valueAdapter.write(writer, entry.getValue());
+          }
+          writer.endObject();
+        }
+
+        @Override
+        public Map<String, V> read(JsonReader reader) throws IOException {
+          if (reader.peek() == JsonToken.NULL) {
+            reader.nextNull();
+            return null;
+          }
+          if (reader.peek() != JsonToken.BEGIN_OBJECT) {
+            return null;
+          }
+          Map<String, V> map = Maps.newHashMap();
+          reader.beginObject();
+          while (reader.peek() != JsonToken.END_OBJECT) {
+            map.put(reader.nextName(), valueAdapter.read(reader));
+          }
+          reader.endObject();
+          return map;
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
new file mode 100644
index 0000000..6100c99
--- /dev/null
+++ 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.json;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogEntry;
+import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.TwillRuntimeSpecification;
+
+import java.lang.reflect.Type;
+import java.net.URI;
+
+/**
+ * Codec for serializing and deserializing a {@link TwillRuntimeSpecification} 
object using json.
+ */
+final class TwillRuntimeSpecificationCodec implements 
JsonSerializer<TwillRuntimeSpecification>,
+                                                         
JsonDeserializer<TwillRuntimeSpecification> {
+
+  private static final String FS_USER = "fsUser";
+  private static final String TWILL_APP_DIR = "twillAppDir";
+  private static final String ZK_CONNECT_STR = "zkConnectStr";
+  private static final String TWILL_RUNID = "twillRunId";
+  private static final String TWILL_APP_NAME = "twillAppName";
+  private static final String RESERVED_MEMORY = "reservedMemory";
+  private static final String RM_SCHEDULER_ADDR = "rmSchedulerAddr";
+  private static final String LOG_LEVEL = "logLevel";
+  private static final String TWILL_SPEC = "twillSpecification";
+
+  @Override
+  public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, 
JsonSerializationContext context) {
+    JsonObject json = new JsonObject();
+    json.addProperty(FS_USER, src.getFsUser());
+    json.addProperty(TWILL_APP_DIR, src.getTwillAppDir().toASCIIString());
+    json.addProperty(ZK_CONNECT_STR, src.getZkConnectStr());
+    json.addProperty(TWILL_RUNID, src.getTwillRunId().getId());
+    json.addProperty(TWILL_APP_NAME, src.getTwillAppName());
+    json.addProperty(RESERVED_MEMORY, src.getReservedMemory());
+    if (src.getRmSchedulerAddr() != null) {
+      json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr());
+    }
+    if (src.getLogLevel() != null) {
+      json.addProperty(LOG_LEVEL, src.getLogLevel().name());
+    }
+    json.add(TWILL_SPEC, context.serialize(src.getTwillSpecification(),
+                                           new TypeToken<TwillSpecification>() 
{ }.getType()));
+    return json;
+  }
+
+  @Override
+  public TwillRuntimeSpecification deserialize(JsonElement json, Type typeOfT,
+                                               JsonDeserializationContext 
context) throws JsonParseException {
+    JsonObject jsonObj = json.getAsJsonObject();
+
+    TwillSpecification twillSpecification = context.deserialize(
+      jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { 
}.getType());
+    return new TwillRuntimeSpecification(twillSpecification,
+                                         jsonObj.get(FS_USER).getAsString(),
+                                         
URI.create(jsonObj.get(TWILL_APP_DIR).getAsString()),
+                                         
jsonObj.get(ZK_CONNECT_STR).getAsString(),
+                                         
RunIds.fromString(jsonObj.get(TWILL_RUNID).getAsString()),
+                                         
jsonObj.get(TWILL_APP_NAME).getAsString(),
+                                         
jsonObj.get(RESERVED_MEMORY).getAsInt(),
+                                         jsonObj.has(RM_SCHEDULER_ADDR) ?
+                                                  
jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null,
+                                         jsonObj.has(LOG_LEVEL) ?
+                                           
LogEntry.Level.valueOf(jsonObj.get(LOG_LEVEL).getAsString()) : null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
 
b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
deleted file mode 100644
index eda0c71..0000000
--- 
a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.json;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Maps;
-import com.google.common.io.Files;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.TypeAdapter;
-import com.google.gson.TypeAdapterFactory;
-import com.google.gson.reflect.TypeToken;
-import com.google.gson.stream.JsonReader;
-import com.google.gson.stream.JsonToken;
-import com.google.gson.stream.JsonWriter;
-import org.apache.twill.api.EventHandlerSpecification;
-import org.apache.twill.api.LocalFile;
-import org.apache.twill.api.ResourceSpecification;
-import org.apache.twill.api.RuntimeSpecification;
-import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.TwillSpecification;
-import 
org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
-import 
org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
-import 
org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationPlacementPolicyCoder;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.util.Map;
-
-/**
- *
- */
-public final class TwillSpecificationAdapter {
-
-  private final Gson gson;
-
-  public static TwillSpecificationAdapter create() {
-    return new TwillSpecificationAdapter();
-  }
-
-  private TwillSpecificationAdapter() {
-    gson = new GsonBuilder()
-              .serializeNulls()
-              .registerTypeAdapter(TwillSpecification.class, new 
TwillSpecificationCodec())
-              .registerTypeAdapter(TwillSpecification.Order.class, new 
TwillSpecificationOrderCoder())
-              .registerTypeAdapter(TwillSpecification.PlacementPolicy.class,
-                                   new 
TwillSpecificationPlacementPolicyCoder())
-              .registerTypeAdapter(EventHandlerSpecification.class, new 
EventHandlerSpecificationCoder())
-              .registerTypeAdapter(RuntimeSpecification.class, new 
RuntimeSpecificationCodec())
-              .registerTypeAdapter(TwillRunnableSpecification.class, new 
TwillRunnableSpecificationCodec())
-              .registerTypeAdapter(ResourceSpecification.class, new 
ResourceSpecificationCodec())
-              .registerTypeAdapter(LocalFile.class, new LocalFileCodec())
-              .registerTypeAdapterFactory(new 
TwillSpecificationTypeAdapterFactory())
-              .create();
-  }
-
-  public String toJson(TwillSpecification spec) {
-    return gson.toJson(spec, TwillSpecification.class);
-  }
-
-  public void toJson(TwillSpecification spec, Writer writer) {
-    gson.toJson(spec, TwillSpecification.class, writer);
-  }
-
-  public void toJson(TwillSpecification spec, File file) throws IOException {
-    try (Writer writer = Files.newWriter(file, Charsets.UTF_8)) {
-      toJson(spec, writer);
-    }
-  }
-
-  public TwillSpecification fromJson(String json) {
-    return gson.fromJson(json, TwillSpecification.class);
-  }
-
-  public TwillSpecification fromJson(Reader reader) {
-    return gson.fromJson(reader, TwillSpecification.class);
-  }
-
-  public TwillSpecification fromJson(File file) throws IOException {
-    try (Reader reader = Files.newReader(file, Charsets.UTF_8)) {
-      return fromJson(reader);
-    }
-  }
-
-  // This is to get around gson ignoring of inner class
-  private static final class TwillSpecificationTypeAdapterFactory implements 
TypeAdapterFactory {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
-      Class<?> rawType = type.getRawType();
-      if (!Map.class.isAssignableFrom(rawType)) {
-        return null;
-      }
-      Type[] typeArgs = ((ParameterizedType) 
type.getType()).getActualTypeArguments();
-      TypeToken<?> keyType = TypeToken.get(typeArgs[0]);
-      TypeToken<?> valueType = TypeToken.get(typeArgs[1]);
-      if (keyType.getRawType() != String.class) {
-        return null;
-      }
-      return (TypeAdapter<T>) mapAdapter(gson, valueType);
-    }
-
-    private <V> TypeAdapter<Map<String, V>> mapAdapter(Gson gson, TypeToken<V> 
valueType) {
-      final TypeAdapter<V> valueAdapter = gson.getAdapter(valueType);
-
-      return new TypeAdapter<Map<String, V>>() {
-        @Override
-        public void write(JsonWriter writer, Map<String, V> map) throws 
IOException {
-          if (map == null) {
-            writer.nullValue();
-            return;
-          }
-          writer.beginObject();
-          for (Map.Entry<String, V> entry : map.entrySet()) {
-            writer.name(entry.getKey());
-            valueAdapter.write(writer, entry.getValue());
-          }
-          writer.endObject();
-        }
-
-        @Override
-        public Map<String, V> read(JsonReader reader) throws IOException {
-          if (reader.peek() == JsonToken.NULL) {
-            reader.nextNull();
-            return null;
-          }
-          if (reader.peek() != JsonToken.BEGIN_OBJECT) {
-            return null;
-          }
-          Map<String, V> map = Maps.newHashMap();
-          reader.beginObject();
-          while (reader.peek() != JsonToken.END_OBJECT) {
-            map.put(reader.nextName(), valueAdapter.read(reader));
-          }
-          reader.endObject();
-          return map;
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java 
b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index a6d9132..d6bbbff 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -129,11 +129,10 @@ public abstract class ServiceMain {
   protected abstract String getRunnableName();
 
   /**
-   * Returns the {@link Location} for the application based on the env {@link 
EnvKeys#TWILL_APP_DIR}.
+   * Returns the {@link Location} for the application based on the app 
directory.
    */
-  protected static Location createAppLocation(final Configuration conf) {
+  protected static Location createAppLocation(final Configuration conf, String 
fsUser, final URI appDir) {
     // Note: It's a little bit hacky based on the uri schema to create the 
LocationFactory, refactor it later.
-    final URI appDir = URI.create(System.getenv(EnvKeys.TWILL_APP_DIR));
 
     try {
       if ("file".equals(appDir.getScheme())) {
@@ -145,10 +144,6 @@ public abstract class ServiceMain {
       if (UserGroupInformation.isSecurityEnabled()) {
         ugi = UserGroupInformation.getCurrentUser();
       } else {
-        String fsUser = System.getenv(EnvKeys.TWILL_FS_USER);
-        if (fsUser == null) {
-          throw new IllegalStateException("Missing environment variable " + 
EnvKeys.TWILL_FS_USER);
-        }
         ugi = UserGroupInformation.createRemoteUser(fsUser);
       }
       return ugi.doAs(new PrivilegedExceptionAction<Location>() {

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
----------------------------------------------------------------------
diff --git 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
index e708fb8..2ec1a14 100644
--- 
a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
+++ 
b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java
@@ -25,8 +25,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.twill.api.RunId;
 import org.apache.twill.internal.Constants;
 import org.apache.twill.internal.EnvKeys;
-import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.TwillRuntimeSpecification;
+import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
 import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
@@ -64,17 +65,19 @@ public final class ApplicationMasterMain extends ServiceMain {
    * Starts the application master.
    */
   public static void main(String[] args) throws Exception {
-    String zkConnect = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
     File twillSpec = new File(Constants.Files.TWILL_SPEC);
-    RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
+    TwillRuntimeSpecification twillRuntimeSpec = 
TwillRuntimeSpecificationAdapter.create().fromJson(twillSpec);
+    String zkConnect = twillRuntimeSpec.getZkConnectStr();
+    RunId runId = twillRuntimeSpec.getTwillRunId();
 
-    ZKClientService zkClientService = createZKClient(zkConnect, 
System.getenv(EnvKeys.TWILL_APP_NAME));
+    ZKClientService zkClientService = createZKClient(zkConnect, 
twillRuntimeSpec.getTwillAppName());
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new 
Configuration()));
-    setRMSchedulerAddress(conf);
+    setRMSchedulerAddress(conf, twillRuntimeSpec.getRmSchedulerAddr());
 
     final YarnAMClient amClient = new 
VersionDetectYarnAMClientFactory(conf).create();
-    ApplicationMasterService service = new ApplicationMasterService(runId, 
zkClientService,
-                                                                    twillSpec, 
amClient, createAppLocation(conf));
+    ApplicationMasterService service =
+      new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, 
amClient, createAppLocation(
+        conf, twillRuntimeSpec.getFsUser(), 
twillRuntimeSpec.getTwillAppDir()));
     TrackerService trackerService = new TrackerService(service);
 
     new ApplicationMasterMain(service.getKafkaZKConnect())
@@ -90,8 +93,7 @@ public final class ApplicationMasterMain extends ServiceMain {
   /**
    * Optionally sets the RM scheduler address based on the environment variable if it is not set in the cluster config.
    */
-  private static void setRMSchedulerAddress(Configuration conf) {
-    String schedulerAddress = System.getenv(EnvKeys.YARN_RM_SCHEDULER_ADDRESS);
+  private static void setRMSchedulerAddress(Configuration conf, String 
schedulerAddress) {
     if (schedulerAddress == null) {
       return;
     }
@@ -123,7 +125,6 @@ public final class ApplicationMasterMain extends ServiceMain {
     return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
   }
 
-
   /**
    * A service wrapper for starting/stopping {@link EmbeddedKafkaServer} and make sure the ZK path for
    * Kafka exists before starting the Kafka server.

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index f46adb2..063bed2 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -67,9 +67,10 @@ import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.JvmOptions;
 import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.TwillContainerLauncher;
+import org.apache.twill.internal.TwillRuntimeSpecification;
 import org.apache.twill.internal.json.JvmOptionsCodec;
 import org.apache.twill.internal.json.LocalFileCodec;
-import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.state.Message;
 import org.apache.twill.internal.utils.Instances;
 import org.apache.twill.internal.yarn.AbstractYarnTwillService;
@@ -126,23 +127,25 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   private final Location applicationLocation;
   private final PlacementPolicyManager placementPolicyManager;
   private final Map<String, Map<String, String>> environments;
+  private final TwillRuntimeSpecification twillRuntimeSpec;
 
   private volatile boolean stopped;
   private Queue<RunnableContainerRequest> runnableContainerRequests;
   private ExecutorService instanceChangeExecutor;
 
-  public ApplicationMasterService(RunId runId, ZKClient zkClient, File 
twillSpecFile,
+  public ApplicationMasterService(RunId runId, ZKClient zkClient, 
TwillRuntimeSpecification twillRuntimeSpec,
                                   YarnAMClient amClient, Location 
applicationLocation) throws Exception {
     super(zkClient, runId, applicationLocation);
 
     this.runId = runId;
-    this.twillSpec = 
TwillSpecificationAdapter.create().fromJson(twillSpecFile);
+    this.twillRuntimeSpec = twillRuntimeSpec;
     this.zkClient = zkClient;
     this.applicationLocation = applicationLocation;
     this.amClient = amClient;
     this.credentials = createCredentials();
     this.jvmOpts = loadJvmOptions();
-    this.reservedMemory = getReservedMemory();
+    this.reservedMemory = twillRuntimeSpec.getReservedMemory();
+    this.twillSpec = twillRuntimeSpec.getTwillSpecification();
     this.placementPolicyManager = new 
PlacementPolicyManager(twillSpec.getPlacementPolicies());
     this.environments = getEnvironments();
 
@@ -168,18 +171,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
     });
   }
 
-  private int getReservedMemory() {
-    String value = System.getenv(EnvKeys.TWILL_RESERVED_MEMORY_MB);
-    if (value == null) {
-      return Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
-    }
-    try {
-      return Integer.parseInt(value);
-    } catch (Exception e) {
-      return Configs.Defaults.JAVA_RESERVED_MEMORY_MB;
-    }
-  }
-
   @SuppressWarnings("unchecked")
   @Nullable
   private EventHandler createEventHandler(TwillSpecification twillSpec) {
@@ -227,7 +218,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
 
   @Override
   protected void doStart() throws Exception {
-    LOG.info("Start application master with spec: " + 
TwillSpecificationAdapter.create().toJson(twillSpec));
+    LOG.info("Start application master with spec: " +
+               
TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec));
 
     // initialize the event handler, if it fails, it will fail the application.
     if (eventHandler != null) {
@@ -246,7 +238,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
   protected void doStop() throws Exception {
     Thread.interrupted();     // This is just to clear the interrupt flag
 
-    LOG.info("Stop application master with spec: {}", 
TwillSpecificationAdapter.create().toJson(twillSpec));
+    LOG.info("Stop application master with spec: {}",
+             
TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec));
 
     if (eventHandler != null) {
       try {
@@ -656,7 +649,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
       String runnableName = provisionRequest.getRuntimeSpec().getName();
       LOG.info("Starting runnable {} with {}", runnableName, processLauncher);
 
-      LOG.debug("Log level for Twill runnable {} is {}", runnableName, 
System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
+      LOG.debug("Log level for Twill runnable {} is {}", runnableName, 
twillRuntimeSpec.getLogLevel());
 
       int containerCount = expectedContainers.getExpected(runnableName);
 
@@ -666,12 +659,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
         env.putAll(environments.get(runnableName));
       }
       // Override with system env
-      env.put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR));
-      env.put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER));
-      env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId());
-      env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName());
-      env.put(EnvKeys.TWILL_APP_LOG_LEVEL, 
System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
-      env.put(EnvKeys.TWILL_ZK_CONNECT, 
System.getenv(EnvKeys.TWILL_ZK_CONNECT));
       env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect());
 
       ProcessLauncher.PrepareLaunchContext launchContext = 
processLauncher.prepareLaunch(env, getLocalizeFiles(),
@@ -682,7 +669,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp
         ZKClients.namespace(zkClient, getZKNamespace(runnableName)),
         containerCount, jvmOpts, reservedMemory, getSecureStoreLocation());
 
-      runningContainers.start(runnableName, 
processLauncher.getContainerInfo(), launcher);
+      runningContainers.start(runnableName, 
processLauncher.getContainerInfo(), launcher,
+                              twillRuntimeSpec.getLogLevel());
 
       // Need to call complete to workaround bug in YARN AMRMClient
       if (provisionRequest.containerAcquired()) {

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 3d1b0f3..2d49d02 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -126,7 +126,8 @@ final class RunningContainers {
   /**
    * Start a container for a runnable.
    */
-  void start(String runnableName, ContainerInfo containerInfo, 
TwillContainerLauncher launcher) {
+  void start(String runnableName, ContainerInfo containerInfo, 
TwillContainerLauncher launcher,
+             LogEntry.Level logLevel) {
     containerLock.lock();
     try {
       int instanceId = getStartInstanceId(runnableName);
@@ -140,7 +141,7 @@ final class RunningContainers {
                                                                  
containerInfo.getMemoryMB(),
                                                                  
containerInfo.getHost().getHostName(),
                                                                  controller,
-                                                                 
System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL));
+                                                                 logLevel);
       resourceReport.addRunResources(runnableName, resources);
       containerStats.put(runnableName, containerInfo);
 
@@ -584,9 +585,8 @@ final class RunningContainers {
 
     private DynamicTwillRunResources(int instanceId, String containerId,
                                      int cores, int memoryMB, String host,
-                                     TwillContainerController controller, 
String logLevel) {
-      super(instanceId, containerId, cores, memoryMB, host, null,
-            (logLevel != null) ? LogEntry.Level.valueOf(logLevel) : null);
+                                     TwillContainerController controller, 
LogEntry.Level logLevel) {
+      super(instanceId, containerId, cores, memoryMB, host, null, logLevel);
       this.controller = controller;
     }
 

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
index efccd89..451db69 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java
@@ -18,7 +18,6 @@
 package org.apache.twill.internal.container;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Strings;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.AbstractService;
@@ -30,7 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.twill.api.RunId;
 import org.apache.twill.api.TwillRunnableSpecification;
-import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.LogEntry;
 import org.apache.twill.discovery.ZKDiscoveryService;
 import org.apache.twill.internal.Arguments;
 import org.apache.twill.internal.BasicTwillContext;
@@ -40,8 +39,9 @@ import org.apache.twill.internal.EnvContainerInfo;
 import org.apache.twill.internal.EnvKeys;
 import org.apache.twill.internal.RunIds;
 import org.apache.twill.internal.ServiceMain;
+import org.apache.twill.internal.TwillRuntimeSpecification;
 import org.apache.twill.internal.json.ArgumentsCodec;
-import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.logging.Loggings;
 import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClientService;
@@ -61,6 +61,7 @@ import java.io.Reader;
 public final class TwillContainerMain extends ServiceMain {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TwillContainerMain.class);
+  private static LogEntry.Level logLevel;
 
   /**
    * Main method for launching a {@link TwillContainerService} which runs
@@ -70,22 +71,23 @@ public final class TwillContainerMain extends ServiceMain {
     // Try to load the secure store from localized file, which AM requested RM to localize it for this container.
     loadSecureStore();
 
-    String zkConnectStr = System.getenv(EnvKeys.TWILL_ZK_CONNECT);
     File twillSpecFile = new File(Constants.Files.TWILL_SPEC);
-    RunId appRunId = 
RunIds.fromString(System.getenv(EnvKeys.TWILL_APP_RUN_ID));
+    TwillRuntimeSpecification twillRuntimeSpec = loadTwillSpec(twillSpecFile);
+    String zkConnectStr = twillRuntimeSpec.getZkConnectStr();
+    RunId appRunId = twillRuntimeSpec.getTwillRunId();
     RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID));
     String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME);
     int instanceId = 
Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID));
     int instanceCount = 
Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT));
+    logLevel = twillRuntimeSpec.getLogLevel();
 
-    ZKClientService zkClientService = createZKClient(zkConnectStr, 
System.getenv(EnvKeys.TWILL_APP_NAME));
+    ZKClientService zkClientService = createZKClient(zkConnectStr, 
twillRuntimeSpec.getTwillAppName());
     ZKDiscoveryService discoveryService = new 
ZKDiscoveryService(zkClientService);
 
     ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId);
-
-    TwillSpecification twillSpec = loadTwillSpec(twillSpecFile);
     
-    TwillRunnableSpecification runnableSpec = 
twillSpec.getRunnables().get(runnableName).getRunnableSpecification();
+    TwillRunnableSpecification runnableSpec =
+      
twillRuntimeSpec.getTwillSpecification().getRunnables().get(runnableName).getRunnableSpecification();
     ContainerInfo containerInfo = new EnvContainerInfo();
     Arguments arguments = decodeArgs();
     BasicTwillContext context = new BasicTwillContext(
@@ -100,7 +102,8 @@ public final class TwillContainerMain extends ServiceMain {
     Configuration conf = new YarnConfiguration(new HdfsConfiguration(new 
Configuration()));
     Service service = new TwillContainerService(context, containerInfo, 
containerZKClient,
                                                 runId, runnableSpec, 
getClassLoader(),
-                                                createAppLocation(conf));
+                                                createAppLocation(conf, 
twillRuntimeSpec.getFsUser(),
+                                                                  
twillRuntimeSpec.getTwillAppDir()));
     new TwillContainerMain().doMain(
       service,
       zkClientService,
@@ -112,9 +115,7 @@ public final class TwillContainerMain extends ServiceMain {
 
   @Override
   protected String getLoggerLevel(Logger logger) {
-    String appLogLevel = System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL);
-
-    return Strings.isNullOrEmpty(appLogLevel) ? super.getLoggerLevel(logger) : 
appLogLevel;
+    return logLevel == null ? super.getLoggerLevel(logger) : logLevel.name();
   }
 
   private static void loadSecureStore() throws IOException {
@@ -156,9 +157,9 @@ public final class TwillContainerMain extends ServiceMain {
     return classLoader;
   }
 
-  private static TwillSpecification loadTwillSpec(File specFile) throws 
IOException {
+  private static TwillRuntimeSpecification loadTwillSpec(File specFile) throws 
IOException {
     try (Reader reader = Files.newReader(specFile, Charsets.UTF_8)) {
-      return TwillSpecificationAdapter.create().fromJson(reader);
+      return TwillRuntimeSpecificationAdapter.create().fromJson(reader);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index f7cb388..811aa04 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -68,13 +68,14 @@ import org.apache.twill.internal.LogOnlyEventHandler;
 import org.apache.twill.internal.ProcessController;
 import org.apache.twill.internal.ProcessLauncher;
 import org.apache.twill.internal.RunIds;
+import org.apache.twill.internal.TwillRuntimeSpecification;
 import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
 import org.apache.twill.internal.appmaster.ApplicationMasterMain;
 import org.apache.twill.internal.container.TwillContainerMain;
 import org.apache.twill.internal.json.ArgumentsCodec;
 import org.apache.twill.internal.json.JvmOptionsCodec;
 import org.apache.twill.internal.json.LocalFileCodec;
-import org.apache.twill.internal.json.TwillSpecificationAdapter;
+import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
 import org.apache.twill.internal.utils.Dependencies;
 import org.apache.twill.internal.utils.Paths;
 import org.apache.twill.internal.utils.Resources;
@@ -337,26 +338,17 @@ final class YarnTwillPreparer implements TwillPreparer {
           //     appMaster.jar
           //     org.apache.twill.internal.appmaster.ApplicationMasterMain
           //     false
-          ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, 
String>builder()
-            .put(EnvKeys.TWILL_FS_USER, fsUser)
-            .put(EnvKeys.TWILL_APP_DIR, 
getAppLocation().toURI().toASCIIString())
-            .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString)
-            .put(EnvKeys.TWILL_RUN_ID, runId.getId())
-            .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, 
Integer.toString(reservedMemory))
-            .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName())
-            .put(EnvKeys.YARN_RM_SCHEDULER_ADDRESS, 
yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS));
 
           LOG.debug("Log level is set to {} for the Twill application.", 
logLevel);
-          builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString());
 
           int memory = 
Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
                                                     
Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO);
-          return launcher.prepareLaunch(builder.build(), localFiles.values(), 
credentials)
+          return launcher.prepareLaunch(ImmutableMap.<String, String>of(), 
localFiles.values(), credentials)
             .addCommand(
               "$JAVA_HOME/bin/java",
               "-Djava.io.tmpdir=tmp",
               "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
-              "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME,
+              "-Dtwill.app=$" + Constants.TWILL_APP_NAME,
               "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
               "-Xmx" + memory + "m",
               extraOptions == null ? "" : extraOptions,
@@ -523,10 +515,13 @@ final class YarnTwillPreparer implements TwillPreparer {
       if (eventHandler == null) {
         eventHandler = new LogOnlyEventHandler().configure();
       }
-
-      TwillSpecificationAdapter.create().toJson(
-        new DefaultTwillSpecification(spec.getName(), runtimeSpec, 
spec.getOrders(), spec.getPlacementPolicies(),
-                                      eventHandler), writer);
+      TwillSpecification newTwillSpec = new 
DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(),
+                                                                      
spec.getPlacementPolicies(), eventHandler);
+      TwillRuntimeSpecificationAdapter.create().toJson(
+        new TwillRuntimeSpecification(newTwillSpec, 
locationFactory.getHomeLocation().getName(),
+                                      getAppLocation().toURI(), 
zkConnectString, runId, twillSpec.getName(),
+                                      reservedMemory, 
yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
+                                      logLevel), writer);
     }
     LOG.debug("Done {}", Constants.Files.TWILL_SPEC);
 

Reply via email to