Repository: apex-core
Updated Branches:
  refs/heads/master f9c1701a2 -> 12706ca43


APEXCORE-712 custom keystore at launch. This closes #526 and uses some code 
contributed by @devtagare


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/12706ca4
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/12706ca4
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/12706ca4

Branch: refs/heads/master
Commit: 12706ca43b00e272ba8cb13bc5d6648e0124187d
Parents: f9c1701
Author: Sanjay Pujare <[email protected]>
Authored: Sun Jun 25 16:09:21 2017 -0700
Committer: Sanjay Pujare <[email protected]>
Committed: Sun Jun 25 18:38:04 2017 -0700

----------------------------------------------------------------------
 .../main/java/com/datatorrent/api/Context.java  | 85 +++++++++++++++++++-
 .../java/com/datatorrent/stram/StramClient.java | 33 ++++++++
 .../stram/StreamingAppMasterService.java        | 39 ++++++++-
 3 files changed, 149 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/12706ca4/api/src/main/java/com/datatorrent/api/Context.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Context.java 
b/api/src/main/java/com/datatorrent/api/Context.java
index ff1a2d4..9fe0c46 100644
--- a/api/src/main/java/com/datatorrent/api/Context.java
+++ b/api/src/main/java/com/datatorrent/api/Context.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
 import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.StringCodec.Class2String;
@@ -536,15 +538,90 @@ public interface Context
     Attribute<String> LIBRARY_JARS = new 
Attribute<>(String2String.getInstance());
 
     /**
-     * This configuration file can be used to over-ride the default Yarn 
configuration.
-     * For example, this can be used to provide custom SSL parameters in the 
configuration.
-     * Note that this file needs to be present on the node.
+     * SSL configuration string property. This is used to specify SSL 
parameters for
+     * the Stram's Web services.
      */
-    Attribute<String> STRAM_HTTP_CUSTOM_CONFIG = new 
Attribute<>(String2String.getInstance());
+    Attribute<SSLConfig> SSL_CONFIG = new 
Attribute<>(JsonStringCodec.getInstance(SSLConfig.class));
 
     @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass")
     long serialVersionUID = 
AttributeMap.AttributeInitializer.initialize(DAGContext.class);
   }
 
+  /**
+   * Wrapper class to store SSL configuration parameters specified using the
+   * {@link DAGContext#SSL_CONFIG} attribute.
+   * <p>
+   * It is a plain bean: four optional string properties, each with a getter and a setter.
+   * Either {@code configPath} is given (the SSL files are already present on the target node),
+   * or the keystore properties are given (the keystore file lives on the client machine).
+   */
+  @Evolving
+  public static class SSLConfig implements Serializable
+  {
+
+    private static final long serialVersionUID = -3491488868092056793L;
+
+    /**
+     * Full path of the SSL keystore file on the client machine
+     */
+    private String keyStorePath;
+
+    /**
+     * Password for the keystore file
+     */
+    private String keyStorePassword;
+
+    /**
+     * Key password for the key in the keystore file
+     */
+    private String keyStoreKeyPassword;
+
+    /**
+     * Full path of SSL configuration file on the target node if all SSL files are already present there
+     */
+    private String configPath;
+
+    /**
+     * @return full path of the SSL keystore file on the client machine, or null if not set
+     */
+    public String getKeyStorePath()
+    {
+      return keyStorePath;
+    }
+
+    /**
+     * @param keyStorePath full path of the SSL keystore file on the client machine
+     */
+    public void setKeyStorePath(String keyStorePath)
+    {
+      this.keyStorePath = keyStorePath;
+    }
+
+    /**
+     * @return password for the keystore file, or null if not set
+     */
+    public String getKeyStorePassword()
+    {
+      return keyStorePassword;
+    }
+
+    /**
+     * @param keyStorePassword password for the keystore file
+     */
+    public void setKeyStorePassword(String keyStorePassword)
+    {
+      this.keyStorePassword = keyStorePassword;
+    }
+
+    /**
+     * @return password for the key inside the keystore file, or null if not set
+     */
+    public String getKeyStoreKeyPassword()
+    {
+      return keyStoreKeyPassword;
+    }
+
+    /**
+     * @param keyStoreKeyPassword password for the key inside the keystore file
+     */
+    public void setKeyStoreKeyPassword(String keyStoreKeyPassword)
+    {
+      this.keyStoreKeyPassword = keyStoreKeyPassword;
+    }
+
+    /**
+     * @return full path of the SSL configuration file on the target node, or null if not set
+     */
+    public String getConfigPath()
+    {
+      return configPath;
+    }
+
+    /**
+     * @param nodeLocalPath full path of the SSL configuration file on the target node
+     */
+    public void setConfigPath(String nodeLocalPath)
+    {
+      this.configPath = nodeLocalPath;
+    }
+
+    /**
+     * Returns a short description containing only the two path properties.
+     * The password fields are not included in the output.
+     */
+    @Override
+    public String toString()
+    {
+      return "SSLConfig [keyStorePath=" + keyStorePath + ", configPath=" + configPath + "]";
+    }
+  }
+
   long serialVersionUID = 
