Repository: twill Updated Branches: refs/heads/site c852eb67b -> b99b26278
Use TwillRuntimeSpecification to carry runtime information instead of using environment This closes #13 on Github. Signed-off-by: Terence Yim <cht...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/twill/repo Commit: http://git-wip-us.apache.org/repos/asf/twill/commit/6c7d32aa Tree: http://git-wip-us.apache.org/repos/asf/twill/tree/6c7d32aa Diff: http://git-wip-us.apache.org/repos/asf/twill/diff/6c7d32aa Branch: refs/heads/site Commit: 6c7d32aa70facec92482a28334877057319d5263 Parents: 983a14a Author: yaojiefeng <yao...@cask.co> Authored: Thu Sep 22 14:35:04 2016 -0700 Committer: Terence Yim <cht...@apache.org> Committed: Wed Oct 5 14:42:46 2016 -0700 ---------------------------------------------------------------------- .../org/apache/twill/internal/Constants.java | 4 + .../java/org/apache/twill/internal/EnvKeys.java | 13 +- .../twill/internal/TwillContainerLauncher.java | 2 +- .../internal/TwillRuntimeSpecification.java | 96 +++++++++++ .../json/TwillRuntimeSpecificationAdapter.java | 163 +++++++++++++++++++ .../json/TwillRuntimeSpecificationCodec.java | 91 +++++++++++ .../json/TwillSpecificationAdapter.java | 161 ------------------ .../org/apache/twill/internal/ServiceMain.java | 9 +- .../appmaster/ApplicationMasterMain.java | 21 +-- .../appmaster/ApplicationMasterService.java | 40 ++--- .../internal/appmaster/RunningContainers.java | 10 +- .../internal/container/TwillContainerMain.java | 31 ++-- .../apache/twill/yarn/YarnTwillPreparer.java | 27 ++- 13 files changed, 415 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-common/src/main/java/org/apache/twill/internal/Constants.java ---------------------------------------------------------------------- diff --git a/twill-common/src/main/java/org/apache/twill/internal/Constants.java b/twill-common/src/main/java/org/apache/twill/internal/Constants.java index dd04eb1..8a33962 100644 --- a/twill-common/src/main/java/org/apache/twill/internal/Constants.java +++ b/twill-common/src/main/java/org/apache/twill/internal/Constants.java @@ -57,6 +57,10 @@ public final class Constants { public static final String DISCOVERY_PATH_PREFIX = "/discoverable"; public static final String INSTANCES_PATH_PREFIX = "/instances"; + /** + * Constants for twill variable names. + */ + public static final String TWILL_APP_NAME = "TWILL_APP_NAME"; /** * Constants for names of internal files that are shared between client, AM and containers. http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java index 6ee6ac8..6948f80 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java +++ b/twill-core/src/main/java/org/apache/twill/internal/EnvKeys.java @@ -22,29 +22,17 @@ package org.apache.twill.internal; */ public final class EnvKeys { - public static final String TWILL_ZK_CONNECT = "TWILL_ZK_CONNECT"; - public static final String TWILL_APP_RUN_ID = "TWILL_APP_RUN_ID"; public static final String TWILL_RUN_ID = "TWILL_RUN_ID"; public static final String TWILL_INSTANCE_ID = "TWILL_INSTANCE_ID"; public static final String TWILL_INSTANCE_COUNT = "TWILL_INSTANCE_COUNT"; - public static final String TWILL_RESERVED_MEMORY_MB = "TWILL_RESERVED_MEMORY_MB"; - - public static final String TWILL_FS_USER = "TWILL_FS_USER"; /** * Cluster filesystem directory for storing twill app related files. */ - public static final String TWILL_APP_DIR = "TWILL_APP_DIR"; - - public static final String TWILL_APP_NAME = "TWILL_APP_NAME"; - - public static final String TWILL_APP_LOG_LEVEL = "TWILL_APP_LOG_LEVEL"; - public static final String TWILL_RUNNABLE_NAME = "TWILL_RUNNABLE_NAME"; public static final String TWILL_LOG_KAFKA_ZK = "TWILL_LOG_KAFKA_ZK"; - public static final String YARN_RM_SCHEDULER_ADDRESS = "YARN_RM_SCHEDULER_ADDRESS"; public static final String YARN_APP_ID = "YARN_APP_ID"; public static final String YARN_APP_ID_CLUSTER_TIME = "YARN_APP_ID_CLUSTER_TIME"; public static final String YARN_APP_ID_STR = "YARN_APP_ID_STR"; @@ -52,6 +40,7 @@ public final class EnvKeys { public static final String YARN_CONTAINER_ID = "YARN_CONTAINER_ID"; public static final String YARN_CONTAINER_HOST = "YARN_CONTAINER_HOST"; public static final String YARN_CONTAINER_PORT = "YARN_CONTAINER_PORT"; + /** * Used to inform runnables of their resource usage. */ http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java index 3eefb8c..53c378a 100644 --- a/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillContainerLauncher.java @@ -136,7 +136,7 @@ public final class TwillContainerLauncher { int memory = Resources.computeMaxHeapSize(containerInfo.getMemoryMB(), reservedMemory, Constants.HEAP_MIN_RATIO); commandBuilder.add("-Djava.io.tmpdir=tmp", "-Dyarn.container=$" + EnvKeys.YARN_CONTAINER_ID, - "-Dtwill.runnable=$" + EnvKeys.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME, + "-Dtwill.runnable=$" + Constants.TWILL_APP_NAME + ".$" + EnvKeys.TWILL_RUNNABLE_NAME, "-cp", Constants.Files.LAUNCHER_JAR + ":" + classPath, "-Xmx" + memory + "m"); if (jvmOpts.getExtraOptions() != null) { http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java new file mode 100644 index 0000000..a48133f --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/TwillRuntimeSpecification.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal; + +import org.apache.twill.api.RunId; +import org.apache.twill.api.TwillApplication; +import org.apache.twill.api.TwillSpecification; +import org.apache.twill.api.logging.LogEntry; + +import java.net.URI; +import javax.annotation.Nullable; + +/** + * Represents runtime specification of a {@link TwillApplication}. + */ +public class TwillRuntimeSpecification { + + private final TwillSpecification twillSpecification; + + private final String fsUser; + private final URI twillAppDir; + private final String zkConnectStr; + private final RunId twillRunId; + private final String twillAppName; + private final int reservedMemory; + private final String rmSchedulerAddr; + private final LogEntry.Level logLevel; + + public TwillRuntimeSpecification(TwillSpecification twillSpecification, String fsUser, URI twillAppDir, + String zkConnectStr, RunId twillRunId, String twillAppName, + int reservedMemory, @Nullable String rmSchedulerAddr, + @Nullable LogEntry.Level logLevel) { + this.twillSpecification = twillSpecification; + this.fsUser = fsUser; + this.twillAppDir = twillAppDir; + this.zkConnectStr = zkConnectStr; + this.twillRunId = twillRunId; + this.twillAppName = twillAppName; + this.reservedMemory = reservedMemory; + this.rmSchedulerAddr = rmSchedulerAddr; + this.logLevel = logLevel; + } + + public TwillSpecification getTwillSpecification() { + return twillSpecification; + } + + public String getFsUser() { + return fsUser; + } + + public URI getTwillAppDir() { + return twillAppDir; + } + + public String getZkConnectStr() { + return zkConnectStr; + } + + public RunId getTwillRunId() { + return twillRunId; + } + + public String getTwillAppName() { + return twillAppName; + } + + public int getReservedMemory() { + return reservedMemory; + } + + @Nullable + public String getRmSchedulerAddr() { + return rmSchedulerAddr; + } + + @Nullable + public LogEntry.Level getLogLevel() { + return logLevel; + } +} http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java new file mode 100644 index 0000000..ae9747d --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationAdapter.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.json; + +import com.google.common.base.Charsets; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.TypeAdapterFactory; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonToken; +import com.google.gson.stream.JsonWriter; +import org.apache.twill.api.EventHandlerSpecification; +import org.apache.twill.api.LocalFile; +import org.apache.twill.api.ResourceSpecification; +import org.apache.twill.api.RuntimeSpecification; +import org.apache.twill.api.TwillRunnableSpecification; +import org.apache.twill.api.TwillSpecification; +import org.apache.twill.internal.TwillRuntimeSpecification; +import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder; +import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder; +import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationPlacementPolicyCoder; + +import java.io.File; +import java.io.IOException; +import java.io.Reader; +import java.io.Writer; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.Map; + +/** + * + */ +public final class TwillRuntimeSpecificationAdapter { + + private final Gson gson; + + public static TwillRuntimeSpecificationAdapter create() { + return new TwillRuntimeSpecificationAdapter(); + } + + private TwillRuntimeSpecificationAdapter() { + gson = new GsonBuilder() + .serializeNulls() + .registerTypeAdapter(TwillRuntimeSpecification.class, new TwillRuntimeSpecificationCodec()) + .registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec()) + .registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder()) + .registerTypeAdapter(TwillSpecification.PlacementPolicy.class, + new TwillSpecificationPlacementPolicyCoder()) + .registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder()) + .registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec()) + .registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec()) + .registerTypeAdapter(ResourceSpecification.class, new ResourceSpecificationCodec()) + .registerTypeAdapter(LocalFile.class, new LocalFileCodec()) + .registerTypeAdapterFactory(new TwillSpecificationTypeAdapterFactory()) + .create(); + } + + public String toJson(TwillRuntimeSpecification spec) { + return gson.toJson(spec, TwillRuntimeSpecification.class); + } + + public void toJson(TwillRuntimeSpecification spec, Writer writer) { + gson.toJson(spec, TwillRuntimeSpecification.class, writer); + } + + public void toJson(TwillRuntimeSpecification spec, File file) throws IOException { + try (Writer writer = Files.newWriter(file, Charsets.UTF_8)) { + toJson(spec, writer); + } + } + + public TwillRuntimeSpecification fromJson(String json) { + return gson.fromJson(json, TwillRuntimeSpecification.class); + } + + public TwillRuntimeSpecification fromJson(Reader reader) { + return gson.fromJson(reader, TwillRuntimeSpecification.class); + } + + public TwillRuntimeSpecification fromJson(File file) throws IOException { + try (Reader reader = Files.newReader(file, Charsets.UTF_8)) { + return fromJson(reader); + } + } + + // This is to get around gson ignoring of inner class + private static final class TwillSpecificationTypeAdapterFactory implements TypeAdapterFactory { + + @SuppressWarnings("unchecked") + @Override + public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) { + Class<?> rawType = type.getRawType(); + if (!Map.class.isAssignableFrom(rawType)) { + return null; + } + Type[] typeArgs = ((ParameterizedType) type.getType()).getActualTypeArguments(); + TypeToken<?> keyType = TypeToken.get(typeArgs[0]); + TypeToken<?> valueType = TypeToken.get(typeArgs[1]); + if (keyType.getRawType() != String.class) { + return null; + } + return (TypeAdapter<T>) mapAdapter(gson, valueType); + } + + private <V> TypeAdapter<Map<String, V>> mapAdapter(Gson gson, TypeToken<V> valueType) { + final TypeAdapter<V> valueAdapter = gson.getAdapter(valueType); + + return new TypeAdapter<Map<String, V>>() { + @Override + public void write(JsonWriter writer, Map<String, V> map) throws IOException { + if (map == null) { + writer.nullValue(); + return; + } + writer.beginObject(); + for (Map.Entry<String, V> entry : map.entrySet()) { + writer.name(entry.getKey()); + valueAdapter.write(writer, entry.getValue()); + } + writer.endObject(); + } + + @Override + public Map<String, V> read(JsonReader reader) throws IOException { + if (reader.peek() == JsonToken.NULL) { + reader.nextNull(); + return null; + } + if (reader.peek() != JsonToken.BEGIN_OBJECT) { + return null; + } + Map<String, V> map = Maps.newHashMap(); + reader.beginObject(); + while (reader.peek() != JsonToken.END_OBJECT) { + map.put(reader.nextName(), valueAdapter.read(reader)); + } + reader.endObject(); + return map; + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java new file mode 100644 index 0000000..6100c99 --- /dev/null +++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillRuntimeSpecificationCodec.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.twill.internal.json; + +import com.google.common.reflect.TypeToken; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import org.apache.twill.api.TwillSpecification; +import org.apache.twill.api.logging.LogEntry; +import org.apache.twill.internal.RunIds; +import org.apache.twill.internal.TwillRuntimeSpecification; + +import java.lang.reflect.Type; +import java.net.URI; + +/** + * Codec for serializing and deserializing a {@link TwillRuntimeSpecification} object using json. + */ +final class TwillRuntimeSpecificationCodec implements JsonSerializer<TwillRuntimeSpecification>, + JsonDeserializer<TwillRuntimeSpecification> { + + private static final String FS_USER = "fsUser"; + private static final String TWILL_APP_DIR = "twillAppDir"; + private static final String ZK_CONNECT_STR = "zkConnectStr"; + private static final String TWILL_RUNID = "twillRunId"; + private static final String TWILL_APP_NAME = "twillAppName"; + private static final String RESERVED_MEMORY = "reservedMemory"; + private static final String RM_SCHEDULER_ADDR = "rmSchedulerAddr"; + private static final String LOG_LEVEL = "logLevel"; + private static final String TWILL_SPEC = "twillSpecification"; + + @Override + public JsonElement serialize(TwillRuntimeSpecification src, Type typeOfSrc, JsonSerializationContext context) { + JsonObject json = new JsonObject(); + json.addProperty(FS_USER, src.getFsUser()); + json.addProperty(TWILL_APP_DIR, src.getTwillAppDir().toASCIIString()); + json.addProperty(ZK_CONNECT_STR, src.getZkConnectStr()); + json.addProperty(TWILL_RUNID, src.getTwillRunId().getId()); + json.addProperty(TWILL_APP_NAME, src.getTwillAppName()); + json.addProperty(RESERVED_MEMORY, src.getReservedMemory()); + if (src.getRmSchedulerAddr() != null) { + json.addProperty(RM_SCHEDULER_ADDR, src.getRmSchedulerAddr()); + } + if (src.getLogLevel() != null) { + json.addProperty(LOG_LEVEL, src.getLogLevel().name()); + } + json.add(TWILL_SPEC, context.serialize(src.getTwillSpecification(), + new TypeToken<TwillSpecification>() { }.getType())); + return json; + } + + @Override + public TwillRuntimeSpecification deserialize(JsonElement json, Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + JsonObject jsonObj = json.getAsJsonObject(); + + TwillSpecification twillSpecification = context.deserialize( + jsonObj.get(TWILL_SPEC), new TypeToken<TwillSpecification>() { }.getType()); + return new TwillRuntimeSpecification(twillSpecification, + jsonObj.get(FS_USER).getAsString(), + URI.create(jsonObj.get(TWILL_APP_DIR).getAsString()), + jsonObj.get(ZK_CONNECT_STR).getAsString(), + RunIds.fromString(jsonObj.get(TWILL_RUNID).getAsString()), + jsonObj.get(TWILL_APP_NAME).getAsString(), + jsonObj.get(RESERVED_MEMORY).getAsInt(), + jsonObj.has(RM_SCHEDULER_ADDR) ? + jsonObj.get(RM_SCHEDULER_ADDR).getAsString() : null, + jsonObj.has(LOG_LEVEL) ? + LogEntry.Level.valueOf(jsonObj.get(LOG_LEVEL).getAsString()) : null); + } +} http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java ---------------------------------------------------------------------- diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java deleted file mode 100644 index eda0c71..0000000 --- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.twill.internal.json; - -import com.google.common.base.Charsets; -import com.google.common.collect.Maps; -import com.google.common.io.Files; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.TypeAdapter; -import com.google.gson.TypeAdapterFactory; -import com.google.gson.reflect.TypeToken; -import com.google.gson.stream.JsonReader; -import com.google.gson.stream.JsonToken; -import com.google.gson.stream.JsonWriter; -import org.apache.twill.api.EventHandlerSpecification; -import org.apache.twill.api.LocalFile; -import org.apache.twill.api.ResourceSpecification; -import org.apache.twill.api.RuntimeSpecification; -import org.apache.twill.api.TwillRunnableSpecification; -import org.apache.twill.api.TwillSpecification; -import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder; -import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder; -import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationPlacementPolicyCoder; - -import java.io.File; -import java.io.IOException; -import java.io.Reader; -import java.io.Writer; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.Map; - -/** - * - */ -public final class TwillSpecificationAdapter { - - private final Gson gson; - - public static TwillSpecificationAdapter create() { - return new TwillSpecificationAdapter(); - } - - private TwillSpecificationAdapter() { - gson = new GsonBuilder() - .serializeNulls() - .registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec()) - .registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder()) - .registerTypeAdapter(TwillSpecification.PlacementPolicy.class, - new TwillSpecificationPlacementPolicyCoder()) - .registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder()) - .registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec()) - .registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec()) - .registerTypeAdapter(ResourceSpecification.class, new ResourceSpecificationCodec()) - .registerTypeAdapter(LocalFile.class, new LocalFileCodec()) - .registerTypeAdapterFactory(new TwillSpecificationTypeAdapterFactory()) - .create(); - } - - public String toJson(TwillSpecification spec) { - return gson.toJson(spec, TwillSpecification.class); - } - - public void toJson(TwillSpecification spec, Writer writer) { - gson.toJson(spec, TwillSpecification.class, writer); - } - - public void toJson(TwillSpecification spec, File file) throws IOException { - try (Writer writer = Files.newWriter(file, Charsets.UTF_8)) { - toJson(spec, writer); - } - } - - public TwillSpecification fromJson(String json) { - return gson.fromJson(json, TwillSpecification.class); - } - - public TwillSpecification fromJson(Reader reader) { - return gson.fromJson(reader, TwillSpecification.class); - } - - public TwillSpecification fromJson(File file) throws IOException { - try (Reader reader = Files.newReader(file, Charsets.UTF_8)) { - return fromJson(reader); - } - } - - // This is to get around gson ignoring of inner class - private static final class TwillSpecificationTypeAdapterFactory implements TypeAdapterFactory { - - @SuppressWarnings("unchecked") - @Override - public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) { - Class<?> rawType = type.getRawType(); - if (!Map.class.isAssignableFrom(rawType)) { - return null; - } - Type[] typeArgs = ((ParameterizedType) type.getType()).getActualTypeArguments(); - TypeToken<?> keyType = TypeToken.get(typeArgs[0]); - TypeToken<?> valueType = TypeToken.get(typeArgs[1]); - if (keyType.getRawType() != String.class) { - return null; - } - return (TypeAdapter<T>) mapAdapter(gson, valueType); - } - - private <V> TypeAdapter<Map<String, V>> mapAdapter(Gson gson, TypeToken<V> valueType) { - final TypeAdapter<V> valueAdapter = gson.getAdapter(valueType); - - return new TypeAdapter<Map<String, V>>() { - @Override - public void write(JsonWriter writer, Map<String, V> map) throws IOException { - if (map == null) { - writer.nullValue(); - return; - } - writer.beginObject(); - for (Map.Entry<String, V> entry : map.entrySet()) { - writer.name(entry.getKey()); - valueAdapter.write(writer, entry.getValue()); - } - writer.endObject(); - } - - @Override - public Map<String, V> read(JsonReader reader) throws IOException { - if (reader.peek() == JsonToken.NULL) { - reader.nextNull(); - return null; - } - if (reader.peek() != JsonToken.BEGIN_OBJECT) { - return null; - } - Map<String, V> map = Maps.newHashMap(); - reader.beginObject(); - while (reader.peek() != JsonToken.END_OBJECT) { - map.put(reader.nextName(), valueAdapter.read(reader)); - } - reader.endObject(); - return map; - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java index a6d9132..d6bbbff 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java @@ -129,11 +129,10 @@ public abstract class ServiceMain { protected abstract String getRunnableName(); /** - * Returns the {@link Location} for the application based on the env {@link EnvKeys#TWILL_APP_DIR}. + * Returns the {@link Location} for the application based on the app directory. */ - protected static Location createAppLocation(final Configuration conf) { + protected static Location createAppLocation(final Configuration conf, String fsUser, final URI appDir) { // Note: It's a little bit hacky based on the uri schema to create the LocationFactory, refactor it later. - final URI appDir = URI.create(System.getenv(EnvKeys.TWILL_APP_DIR)); try { if ("file".equals(appDir.getScheme())) { @@ -145,10 +144,6 @@ public abstract class ServiceMain { if (UserGroupInformation.isSecurityEnabled()) { ugi = UserGroupInformation.getCurrentUser(); } else { - String fsUser = System.getenv(EnvKeys.TWILL_FS_USER); - if (fsUser == null) { - throw new IllegalStateException("Missing environment variable " + EnvKeys.TWILL_FS_USER); - } ugi = UserGroupInformation.createRemoteUser(fsUser); } return ugi.doAs(new PrivilegedExceptionAction<Location>() { http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java index e708fb8..2ec1a14 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterMain.java @@ -25,8 +25,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.twill.api.RunId; import org.apache.twill.internal.Constants; import org.apache.twill.internal.EnvKeys; -import org.apache.twill.internal.RunIds; import org.apache.twill.internal.ServiceMain; +import org.apache.twill.internal.TwillRuntimeSpecification; +import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; import org.apache.twill.internal.kafka.EmbeddedKafkaServer; import org.apache.twill.internal.logging.Loggings; import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory; @@ -64,17 +65,19 @@ public final class ApplicationMasterMain extends ServiceMain { * Starts the application master. */ public static void main(String[] args) throws Exception { - String zkConnect = System.getenv(EnvKeys.TWILL_ZK_CONNECT); File twillSpec = new File(Constants.Files.TWILL_SPEC); - RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID)); + TwillRuntimeSpecification twillRuntimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(twillSpec); + String zkConnect = twillRuntimeSpec.getZkConnectStr(); + RunId runId = twillRuntimeSpec.getTwillRunId(); - ZKClientService zkClientService = createZKClient(zkConnect, System.getenv(EnvKeys.TWILL_APP_NAME)); + ZKClientService zkClientService = createZKClient(zkConnect, twillRuntimeSpec.getTwillAppName()); Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration())); - setRMSchedulerAddress(conf); + setRMSchedulerAddress(conf, twillRuntimeSpec.getRmSchedulerAddr()); final YarnAMClient amClient = new VersionDetectYarnAMClientFactory(conf).create(); - ApplicationMasterService service = new ApplicationMasterService(runId, zkClientService, - twillSpec, amClient, createAppLocation(conf)); + ApplicationMasterService service = + new ApplicationMasterService(runId, zkClientService, twillRuntimeSpec, amClient, createAppLocation( + conf, twillRuntimeSpec.getFsUser(), twillRuntimeSpec.getTwillAppDir())); TrackerService trackerService = new TrackerService(service); new ApplicationMasterMain(service.getKafkaZKConnect()) @@ -90,8 +93,7 @@ public final class ApplicationMasterMain extends ServiceMain { /** * Optionally sets the RM scheduler address based on the environment variable if it is not set in the cluster config. */ - private static void setRMSchedulerAddress(Configuration conf) { - String schedulerAddress = System.getenv(EnvKeys.YARN_RM_SCHEDULER_ADDRESS); + private static void setRMSchedulerAddress(Configuration conf, String schedulerAddress) { if (schedulerAddress == null) { return; } @@ -123,7 +125,6 @@ public final class ApplicationMasterMain extends ServiceMain { return System.getenv(EnvKeys.TWILL_RUNNABLE_NAME); } - /** * A service wrapper for starting/stopping {@link EmbeddedKafkaServer} and make sure the ZK path for * Kafka exists before starting the Kafka server. http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java index f46adb2..063bed2 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java @@ -67,9 +67,10 @@ import org.apache.twill.internal.EnvKeys; import org.apache.twill.internal.JvmOptions; import org.apache.twill.internal.ProcessLauncher; import org.apache.twill.internal.TwillContainerLauncher; +import org.apache.twill.internal.TwillRuntimeSpecification; import org.apache.twill.internal.json.JvmOptionsCodec; import org.apache.twill.internal.json.LocalFileCodec; -import org.apache.twill.internal.json.TwillSpecificationAdapter; +import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; import org.apache.twill.internal.state.Message; import org.apache.twill.internal.utils.Instances; import org.apache.twill.internal.yarn.AbstractYarnTwillService; @@ -126,23 +127,25 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp private final Location applicationLocation; private final PlacementPolicyManager placementPolicyManager; private final Map<String, Map<String, String>> environments; + private final TwillRuntimeSpecification twillRuntimeSpec; private volatile boolean stopped; private Queue<RunnableContainerRequest> runnableContainerRequests; private ExecutorService instanceChangeExecutor; - public ApplicationMasterService(RunId runId, ZKClient zkClient, File twillSpecFile, + public ApplicationMasterService(RunId runId, ZKClient zkClient, TwillRuntimeSpecification twillRuntimeSpec, YarnAMClient amClient, Location applicationLocation) throws Exception { super(zkClient, runId, applicationLocation); this.runId = runId; - this.twillSpec = TwillSpecificationAdapter.create().fromJson(twillSpecFile); + this.twillRuntimeSpec = twillRuntimeSpec; this.zkClient = zkClient; this.applicationLocation = applicationLocation; this.amClient = amClient; this.credentials = createCredentials(); this.jvmOpts = loadJvmOptions(); - this.reservedMemory = getReservedMemory(); + this.reservedMemory = twillRuntimeSpec.getReservedMemory(); + this.twillSpec = twillRuntimeSpec.getTwillSpecification(); this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies()); this.environments = getEnvironments(); @@ -168,18 +171,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp }); } - private int getReservedMemory() { - String value = System.getenv(EnvKeys.TWILL_RESERVED_MEMORY_MB); - if (value == null) { - return Configs.Defaults.JAVA_RESERVED_MEMORY_MB; - } - try { - return Integer.parseInt(value); - } catch (Exception e) { - return Configs.Defaults.JAVA_RESERVED_MEMORY_MB; - } - } - @SuppressWarnings("unchecked") @Nullable private EventHandler createEventHandler(TwillSpecification twillSpec) { @@ -227,7 +218,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp @Override protected void doStart() throws Exception { - LOG.info("Start application master with spec: " + TwillSpecificationAdapter.create().toJson(twillSpec)); + LOG.info("Start application master with spec: " + + TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec)); // initialize the event handler, if it fails, it will fail the application. if (eventHandler != null) { @@ -246,7 +238,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp protected void doStop() throws Exception { Thread.interrupted(); // This is just to clear the interrupt flag - LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(twillSpec)); + LOG.info("Stop application master with spec: {}", + TwillRuntimeSpecificationAdapter.create().toJson(twillRuntimeSpec)); if (eventHandler != null) { try { @@ -656,7 +649,7 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp String runnableName = provisionRequest.getRuntimeSpec().getName(); LOG.info("Starting runnable {} with {}", runnableName, processLauncher); - LOG.debug("Log level for Twill runnable {} is {}", runnableName, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL)); + LOG.debug("Log level for Twill runnable {} is {}", runnableName, twillRuntimeSpec.getLogLevel()); int containerCount = expectedContainers.getExpected(runnableName); @@ -666,12 +659,6 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp env.putAll(environments.get(runnableName)); } // Override with system env - env.put(EnvKeys.TWILL_APP_DIR, System.getenv(EnvKeys.TWILL_APP_DIR)); - env.put(EnvKeys.TWILL_FS_USER, System.getenv(EnvKeys.TWILL_FS_USER)); - env.put(EnvKeys.TWILL_APP_RUN_ID, runId.getId()); - env.put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()); - env.put(EnvKeys.TWILL_APP_LOG_LEVEL, System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL)); - env.put(EnvKeys.TWILL_ZK_CONNECT, System.getenv(EnvKeys.TWILL_ZK_CONNECT)); env.put(EnvKeys.TWILL_LOG_KAFKA_ZK, getKafkaZKConnect()); ProcessLauncher.PrepareLaunchContext launchContext = processLauncher.prepareLaunch(env, getLocalizeFiles(), @@ -682,7 +669,8 @@ public final class ApplicationMasterService extends AbstractYarnTwillService imp ZKClients.namespace(zkClient, getZKNamespace(runnableName)), containerCount, jvmOpts, reservedMemory, getSecureStoreLocation()); - runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher); + runningContainers.start(runnableName, processLauncher.getContainerInfo(), launcher, + twillRuntimeSpec.getLogLevel()); // Need to call complete to workaround bug in YARN AMRMClient if (provisionRequest.containerAcquired()) { http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java index 3d1b0f3..2d49d02 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java @@ -126,7 +126,8 @@ final class RunningContainers { /** * Start a container for a runnable. */ - void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher) { + void start(String runnableName, ContainerInfo containerInfo, TwillContainerLauncher launcher, + LogEntry.Level logLevel) { containerLock.lock(); try { int instanceId = getStartInstanceId(runnableName); @@ -140,7 +141,7 @@ final class RunningContainers { containerInfo.getMemoryMB(), containerInfo.getHost().getHostName(), controller, - System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL)); + logLevel); resourceReport.addRunResources(runnableName, resources); containerStats.put(runnableName, containerInfo); @@ -584,9 +585,8 @@ final class RunningContainers { private DynamicTwillRunResources(int instanceId, String containerId, int cores, int memoryMB, String host, - TwillContainerController controller, String logLevel) { - super(instanceId, containerId, cores, memoryMB, host, null, - (logLevel != null) ? LogEntry.Level.valueOf(logLevel) : null); + TwillContainerController controller, LogEntry.Level logLevel) { + super(instanceId, containerId, cores, memoryMB, host, null, logLevel); this.controller = controller; } http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java index efccd89..451db69 100644 --- a/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java +++ b/twill-yarn/src/main/java/org/apache/twill/internal/container/TwillContainerMain.java @@ -18,7 +18,6 @@ package org.apache.twill.internal.container; import com.google.common.base.Charsets; -import com.google.common.base.Strings; import com.google.common.io.Files; import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.AbstractService; @@ -30,7 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.twill.api.RunId; import org.apache.twill.api.TwillRunnableSpecification; -import org.apache.twill.api.TwillSpecification; +import org.apache.twill.api.logging.LogEntry; import org.apache.twill.discovery.ZKDiscoveryService; import org.apache.twill.internal.Arguments; import org.apache.twill.internal.BasicTwillContext; @@ -40,8 +39,9 @@ import org.apache.twill.internal.EnvContainerInfo; import org.apache.twill.internal.EnvKeys; import org.apache.twill.internal.RunIds; import org.apache.twill.internal.ServiceMain; +import org.apache.twill.internal.TwillRuntimeSpecification; import org.apache.twill.internal.json.ArgumentsCodec; -import org.apache.twill.internal.json.TwillSpecificationAdapter; +import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; import org.apache.twill.internal.logging.Loggings; import org.apache.twill.zookeeper.ZKClient; import org.apache.twill.zookeeper.ZKClientService; @@ -61,6 +61,7 @@ import java.io.Reader; public final class TwillContainerMain extends ServiceMain { private static final Logger LOG = LoggerFactory.getLogger(TwillContainerMain.class); + private static LogEntry.Level logLevel; /** * Main method for launching a {@link TwillContainerService} which runs @@ -70,22 +71,23 @@ public final class TwillContainerMain extends ServiceMain { // Try to load the secure store from localized file, which AM requested RM to localize it for this container. loadSecureStore(); - String zkConnectStr = System.getenv(EnvKeys.TWILL_ZK_CONNECT); File twillSpecFile = new File(Constants.Files.TWILL_SPEC); - RunId appRunId = RunIds.fromString(System.getenv(EnvKeys.TWILL_APP_RUN_ID)); + TwillRuntimeSpecification twillRuntimeSpec = loadTwillSpec(twillSpecFile); + String zkConnectStr = twillRuntimeSpec.getZkConnectStr(); + RunId appRunId = twillRuntimeSpec.getTwillRunId(); RunId runId = RunIds.fromString(System.getenv(EnvKeys.TWILL_RUN_ID)); String runnableName = System.getenv(EnvKeys.TWILL_RUNNABLE_NAME); int instanceId = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_ID)); int instanceCount = Integer.parseInt(System.getenv(EnvKeys.TWILL_INSTANCE_COUNT)); + logLevel = twillRuntimeSpec.getLogLevel(); - ZKClientService zkClientService = createZKClient(zkConnectStr, System.getenv(EnvKeys.TWILL_APP_NAME)); + ZKClientService zkClientService = createZKClient(zkConnectStr, twillRuntimeSpec.getTwillAppName()); ZKDiscoveryService discoveryService = new ZKDiscoveryService(zkClientService); ZKClient appRunZkClient = getAppRunZKClient(zkClientService, appRunId); - - TwillSpecification twillSpec = loadTwillSpec(twillSpecFile); - TwillRunnableSpecification runnableSpec = twillSpec.getRunnables().get(runnableName).getRunnableSpecification(); + TwillRunnableSpecification runnableSpec = + twillRuntimeSpec.getTwillSpecification().getRunnables().get(runnableName).getRunnableSpecification(); ContainerInfo containerInfo = new EnvContainerInfo(); Arguments arguments = decodeArgs(); BasicTwillContext context = new BasicTwillContext( @@ -100,7 +102,8 @@ public final class TwillContainerMain extends ServiceMain { Configuration conf = new YarnConfiguration(new HdfsConfiguration(new Configuration())); Service service = new TwillContainerService(context, containerInfo, containerZKClient, runId, runnableSpec, getClassLoader(), - createAppLocation(conf)); + createAppLocation(conf, twillRuntimeSpec.getFsUser(), + twillRuntimeSpec.getTwillAppDir())); new TwillContainerMain().doMain( service, zkClientService, @@ -112,9 +115,7 @@ public final class TwillContainerMain extends ServiceMain { @Override protected String getLoggerLevel(Logger logger) { - String appLogLevel = System.getenv(EnvKeys.TWILL_APP_LOG_LEVEL); - - return Strings.isNullOrEmpty(appLogLevel) ? super.getLoggerLevel(logger) : appLogLevel; + return logLevel == null ? super.getLoggerLevel(logger) : logLevel.name(); } private static void loadSecureStore() throws IOException { @@ -156,9 +157,9 @@ public final class TwillContainerMain extends ServiceMain { return classLoader; } - private static TwillSpecification loadTwillSpec(File specFile) throws IOException { + private static TwillRuntimeSpecification loadTwillSpec(File specFile) throws IOException { try (Reader reader = Files.newReader(specFile, Charsets.UTF_8)) { - return TwillSpecificationAdapter.create().fromJson(reader); + return TwillRuntimeSpecificationAdapter.create().fromJson(reader); } } http://git-wip-us.apache.org/repos/asf/twill/blob/6c7d32aa/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java ---------------------------------------------------------------------- diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java index f7cb388..811aa04 100644 --- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java +++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java @@ -68,13 +68,14 @@ import org.apache.twill.internal.LogOnlyEventHandler; import org.apache.twill.internal.ProcessController; import org.apache.twill.internal.ProcessLauncher; import org.apache.twill.internal.RunIds; +import org.apache.twill.internal.TwillRuntimeSpecification; import org.apache.twill.internal.appmaster.ApplicationMasterInfo; import org.apache.twill.internal.appmaster.ApplicationMasterMain; import org.apache.twill.internal.container.TwillContainerMain; import org.apache.twill.internal.json.ArgumentsCodec; import org.apache.twill.internal.json.JvmOptionsCodec; import org.apache.twill.internal.json.LocalFileCodec; -import org.apache.twill.internal.json.TwillSpecificationAdapter; +import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter; import org.apache.twill.internal.utils.Dependencies; import org.apache.twill.internal.utils.Paths; import org.apache.twill.internal.utils.Resources; @@ -337,26 +338,17 @@ final class YarnTwillPreparer implements TwillPreparer { // appMaster.jar // org.apache.twill.internal.appmaster.ApplicationMasterMain // false - ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder() - .put(EnvKeys.TWILL_FS_USER, fsUser) - .put(EnvKeys.TWILL_APP_DIR, getAppLocation().toURI().toASCIIString()) - .put(EnvKeys.TWILL_ZK_CONNECT, zkConnectString) - .put(EnvKeys.TWILL_RUN_ID, runId.getId()) - .put(EnvKeys.TWILL_RESERVED_MEMORY_MB, Integer.toString(reservedMemory)) - .put(EnvKeys.TWILL_APP_NAME, twillSpec.getName()) - .put(EnvKeys.YARN_RM_SCHEDULER_ADDRESS, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS)); LOG.debug("Log level is set to {} for the Twill application.", logLevel); - builder.put(EnvKeys.TWILL_APP_LOG_LEVEL, logLevel.toString()); int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(), Constants.APP_MASTER_RESERVED_MEMORY_MB, Constants.HEAP_MIN_RATIO); - return launcher.prepareLaunch(builder.build(), localFiles.values(), credentials) + return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), credentials) .addCommand( "$JAVA_HOME/bin/java", "-Djava.io.tmpdir=tmp", "-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR, - "-Dtwill.app=$" + EnvKeys.TWILL_APP_NAME, + "-Dtwill.app=$" + Constants.TWILL_APP_NAME, "-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR", "-Xmx" + memory + "m", extraOptions == null ? "" : extraOptions, @@ -523,10 +515,13 @@ final class YarnTwillPreparer implements TwillPreparer { if (eventHandler == null) { eventHandler = new LogOnlyEventHandler().configure(); } - - TwillSpecificationAdapter.create().toJson( - new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), spec.getPlacementPolicies(), - eventHandler), writer); + TwillSpecification newTwillSpec = new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), + spec.getPlacementPolicies(), eventHandler); + TwillRuntimeSpecificationAdapter.create().toJson( + new TwillRuntimeSpecification(newTwillSpec, locationFactory.getHomeLocation().getName(), + getAppLocation().toURI(), zkConnectString, runId, twillSpec.getName(), + reservedMemory, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS), + logLevel), writer); } LOG.debug("Done {}", Constants.Files.TWILL_SPEC);