Repository: apex-core Updated Branches: refs/heads/master f9c1701a2 -> 12706ca43
APEXCORE-712 custom keystore at launch. This closes #526 and uses some code contributed by @devtagare Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/12706ca4 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/12706ca4 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/12706ca4 Branch: refs/heads/master Commit: 12706ca43b00e272ba8cb13bc5d6648e0124187d Parents: f9c1701 Author: Sanjay Pujare <[email protected]> Authored: Sun Jun 25 16:09:21 2017 -0700 Committer: Sanjay Pujare <[email protected]> Committed: Sun Jun 25 18:38:04 2017 -0700 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Context.java | 85 +++++++++++++++++++- .../java/com/datatorrent/stram/StramClient.java | 33 ++++++++ .../stram/StreamingAppMasterService.java | 39 ++++++++- 3 files changed, 149 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/12706ca4/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index ff1a2d4..9fe0c46 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -23,6 +23,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Operator.ProcessingMode; import com.datatorrent.api.StringCodec.Class2String; @@ -536,15 +538,90 @@ public interface Context Attribute<String> LIBRARY_JARS = new Attribute<>(String2String.getInstance()); /** - * This configuration file can be used to over-ride the default Yarn configuration. - * For example, this can be used to provide custom SSL parameters in the configuration. - * Note that this file needs to be present on the node. + * SSL configuration string property. This is used to specify SSL parameters for + * the Stram's Web services. */ - Attribute<String> STRAM_HTTP_CUSTOM_CONFIG = new Attribute<>(String2String.getInstance()); + Attribute<SSLConfig> SSL_CONFIG = new Attribute<>(JsonStringCodec.getInstance(SSLConfig.class)); @SuppressWarnings(value = "FieldNameHidesFieldInSuperclass") long serialVersionUID = AttributeMap.AttributeInitializer.initialize(DAGContext.class); } + @Evolving + /** + * Wrapper class to store SSL configuration parameters specified using the SSL-CONFIG attribute. + */ + public static class SSLConfig implements Serializable + { + + private static final long serialVersionUID = -3491488868092056793L; + + /** + * Full path of the SSL keystore file on the client machine + */ + private String keyStorePath; + + /** + * Password for the keystore file + */ + private String keyStorePassword; + + /** + * Key password for the key in the keystore file + */ + private String keyStoreKeyPassword; + + /** + * Full path of SSL configuration file on the target node if all SSL files are already present there + */ + private String configPath; + + public String getKeyStorePath() + { + return keyStorePath; + } + + public void setKeyStorePath(String keyStorePath) + { + this.keyStorePath = keyStorePath; + } + + public String getKeyStorePassword() + { + return keyStorePassword; + } + + public void setKeyStorePassword(String keyStorePassword) + { + this.keyStorePassword = keyStorePassword; + } + + public String getKeyStoreKeyPassword() + { + return keyStoreKeyPassword; + } + + public void setKeyStoreKeyPassword(String keyStoreKeyPassword) + { + this.keyStoreKeyPassword = keyStoreKeyPassword; + } + + public String getConfigPath() + { + return configPath; + } + + public void setConfigPath(String nodeLocalPath) + { + this.configPath = nodeLocalPath; + } + + @Override + public String toString() + { + return "SSLConfig [keyStorePath=" + keyStorePath + ", configPath=" + configPath + "]"; + } + } + long serialVersionUID = AttributeMap.AttributeInitializer.initialize(Context.class); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/12706ca4/engine/src/main/java/com/datatorrent/stram/StramClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index c5017eb..96f9daa 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -79,6 +79,7 @@ import com.google.common.collect.Lists; import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Context.SSLConfig; import com.datatorrent.api.StorageAgent; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BasicContainerOptConfigurator; @@ -454,6 +455,7 @@ public class StramClient appPath = new Path(configuredAppPath); } String libJarsCsv = copyFromLocal(fs, appPath, localJarFiles.toArray(new String[]{})); + setupSSLResources(dag.getValue(Context.DAGContext.SSL_CONFIG), fs, appPath, localResources); LOG.info("libjars: {}", libJarsCsv); dag.getAttributes().put(Context.DAGContext.LIBRARY_JARS, libJarsCsv); @@ -644,6 +646,37 @@ public class StramClient } } + /** + * Process SSLConfig object to set up SSL resources + * + * @param sslConfig SSLConfig object derived from SSL_CONFIG attribute + * @param fs HDFS file system object + * @param appPath application path for the current application + * @param localResources Local resources to modify + * @throws IOException + */ + private void setupSSLResources(SSLConfig sslConfig, FileSystem fs, Path appPath, Map<String, LocalResource> localResources) throws IOException + { + if (sslConfig != null) { + String nodeLocalConfig = sslConfig.getConfigPath(); + + if (StringUtils.isNotEmpty(nodeLocalConfig)) { + // all others should be empty + if (StringUtils.isNotEmpty(sslConfig.getKeyStorePath()) || StringUtils.isNotEmpty(sslConfig.getKeyStorePassword()) + || StringUtils.isNotEmpty(sslConfig.getKeyStoreKeyPassword())) { + throw new IllegalArgumentException("Cannot specify both nodeLocalConfigPath and other parameters in " + sslConfig); + } + // pass thru: Stram will implement reading the node local SSL config file + } else { + // need to package and copy the keyStore file + String keystorePath = sslConfig.getKeyStorePath(); + String[] sslFileArray = {keystorePath}; + String sslFileNames = copyFromLocal(fs, appPath, sslFileArray); + LaunchContainerRunnable.addFilesToLocalResources(LocalResourceType.FILE, sslFileNames, localResources, fs); + } + } + } + public ApplicationReport getApplicationReport() throws YarnException, IOException { return yarnClient.getApplicationReport(this.appId); http://git-wip-us.apache.org/repos/asf/apex-core/blob/12706ca4/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index b7cbc29..ed9248a 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -85,13 +85,16 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.datatorrent.api.Attribute; import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.Context.SSLConfig; import com.datatorrent.api.DAG; import com.datatorrent.api.StringCodec; import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest; @@ -130,6 +133,11 @@ public class StreamingAppMasterService extends CompositeService private static final long DELEGATION_TOKEN_RENEW_INTERVAL = Long.MAX_VALUE / 2; private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = 24 * 60 * 60 * 1000; private static final int UPDATE_NODE_REPORTS_INTERVAL = 10 * 60 * 1000; + /** + * Config property name for the SSL keystore location used by Hadoop webapps. + * This should be replaced when a constant is defined there + */ + private static final String SSL_SERVER_KEYSTORE_LOCATION = "ssl.server.keystore.location"; private AMRMClient<ContainerRequest> amRmClient; private NMClientAsync nmClient; private LogicalPlan dag; @@ -632,10 +640,8 @@ public class StreamingAppMasterService extends CompositeService config = new Configuration(config); config.set("hadoop.http.filter.initializers", StramWSFilterInitializer.class.getCanonicalName()); } - String customSSLConfig = dag.getValue(LogicalPlan.STRAM_HTTP_CUSTOM_CONFIG); - if (StringUtils.isNotEmpty(customSSLConfig)) { - config.addResource(new Path(customSSLConfig)); - } + // update config with appropriate SSL params if passed via the dag attribute SSL_CONFIG + addSSLConfigResource(config); WebApp webApp = WebApps.$for("stram", StramAppContext.class, appContext, "ws").with(config).start(new StramWebApp(this.dnmgr)); LOG.info("Started web service at port: " + webApp.port()); appMasterTrackingUrl = NetUtils.getConnectAddress(webApp.getListenerAddress()).getHostName() + ":" + webApp.port(); @@ -649,6 +655,31 @@ public class StreamingAppMasterService extends CompositeService } } + /** + * Modify config object by adding SSL related parameters into a resource for WebApp's use + * + * @param config Configuration to be modified + */ + private void addSSLConfigResource(Configuration config) + { + SSLConfig sslConfig = dag.getValue(Context.DAGContext.SSL_CONFIG); + if (sslConfig != null) { + String nodeLocalConfig = sslConfig.getConfigPath(); + if (StringUtils.isNotEmpty(nodeLocalConfig)) { + config.addResource(new Path(nodeLocalConfig)); + } else { + // create a configuration object and add it as a resource + Configuration sslConfigResource = new Configuration(false); + + final String SSL_CONFIG_LONG_NAME = Context.DAGContext.SSL_CONFIG.getLongName(); + sslConfigResource.set(SSL_SERVER_KEYSTORE_LOCATION, new Path(sslConfig.getKeyStorePath()).getName(), SSL_CONFIG_LONG_NAME); + sslConfigResource.set(WebAppUtils.WEB_APP_KEYSTORE_PASSWORD_KEY, sslConfig.getKeyStorePassword(), SSL_CONFIG_LONG_NAME); + sslConfigResource.set(WebAppUtils.WEB_APP_KEY_PASSWORD_KEY, sslConfig.getKeyStoreKeyPassword(), SSL_CONFIG_LONG_NAME); + config.addResource(sslConfigResource); + } + } + } + @Override protected void serviceStop() throws Exception {
