APEXCORE-283 #comment added storage agent interface and stram client changes to retrieve application attributes
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/02c43eea Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/02c43eea Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/02c43eea Branch: refs/heads/master Commit: 02c43eea28d27fd71fe3f551dfaceb0c3c931db8 Parents: 0d5bfa5 Author: Ashish Tadose <[email protected]> Authored: Fri Dec 18 01:23:46 2015 +0530 Committer: Ashish Tadose <[email protected]> Committed: Fri Dec 18 02:25:12 2015 +0530 ---------------------------------------------------------------------- .../java/com/datatorrent/api/StorageAgent.java | 18 ++++++++++ .../java/com/datatorrent/stram/StramClient.java | 38 ++++++++++++++++---- 2 files changed, 50 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/02c43eea/api/src/main/java/com/datatorrent/api/StorageAgent.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/StorageAgent.java b/api/src/main/java/com/datatorrent/api/StorageAgent.java index e155ed3..b5dcf39 100644 --- a/api/src/main/java/com/datatorrent/api/StorageAgent.java +++ b/api/src/main/java/com/datatorrent/api/StorageAgent.java @@ -20,6 +20,8 @@ package com.datatorrent.api; import java.io.IOException; +import com.datatorrent.api.Attribute.AttributeMap; + /** * Interface to define writing/reading checkpoint state of any operator. * @@ -77,4 +79,20 @@ public interface StorageAgent */ public long[] getWindowIds(int operatorId) throws IOException; + /** + * Interface to pass application attributes to storage agent + * + * + */ + public interface ApplicationAwareStorageAgent extends StorageAgent + { + + /** + * Passes attributes of application to storage agent + * + * @param map attributes of application + */ + public void setApplicationAttributes(AttributeMap map); + } + } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/02c43eea/engine/src/main/java/com/datatorrent/stram/StramClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 9a570e0..046a56c 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -24,10 +24,12 @@ import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.base.Objects; -import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,14 +40,29 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.JarFinder; import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -54,7 +71,11 @@ import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.DTLoggerFactory; +import com.google.common.base.Objects; +import com.google.common.collect.Lists; + import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.StorageAgent; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BasicContainerOptConfigurator; import com.datatorrent.stram.client.StramClientUtils; @@ -455,6 +476,11 @@ public class StramClient } dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, appPath.toString()); + StorageAgent agent = dag.getAttributes().get(OperatorContext.STORAGE_AGENT); + if (agent != null && agent instanceof StorageAgent.ApplicationAwareStorageAgent) { + ((StorageAgent.ApplicationAwareStorageAgent)agent).setApplicationAttributes(dag.getAttributes()); + } + if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { /* which would be the most likely case */ Path checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS); // use conf client side to pickup any proxy settings from dt-site.xml