AttributeMap.AttributeInitializer.initialize(Context.class);
 }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/12706ca4/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java 
b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index c5017eb..96f9daa 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -79,6 +79,7 @@ import com.google.common.collect.Lists;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Context.SSLConfig;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BasicContainerOptConfigurator;
@@ -454,6 +455,7 @@ public class StramClient
         appPath = new Path(configuredAppPath);
       }
       String libJarsCsv = copyFromLocal(fs, appPath, localJarFiles.toArray(new 
String[]{}));
+      setupSSLResources(dag.getValue(Context.DAGContext.SSL_CONFIG), fs, 
appPath, localResources);
 
       LOG.info("libjars: {}", libJarsCsv);
       dag.getAttributes().put(Context.DAGContext.LIBRARY_JARS, libJarsCsv);
@@ -644,6 +646,37 @@ public class StramClient
     }
   }
 
+  /**
+   * Process SSLConfig object to set up SSL resources.
+   * <p>
+   * Two mutually exclusive modes are supported:
+   * <ul>
+   * <li>{@code configPath} set: nothing is copied here; the keystore properties must all be empty.</li>
+   * <li>{@code configPath} empty: the keystore file at {@code keyStorePath} is copied under
+   * {@code appPath} and registered in {@code localResources}.</li>
+   * </ul>
+   * A null {@code sslConfig} means SSL was not configured and the method does nothing.
+   *
+   * @param sslConfig  SSLConfig object derived from SSL_CONFIG attribute, may be null
+   * @param fs    HDFS file system object
+   * @param appPath    application path for the current application
+   * @param localResources  Local resources to modify
+   * @throws IOException propagated from copying the keystore or registering it as a local resource
+   * @throws IllegalArgumentException if configPath is combined with keystore properties, or if
+   *         neither configPath nor keyStorePath is specified
+   */
+  private void setupSSLResources(SSLConfig sslConfig, FileSystem fs, Path appPath, Map<String, LocalResource> localResources) throws IOException
+  {
+    if (sslConfig == null) {
+      return;
+    }
+
+    String nodeLocalConfig = sslConfig.getConfigPath();
+    String keystorePath = sslConfig.getKeyStorePath();
+
+    if (StringUtils.isNotEmpty(nodeLocalConfig)) {
+      // all others should be empty
+      if (StringUtils.isNotEmpty(keystorePath) || StringUtils.isNotEmpty(sslConfig.getKeyStorePassword())
+          || StringUtils.isNotEmpty(sslConfig.getKeyStoreKeyPassword())) {
+        throw new IllegalArgumentException("Cannot specify both configPath and other parameters in " + sslConfig);
+      }
+      // pass thru: Stram will implement reading the node local SSL config file
+      return;
+    }
+
+    // Without a node-local config the keystore is the only thing to ship; fail early with a
+    // clear message instead of handing a null/empty path to the copy routine.
+    if (!StringUtils.isNotEmpty(keystorePath)) {
+      throw new IllegalArgumentException("keyStorePath must be specified when configPath is not set in " + sslConfig);
+    }
+
+    // need to package and copy the keyStore file
+    String[] sslFileArray = {keystorePath};
+    String sslFileNames = copyFromLocal(fs, appPath, sslFileArray);
+    LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.FILE, sslFileNames, localResources, fs);
+  }
+
   public ApplicationReport getApplicationReport() throws YarnException, 
