Repository: incubator-apex-core Updated Branches: refs/heads/master e39c63142 -> 854966439
Ability to configure stram web services auth Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/58e3a31d Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/58e3a31d Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/58e3a31d Branch: refs/heads/master Commit: 58e3a31d0d48ff19e44e5f545ef4842ac4c05f37 Parents: 881d000 Author: Pramod Immaneni <[email protected]> Authored: Fri Mar 18 19:19:25 2016 -0700 Committer: Pramod Immaneni <[email protected]> Committed: Tue Mar 22 16:00:55 2016 -0700 ---------------------------------------------------------------------- .../main/java/com/datatorrent/api/Context.java | 20 ++++++ .../stram/StreamingAppMasterService.java | 38 ++++++++-- .../stram/plan/logical/LogicalPlan.java | 51 +++++++++++-- .../datatorrent/stram/util/SecurityUtils.java | 76 ++++++++++++++++++++ .../stram/util/WebServicesClient.java | 19 +++-- .../stram/util/SecurityUtilsTest.java | 65 +++++++++++++++++ 6 files changed, 245 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/58e3a31d/api/src/main/java/com/datatorrent/api/Context.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/com/datatorrent/api/Context.java b/api/src/main/java/com/datatorrent/api/Context.java index 90d2108..d34d682 100644 --- a/api/src/main/java/com/datatorrent/api/Context.java +++ b/api/src/main/java/com/datatorrent/api/Context.java @@ -101,6 +101,20 @@ public interface Context String getJVMOptions(List<DAG.OperatorMeta> operatorMetaList); } + /** + * The streaming application master web service authentication enablement policy.<br/><br/> + * ENABLE - Enable authentication for web service access.<br/> + * FOLLOW_HADOOP_AUTH - Follow Hadoop authentication, if hadoop authentication is enabled, i.e., if it is set to something + * other than "simple", enable authentication for web services as well.<br/> + * FOLLOW_HADOOP_HTTP_AUTH - Follow Hadoop HTTP authentication, if hadoop authentication is enabled, i.e., if it is + * set to something other than "simple", enable authentication for web services as well.<br/> + * DISABLE - Disable authentication for web services. + */ + enum StramHTTPAuthentication + { + ENABLE, FOLLOW_HADOOP_AUTH, FOLLOW_HADOOP_HTTP_AUTH, DISABLE + } + public interface PortContext extends Context { /** @@ -471,6 +485,12 @@ public interface Context */ Attribute<ContainerOptConfigurator> CONTAINER_OPTS_CONFIGURATOR = new Attribute<ContainerOptConfigurator>(new Object2String<ContainerOptConfigurator>()); /** + * The policy for enabling stram web services authentication.<br/> + * See {@link StramHTTPAuthentication} for the different options.<br/> + * Default value is StramHTTPAuthentication.FOLLOW_HADOOP_AUTH + */ + Attribute<StramHTTPAuthentication> STRAM_HTTP_AUTHENTICATION = new Attribute<>(StramHTTPAuthentication.FOLLOW_HADOOP_AUTH, new StringCodec.Enum2String<>(StramHTTPAuthentication.class)); + /** * The string codec map for classes that are to be set or get through properties as strings. * Only supports string codecs that have a constructor with no arguments */ http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/58e3a31d/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 8565275..31a7fc8 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -18,18 +18,28 @@ */ package com.datatorrent.stram; -import java.io.*; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import javax.xml.bind.annotation.XmlElement; -import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; @@ -45,7 +55,17 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; -import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -60,8 +80,8 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.WebApps; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; import com.datatorrent.api.Attribute; import com.datatorrent.api.AutoMetric; @@ -84,6 +104,7 @@ import com.datatorrent.stram.security.StramDelegationTokenIdentifier; import com.datatorrent.stram.security.StramDelegationTokenManager; import com.datatorrent.stram.security.StramUserLogin; import com.datatorrent.stram.security.StramWSFilterInitializer; +import com.datatorrent.stram.util.SecurityUtils; import com.datatorrent.stram.webapp.AppInfo; import com.datatorrent.stram.webapp.StramWebApp; @@ -522,6 +543,9 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Starting application with {} operators in {} containers", dnmgr.getPhysicalPlan().getAllOperators().size(), dnmgr.getPhysicalPlan().getContainers().size()); + // Setup security configuration such as that for web security + SecurityUtils.init(conf, dag.getValue(LogicalPlan.STRAM_HTTP_AUTHENTICATION)); + if (UserGroupInformation.isSecurityEnabled()) { // TODO :- Need to perform token renewal delegationTokenManager = new StramDelegationTokenManager(DELEGATION_KEY_UPDATE_INTERVAL, DELEGATION_TOKEN_MAX_LIFETIME, DELEGATION_TOKEN_RENEW_INTERVAL, DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL); @@ -570,7 +594,7 @@ public class StreamingAppMasterService extends CompositeService try { Configuration config = getConfig(); - if (UserGroupInformation.isSecurityEnabled()) { + if (SecurityUtils.isStramWebSecurityEnabled()) { config = new Configuration(config); config.set("hadoop.http.filter.initializers", StramWSFilterInitializer.class.getCanonicalName()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/58e3a31d/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 173298b..d4c70ed 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -21,34 +21,71 @@ package com.datatorrent.stram.plan.logical; import java.beans.IntrospectionException; import java.beans.Introspector; import java.beans.PropertyDescriptor; -import java.io.*; -import java.lang.reflect.*; -import java.util.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.lang.reflect.Field; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; +import java.lang.reflect.WildcardType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.Stack; import java.util.concurrent.atomic.AtomicInteger; -import javax.validation.*; +import javax.validation.ConstraintViolation; +import javax.validation.ConstraintViolationException; +import javax.validation.Validation; +import javax.validation.ValidationException; +import javax.validation.Validator; +import javax.validation.ValidatorFactory; import javax.validation.constraints.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.commons.lang.builder.ToStringStyle; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Sets; -import com.datatorrent.api.*; +import com.datatorrent.api.Attribute; import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.DAG; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Module; import com.datatorrent.api.Module.ProxyInputPort; import com.datatorrent.api.Module.ProxyOutputPort; +import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Operator.OutputPort; import com.datatorrent.api.Operator.Unifier; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.StreamCodec; +import com.datatorrent.api.StringCodec; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/58e3a31d/engine/src/main/java/com/datatorrent/stram/util/SecurityUtils.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/util/SecurityUtils.java b/engine/src/main/java/com/datatorrent/stram/util/SecurityUtils.java new file mode 100644 index 0000000..e897416 --- /dev/null +++ b/engine/src/main/java/com/datatorrent/stram/util/SecurityUtils.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.StramHTTPAuthentication; + +/** + * + */ +public class SecurityUtils +{ + + public static final String HADOOP_HTTP_AUTH_PROP = "hadoop.http.authentication.type"; + private static final String HADOOP_HTTP_AUTH_VALUE_SIMPLE = "simple"; + + private static boolean stramWebSecurityEnabled; + private static boolean hadoopWebSecurityEnabled; + + // If not initialized explicitly default to Hadoop auth + static { + hadoopWebSecurityEnabled = stramWebSecurityEnabled = UserGroupInformation.isSecurityEnabled(); + } + + public static void init(Configuration configuration, StramHTTPAuthentication stramHTTPAuth) + { + hadoopWebSecurityEnabled = false; + String authValue = configuration.get(HADOOP_HTTP_AUTH_PROP); + if ((authValue != null) && !authValue.equals(HADOOP_HTTP_AUTH_VALUE_SIMPLE)) { + hadoopWebSecurityEnabled = true; + } + // Stram http auth may not be specified and is null but still set a default + boolean authDefault = false; + if (stramHTTPAuth != null) { + if (stramHTTPAuth == Context.StramHTTPAuthentication.FOLLOW_HADOOP_HTTP_AUTH) { + stramWebSecurityEnabled = hadoopWebSecurityEnabled; + } else if (stramHTTPAuth == StramHTTPAuthentication.FOLLOW_HADOOP_AUTH) { + stramWebSecurityEnabled = UserGroupInformation.isSecurityEnabled(); + } else if (stramHTTPAuth == StramHTTPAuthentication.ENABLE) { + stramWebSecurityEnabled = true; + } else if (stramHTTPAuth == StramHTTPAuthentication.DISABLE) { + stramWebSecurityEnabled = false; + } + } + } + + public static boolean isHadoopWebSecurityEnabled() + { + return hadoopWebSecurityEnabled; + } + + public static boolean isStramWebSecurityEnabled() + { + return stramWebSecurityEnabled; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/58e3a31d/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java b/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java index 2710352..f2aa896 100644 --- a/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java +++ b/engine/src/main/java/com/datatorrent/stram/util/WebServicesClient.java @@ -22,18 +22,9 @@ import java.io.IOException; import java.security.Principal; import java.util.concurrent.Future; -import com.sun.jersey.api.client.AsyncWebResource; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.async.ITypeListener; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.apache4.ApacheHttpClient4Handler; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.http.auth.AuthSchemeProvider; import org.apache.http.auth.AuthScope; import org.apache.http.auth.Credentials; @@ -47,6 +38,14 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import com.sun.jersey.api.client.AsyncWebResource; +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.async.ITypeListener; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.apache4.ApacheHttpClient4Handler; + /** * <p>WebServicesClient class.</p> * @@ -96,7 +95,7 @@ public class WebServicesClient public WebServicesClient(ClientConfig config) { - if (UserGroupInformation.isSecurityEnabled()) { + if (SecurityUtils.isHadoopWebSecurityEnabled()) { HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); httpClientBuilder.setConnectionManager(connectionManager); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/58e3a31d/engine/src/test/java/com/datatorrent/stram/util/SecurityUtilsTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/util/SecurityUtilsTest.java b/engine/src/test/java/com/datatorrent/stram/util/SecurityUtilsTest.java new file mode 100644 index 0000000..fa7f2b1 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/util/SecurityUtilsTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram.util; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; + +/** + * + */ +public class SecurityUtilsTest +{ + @Test + public void testStramWebSecurity() + { + checkWebSecurity(false, false); + Configuration conf = new Configuration(); + checkSecurityConfiguration(conf, new boolean[][] {{false, false}, {false, true}, {false, false}, {false, false}, + {false, false}}); + conf.set(SecurityUtils.HADOOP_HTTP_AUTH_PROP, "kerberos"); + checkSecurityConfiguration(conf, new boolean[][] {{true, false}, {true, true}, {true, false}, {true, false}, + {true, true}}); + } + + private void checkSecurityConfiguration(Configuration conf, boolean[][] securityConf) + { + Assert.assertEquals("Number variations", 5, securityConf.length); + SecurityUtils.init(conf, null); + checkWebSecurity(securityConf[0][0], securityConf[0][1]); + SecurityUtils.init(conf, Context.StramHTTPAuthentication.ENABLE); + checkWebSecurity(securityConf[1][0], securityConf[1][1]); + SecurityUtils.init(conf, Context.StramHTTPAuthentication.DISABLE); + checkWebSecurity(securityConf[2][0], securityConf[2][1]); + SecurityUtils.init(conf, Context.StramHTTPAuthentication.FOLLOW_HADOOP_AUTH); + checkWebSecurity(securityConf[3][0], securityConf[3][1]); + SecurityUtils.init(conf, Context.StramHTTPAuthentication.FOLLOW_HADOOP_HTTP_AUTH); + checkWebSecurity(securityConf[4][0], securityConf[4][1]); + } + + private void checkWebSecurity(boolean hadoopWebSecurity, boolean stramWebSecurity) + { + Assert.assertEquals("Hadoop web security", hadoopWebSecurity, SecurityUtils.isHadoopWebSecurityEnabled()); + Assert.assertEquals("Hadoop web security", stramWebSecurity, SecurityUtils.isStramWebSecurityEnabled()); + } +}