IOException
   {
     return yarnClient.getApplicationReport(this.appId);

http://git-wip-us.apache.org/repos/asf/apex-core/blob/12706ca4/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index b7cbc29..ed9248a 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -85,13 +85,16 @@ import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.SSLConfig;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StringCodec;
 import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest;
@@ -130,6 +133,11 @@ public class StreamingAppMasterService extends 
CompositeService
   private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 
2;
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 
60 * 1000;
   private static final int UPDATE_NODE_REPORTS_INTERVAL = 10 * 60 * 1000;
+  /**
+   * Config property name for the SSL keystore location used by Hadoop webapps.
+   * This should be replaced when a constant is defined there
+   */
+  private static final String SSL_SERVER_KEYSTORE_LOCATION = 
"ssl.server.keystore.location";
   private AMRMClient<ContainerRequest> amRmClient;
   private NMClientAsync nmClient;
   private LogicalPlan dag;
@@ -632,10 +640,8 @@ public class StreamingAppMasterService extends 
CompositeService
         config = new Configuration(config);
         config.set("hadoop.http.filter.initializers", 
StramWSFilterInitializer.class.getCanonicalName());
       }
-      String customSSLConfig = 
dag.getValue(LogicalPlan.STRAM_HTTP_CUSTOM_CONFIG);
-      if (StringUtils.isNotEmpty(customSSLConfig)) {
-        config.addResource(new Path(customSSLConfig));
-      }
+      // update config with appropriate SSL params if passed via the dag 
attribute SSL_CONFIG
+      addSSLConfigResource(config);
       WebApp webApp = WebApps.$for("stram", StramAppContext.class, appContext, 
"ws").with(config).start(new StramWebApp(this.dnmgr));
       LOG.info("Started web service at port: " + webApp.port());
       appMasterTrackingUrl = 
NetUtils.getConnectAddress(webApp.getListenerAddress()).getHostName() + ":" + 
webApp.port();
@@ -649,6 +655,31 @@ public class StreamingAppMasterService extends 
CompositeService
     }
   }
 
+  /**
+   * Modify config object by adding SSL related parameters into a resource for WebApp's use.
+   * <p>
+   * Reads the SSL_CONFIG attribute from the DAG. If it carries a node-local config path, that
+   * file is added as a resource. Otherwise a new resource is built from the keystore properties
+   * and added. When the attribute is not set, {@code config} is left untouched.
+   *
+   * @param config  Configuration to be modified
+   * @throws IllegalArgumentException if SSL_CONFIG has neither configPath nor keyStorePath
+   */
+  private void addSSLConfigResource(Configuration config)
+  {
+    SSLConfig sslConfig = dag.getValue(Context.DAGContext.SSL_CONFIG);
+    if (sslConfig == null) {
+      return;
+    }
+
+    String nodeLocalConfig = sslConfig.getConfigPath();
+    if (StringUtils.isNotEmpty(nodeLocalConfig)) {
+      config.addResource(new Path(nodeLocalConfig));
+      return;
+    }
+
+    String keyStorePath = sslConfig.getKeyStorePath();
+    if (!StringUtils.isNotEmpty(keyStorePath)) {
+      // new Path(null or "") would also throw, but without saying which attribute is at fault
+      throw new IllegalArgumentException("keyStorePath must be specified when configPath is not set in " + sslConfig);
+    }
+
+    // create a configuration object and add it as a resource
+    Configuration sslConfigResource = new Configuration(false);
+    final String sslConfigLongName = Context.DAGContext.SSL_CONFIG.getLongName();
+
+    // Only the file name is used; presumably the keystore was localized into the container's
+    // working directory by the client - verify against StramClient.
+    sslConfigResource.set(SSL_SERVER_KEYSTORE_LOCATION, new Path(keyStorePath).getName(), sslConfigLongName);
+
+    // Configuration.set rejects null values, so properties that were not supplied are skipped.
+    String keyStorePassword = sslConfig.getKeyStorePassword();
+    if (keyStorePassword != null) {
+      sslConfigResource.set(WebAppUtils.WEB_APP_KEYSTORE_PASSWORD_KEY, keyStorePassword, sslConfigLongName);
+    }
+    String keyStoreKeyPassword = sslConfig.getKeyStoreKeyPassword();
+    if (keyStoreKeyPassword != null) {
+      sslConfigResource.set(WebAppUtils.WEB_APP_KEY_PASSWORD_KEY, keyStoreKeyPassword, sslConfigLongName);
+    }
+
+    config.addResource(sslConfigResource);
+  }
+
   @Override
   protected void serviceStop() throws Exception
   {

Reply via email to