This is an automated email from the ASF dual-hosted git repository. abhi pushed a commit to branch ranger-5039 in repository https://gitbox.apache.org/repos/asf/ranger.git
commit 788f25ea318440fc6cc366debd3e95a078d19e4d Author: Abhishek Kumar <[email protected]> AuthorDate: Wed Dec 18 00:50:23 2024 -0800 RANGER-5039: checkstyle compliance updates - plugin-kafka module --- plugin-kafka/pom.xml | 2 + .../kafka/authorizer/RangerKafkaAuditHandler.java | 86 ++-- .../kafka/authorizer/RangerKafkaAuthorizer.java | 557 ++++++++++----------- .../ranger/services/kafka/RangerServiceKafka.java | 192 ++++--- .../services/kafka/client/ServiceKafkaClient.java | 398 +++++++-------- .../kafka/client/ServiceKafkaConnectionMgr.java | 127 +++-- .../authorizer/KafkaRangerAuthorizerGSSTest.java | 189 ++++--- .../KafkaRangerAuthorizerSASLSSLTest.java | 126 +++-- .../authorizer/KafkaRangerAuthorizerTest.java | 145 +++--- .../authorizer/KafkaRangerTopicCreationTest.java | 116 +++-- .../kafka/authorizer/KafkaTestUtils.java | 103 ++-- .../kafka/authorizer/RangerAdminClientImpl.java | 28 +- 12 files changed, 996 insertions(+), 1073 deletions(-) diff --git a/plugin-kafka/pom.xml b/plugin-kafka/pom.xml index 5e64e2d28..9e2db449b 100644 --- a/plugin-kafka/pom.xml +++ b/plugin-kafka/pom.xml @@ -28,6 +28,8 @@ <name>KAFKA Security Plugin</name> <description>KAFKA Security Plugin</description> <properties> + <checkstyle.failOnViolation>true</checkstyle.failOnViolation> + <checkstyle.skip>false</checkstyle.skip> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java index f16fbd6b6..361a50e22 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuditHandler.java @@ -17,7 +17,6 @@ * under the License. 
*/ - package org.apache.ranger.authorization.kafka.authorizer; import org.apache.ranger.audit.model.AuthzAuditEvent; @@ -33,80 +32,73 @@ import java.util.Collection; public class RangerKafkaAuditHandler extends RangerDefaultAuditHandler { private static final Logger LOG = LoggerFactory.getLogger(RangerKafkaAuditHandler.class); - private AuthzAuditEvent auditEvent = null; + private AuthzAuditEvent auditEvent; - public RangerKafkaAuditHandler(){ + public RangerKafkaAuditHandler() { } @Override public void processResult(RangerAccessResult result) { // If Cluster Resource Level Topic Creation is not Allowed we don't audit. // Subsequent call from Kafka for Topic Creation at Topic resource Level will be audited. - if(LOG.isTraceEnabled()) { - LOG.trace("==> RangerKafkaAuditHandler.processResult()"); - } + LOG.trace("==> RangerKafkaAuditHandler.processResult()"); + if (!isAuditingNeeded(result)) { return; } + auditEvent = super.getAuthzEvents(result); - if(LOG.isTraceEnabled()) { - LOG.trace("<== RangerKafkaAuditHandler.processResult()"); - } + + LOG.trace("<== RangerKafkaAuditHandler.processResult()"); } + @Override public void processResults(Collection<RangerAccessResult> results) { - if(LOG.isTraceEnabled()) { - LOG.trace("==> RangerKafkaAuditHandler.processResults(" + results + ")"); - } - if (results!=null){ - for(RangerAccessResult res: results){ + LOG.trace("==> RangerKafkaAuditHandler.processResults({})", results); + + if (results != null) { + for (RangerAccessResult res : results) { processResult(res); flushAudit(); } } - if(LOG.isTraceEnabled()) { - LOG.trace("<== RangerKafkaAuditHandler.processResults(" + results + ")"); - } + LOG.trace("<== RangerKafkaAuditHandler.processResults({})", results); } + public void flushAudit() { + LOG.trace("==> RangerKafkaAuditHandler.flushAudit(AuditEvent: {})", auditEvent); - private boolean isAuditingNeeded(final RangerAccessResult result) { - if(LOG.isTraceEnabled()) { - LOG.trace("==> 
RangerKafkaAuditHandler.isAuditingNeeded()"); + if (auditEvent != null) { + super.logAuthzAudit(auditEvent); } - boolean ret = true; - boolean isAllowed = result.getIsAllowed(); - RangerAccessRequest request = result.getAccessRequest(); - RangerAccessResourceImpl resource = (RangerAccessResourceImpl) request.getResource(); - String resourceName = (String) resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER); + + LOG.trace("<== RangerKafkaAuditHandler.flushAudit()"); + } + + private boolean isAuditingNeeded(final RangerAccessResult result) { + LOG.trace("==> RangerKafkaAuditHandler.isAuditingNeeded()"); + + boolean ret = true; + boolean isAllowed = result.getIsAllowed(); + RangerAccessRequest request = result.getAccessRequest(); + RangerAccessResourceImpl resource = (RangerAccessResourceImpl) request.getResource(); + String resourceName = (String) resource.getValue(RangerKafkaAuthorizer.KEY_CLUSTER); + if (resourceName != null) { if (request.getAccessType().equalsIgnoreCase(RangerKafkaAuthorizer.ACCESS_TYPE_CREATE) && !isAllowed) { ret = false; } } - if(LOG.isTraceEnabled()) { - LOG.trace("RangerKafkaAuditHandler: isAuditingNeeded()"); - LOG.trace("request:"+request); - LOG.trace("resource:"+resource); - LOG.trace("resourceName:"+resourceName); - LOG.trace("request.getAccessType():"+request.getAccessType()); - LOG.trace("isAllowed:"+isAllowed); - LOG.trace("ret="+ret); - LOG.trace("<== RangerKafkaAuditHandler.isAuditingNeeded() = "+ret+" for result="+result); - } - return ret; - } - public void flushAudit() { - if(LOG.isTraceEnabled()) { - LOG.trace("==> RangerKafkaAuditHandler.flushAudit(" + "AuditEvent: " + auditEvent+")"); - } - if (auditEvent != null) { - super.logAuthzAudit(auditEvent); - } - if(LOG.isTraceEnabled()) { - LOG.trace("<== RangerKafkaAuditHandler.flushAudit()"); - } + LOG.trace("RangerKafkaAuditHandler: isAuditingNeeded()"); + LOG.trace("request: {}", request); + LOG.trace("resource: {}", resource); + LOG.trace("resourceName: {}", resourceName); + 
LOG.trace("request.getAccessType(): {}", request.getAccessType()); + LOG.trace("isAllowed: {}", isAllowed); + LOG.trace("ret = {}", ret); + LOG.trace("<== RangerKafkaAuditHandler.isAuditingNeeded() = {} for result = {}", ret, result); + return ret; } } diff --git a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java index 96a36abe9..8b83c9775 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java @@ -19,17 +19,6 @@ package org.apache.ranger.authorization.kafka.authorizer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.stream.Collectors; - import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -60,291 +49,301 @@ import org.apache.ranger.plugin.util.RangerPerfTracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; + public class RangerKafkaAuthorizer implements Authorizer { - private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class); - private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = RangerPerfTracer.getPerfLogger("kafkaauth.request"); - - 
public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs"; - public static final String KEY_TOPIC = "topic"; - public static final String KEY_CLUSTER = "cluster"; - public static final String KEY_CONSUMER_GROUP = "consumergroup"; - public static final String KEY_TRANSACTIONALID = "transactionalid"; - public static final String KEY_DELEGATIONTOKEN = "delegationtoken"; - public static final String ACCESS_TYPE_READ = "consume"; - public static final String ACCESS_TYPE_WRITE = "publish"; - public static final String ACCESS_TYPE_CREATE = "create"; - public static final String ACCESS_TYPE_DELETE = "delete"; - public static final String ACCESS_TYPE_CONFIGURE = "configure"; - public static final String ACCESS_TYPE_DESCRIBE = "describe"; - public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs"; - public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action"; - public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write"; - - private static volatile RangerBasePlugin rangerPlugin = null; - RangerKafkaAuditHandler auditHandler = null; - - public RangerKafkaAuthorizer() { - } - - private static String mapToRangerAccessType(AclOperation operation) { - switch (operation) { - case READ: - return ACCESS_TYPE_READ; - case WRITE: - return ACCESS_TYPE_WRITE; - case ALTER: - return ACCESS_TYPE_CONFIGURE; - case DESCRIBE: - return ACCESS_TYPE_DESCRIBE; - case CLUSTER_ACTION: - return ACCESS_TYPE_CLUSTER_ACTION; - case CREATE: - return ACCESS_TYPE_CREATE; - case DELETE: - return ACCESS_TYPE_DELETE; - case DESCRIBE_CONFIGS: - return ACCESS_TYPE_DESCRIBE_CONFIGS; - case ALTER_CONFIGS: - return ACCESS_TYPE_ALTER_CONFIGS; - case IDEMPOTENT_WRITE: - return ACCESS_TYPE_IDEMPOTENT_WRITE; - case UNKNOWN: - case ANY: - case ALL: - default: - return null; + private static final Logger logger = LoggerFactory.getLogger(RangerKafkaAuthorizer.class); + private static final Logger PERF_KAFKAAUTH_REQUEST_LOG = 
RangerPerfTracer.getPerfLogger("kafkaauth.request"); + + public static final String ACCESS_TYPE_ALTER_CONFIGS = "alter_configs"; + public static final String KEY_TOPIC = "topic"; + public static final String KEY_CLUSTER = "cluster"; + public static final String KEY_CONSUMER_GROUP = "consumergroup"; + public static final String KEY_TRANSACTIONALID = "transactionalid"; + public static final String KEY_DELEGATIONTOKEN = "delegationtoken"; + public static final String ACCESS_TYPE_READ = "consume"; + public static final String ACCESS_TYPE_WRITE = "publish"; + public static final String ACCESS_TYPE_CREATE = "create"; + public static final String ACCESS_TYPE_DELETE = "delete"; + public static final String ACCESS_TYPE_CONFIGURE = "configure"; + public static final String ACCESS_TYPE_DESCRIBE = "describe"; + public static final String ACCESS_TYPE_DESCRIBE_CONFIGS = "describe_configs"; + public static final String ACCESS_TYPE_CLUSTER_ACTION = "cluster_action"; + public static final String ACCESS_TYPE_IDEMPOTENT_WRITE = "idempotent_write"; + + private static volatile RangerBasePlugin rangerPlugin; + + RangerKafkaAuditHandler auditHandler; + + public RangerKafkaAuthorizer() { + } + + @Override + public void close() { + logger.info("close() called on authorizer."); + try { + if (rangerPlugin != null) { + rangerPlugin.cleanup(); + } + } catch (Throwable t) { + logger.error("Error closing RangerPlugin.", t); + } + } + + @Override + public void configure(Map<String, ?> configs) { + RangerBasePlugin me = rangerPlugin; + if (me == null) { + synchronized (RangerKafkaAuthorizer.class) { + me = rangerPlugin; + if (me == null) { + try { + // Possible to override JAAS configuration which is used by Ranger, otherwise + // SASL_PLAINTEXT is used, which force Kafka to use 'sasl_plaintext.KafkaServer', + // if it's not defined, then it reverts to 'KafkaServer' configuration. 
+ final Object jaasContext = configs.get("ranger.jaas.context"); + final String listenerName = (jaasContext instanceof String + && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext + : SecurityProtocol.SASL_PLAINTEXT.name(); + final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM; + JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs); + MiscUtil.setUGIFromJAASConfig(context.name()); + UserGroupInformation loginUser = MiscUtil.getUGILoginUser(); + logger.info("LoginUser = {}", loginUser); + } catch (Throwable t) { + logger.error("Error getting principal.", t); + } + rangerPlugin = new RangerBasePlugin("kafka", "kafka"); + logger.info("Calling plugin.init()"); + rangerPlugin.init(); + auditHandler = new RangerKafkaAuditHandler(); + rangerPlugin.setResultProcessor(auditHandler); + } + } + } + } + + @Override + public Map<Endpoint, ? extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) { + return serverInfo.endpoints().stream() + .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b)); + } + + @Override + public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) { + if (rangerPlugin == null) { + MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized"); + return denyAll(actions); + } + + RangerPerfTracer perf = null; + if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) { + perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")"); + } + try { + return wrappedAuthorization(requestContext, actions); + } finally { + RangerPerfTracer.log(perf); + } } - } - - private static String mapToResourceType(ResourceType resourceType) { - switch (resourceType) { - case TOPIC: - return KEY_TOPIC; - case CLUSTER: - return KEY_CLUSTER; - case GROUP: - return KEY_CONSUMER_GROUP; 
- case TRANSACTIONAL_ID: - return KEY_TRANSACTIONALID; - case DELEGATION_TOKEN: - return KEY_DELEGATIONTOKEN; - case ANY: - case UNKNOWN: - default: - return null; + + @Override + public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) { + logger.error("createAcls is not supported by Ranger for Kafka"); + + return aclBindings.stream() + .map(ab -> { + CompletableFuture<AclCreateResult> completableFuture = new CompletableFuture<>(); + completableFuture.completeExceptionally(new UnsupportedOperationException("createAcls is not supported by Ranger for Kafka")); + return completableFuture; + }) + .collect(Collectors.toList()); } - } - - private static RangerAccessResourceImpl createRangerAccessResource(String resourceTypeKey, String resourceName) { - RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl(); - rangerResource.setValue(resourceTypeKey, resourceName); - return rangerResource; - } - - private static RangerAccessRequestImpl createRangerAccessRequest(String userName, - Set<String> userGroups, - String ip, - Date eventTime, - String resourceTypeKey, - String resourceName, - String accessType) { - RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl(); - rangerRequest.setResource(createRangerAccessResource(resourceTypeKey, resourceName)); - rangerRequest.setUser(userName); - rangerRequest.setUserGroups(userGroups); - rangerRequest.setClientIPAddress(ip); - rangerRequest.setAccessTime(eventTime); - rangerRequest.setAccessType(accessType); - rangerRequest.setAction(accessType); - rangerRequest.setRequestData(resourceName); - return rangerRequest; - } - - private static List<AuthorizationResult> denyAll(List<Action> actions) { - return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList()); - } - - private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) { - if 
(CollectionUtils.isEmpty(results)) { - logger.error("Ranger Plugin returned null or empty. Returning Denied for all"); - return denyAll(actions); + + @Override + public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) { + logger.error("deleteAcls is not supported by Ranger for Kafka"); + return aclBindingFilters.stream() + .map(ab -> { + CompletableFuture<AclDeleteResult> completableFuture = new CompletableFuture<>(); + completableFuture.completeExceptionally(new UnsupportedOperationException("deleteAcls is not supported by Ranger for Kafka")); + return completableFuture; + }) + .collect(Collectors.toList()); } - return results.stream() - .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED) - .collect(Collectors.toList()); - } - - private static String toString(AuthorizableRequestContext requestContext) { - return requestContext == null ? null : - String.format("AuthorizableRequestContext{principal=%s, clientAddress=%s, clientId=%s}", - requestContext.principal(), requestContext.clientAddress(), requestContext.clientId()); - } - - @Override - public void close() { - logger.info("close() called on authorizer."); - try { - if (rangerPlugin != null) { - rangerPlugin.cleanup(); - } - } catch (Throwable t) { - logger.error("Error closing RangerPlugin.", t); + + @Override + public Iterable<AclBinding> acls(AclBindingFilter filter) { + logger.error("(getting) acls is not supported by Ranger for Kafka"); + throw new UnsupportedOperationException("(getting) acls is not supported by Ranger for Kafka"); } - } - - @Override - public void configure(Map<String, ?> configs) { - RangerBasePlugin me = rangerPlugin; - if (me == null) { - synchronized (RangerKafkaAuthorizer.class) { - me = rangerPlugin; - if (me == null) { - try { - // Possible to override JAAS configuration which is used by Ranger, otherwise - // SASL_PLAINTEXT is used, which 
force Kafka to use 'sasl_plaintext.KafkaServer', - // if it's not defined, then it reverts to 'KafkaServer' configuration. - final Object jaasContext = configs.get("ranger.jaas.context"); - final String listenerName = (jaasContext instanceof String - && StringUtils.isNotEmpty((String) jaasContext)) ? (String) jaasContext - : SecurityProtocol.SASL_PLAINTEXT.name(); - final String saslMechanism = SaslConfigs.GSSAPI_MECHANISM; - JaasContext context = JaasContext.loadServerContext(new ListenerName(listenerName), saslMechanism, configs); - MiscUtil.setUGIFromJAASConfig(context.name()); - UserGroupInformation loginUser = MiscUtil.getUGILoginUser(); - logger.info("LoginUser={}", loginUser); - } catch (Throwable t) { - logger.error("Error getting principal.", t); - } - rangerPlugin = new RangerBasePlugin("kafka", "kafka"); - logger.info("Calling plugin.init()"); - rangerPlugin.init(); - auditHandler = new RangerKafkaAuditHandler(); - rangerPlugin.setResultProcessor(auditHandler); + + // TODO: provide a real implementation (RANGER-3809) + // Currently we return a dummy implementation because KAFKA-13598 makes producers idempotent by default and this causes + // a failure in the InitProducerId API call on the broker side because of the missing acls() method implementation. + // Overriding this with a dummy impl will make Kafka return an authorization error instead of an exception if the + // IDEMPOTENT_WRITE permission wasn't set on the producer. 
+ @Override + public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { + SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); + + logger.debug("authorizeByResourceType call is not supported by Ranger for Kafka yet"); + return AuthorizationResult.DENIED; + } + + private static String mapToRangerAccessType(AclOperation operation) { + switch (operation) { + case READ: + return ACCESS_TYPE_READ; + case WRITE: + return ACCESS_TYPE_WRITE; + case ALTER: + return ACCESS_TYPE_CONFIGURE; + case DESCRIBE: + return ACCESS_TYPE_DESCRIBE; + case CLUSTER_ACTION: + return ACCESS_TYPE_CLUSTER_ACTION; + case CREATE: + return ACCESS_TYPE_CREATE; + case DELETE: + return ACCESS_TYPE_DELETE; + case DESCRIBE_CONFIGS: + return ACCESS_TYPE_DESCRIBE_CONFIGS; + case ALTER_CONFIGS: + return ACCESS_TYPE_ALTER_CONFIGS; + case IDEMPOTENT_WRITE: + return ACCESS_TYPE_IDEMPOTENT_WRITE; + case UNKNOWN: + case ANY: + case ALL: + default: + return null; } - } } - } - - @Override - public Map<Endpoint, ? 
extends CompletionStage<Void>> start(AuthorizerServerInfo serverInfo) { - return serverInfo.endpoints().stream() - .collect(Collectors.toMap(endpoint -> endpoint, endpoint -> CompletableFuture.completedFuture(null), (a, b) -> b)); - } - - @Override - public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) { - if (rangerPlugin == null) { - MiscUtil.logErrorMessageByInterval(logger, "Authorizer is still not initialized"); - return denyAll(actions); + + private static String mapToResourceType(ResourceType resourceType) { + switch (resourceType) { + case TOPIC: + return KEY_TOPIC; + case CLUSTER: + return KEY_CLUSTER; + case GROUP: + return KEY_CONSUMER_GROUP; + case TRANSACTIONAL_ID: + return KEY_TRANSACTIONALID; + case DELEGATION_TOKEN: + return KEY_DELEGATIONTOKEN; + case ANY: + case UNKNOWN: + default: + return null; + } } - RangerPerfTracer perf = null; - if (RangerPerfTracer.isPerfTraceEnabled(PERF_KAFKAAUTH_REQUEST_LOG)) { - perf = RangerPerfTracer.getPerfTracer(PERF_KAFKAAUTH_REQUEST_LOG, "RangerKafkaAuthorizer.authorize(actions=" + actions + ")"); + private static RangerAccessResourceImpl createRangerAccessResource(String resourceTypeKey, String resourceName) { + RangerAccessResourceImpl rangerResource = new RangerAccessResourceImpl(); + rangerResource.setValue(resourceTypeKey, resourceName); + return rangerResource; } - try { - return wrappedAuthorization(requestContext, actions); - } finally { - RangerPerfTracer.log(perf); + + private static RangerAccessRequestImpl createRangerAccessRequest(String userName, + Set<String> userGroups, + String ip, + Date eventTime, + String resourceTypeKey, + String resourceName, + String accessType) { + RangerAccessRequestImpl rangerRequest = new RangerAccessRequestImpl(); + rangerRequest.setResource(createRangerAccessResource(resourceTypeKey, resourceName)); + rangerRequest.setUser(userName); + rangerRequest.setUserGroups(userGroups); + rangerRequest.setClientIPAddress(ip); 
+ rangerRequest.setAccessTime(eventTime); + rangerRequest.setAccessType(accessType); + rangerRequest.setAction(accessType); + rangerRequest.setRequestData(resourceName); + return rangerRequest; + } + + private static List<AuthorizationResult> denyAll(List<Action> actions) { + return actions.stream().map(a -> AuthorizationResult.DENIED).collect(Collectors.toList()); } - } - private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) { - if (CollectionUtils.isEmpty(actions)) { - return Collections.emptyList(); + private static List<AuthorizationResult> mapResults(List<Action> actions, Collection<RangerAccessResult> results) { + if (CollectionUtils.isEmpty(results)) { + logger.error("Ranger Plugin returned null or empty. Returning Denied for all"); + return denyAll(actions); + } + return results.stream() + .map(r -> r != null && r.getIsAllowed() ? AuthorizationResult.ALLOWED : AuthorizationResult.DENIED) + .collect(Collectors.toList()); } - String userName = requestContext.principal() == null ? null : requestContext.principal().getName(); - Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName); - String hostAddress = requestContext.clientAddress() == null ? null : requestContext.clientAddress().getHostAddress(); - String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? 
hostAddress.substring(1) : hostAddress; - Date eventTime = new Date(); - - List<RangerAccessRequest> rangerRequests = new ArrayList<>(); - for (Action action : actions) { - String accessType = mapToRangerAccessType(action.operation()); - if (accessType == null) { - MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type, requestContext=" + toString(requestContext) + - ", actions=" + actions + ", operation=" + action.operation()); - return denyAll(actions); - } - String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType()); - if (resourceTypeKey == null) { - MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type, requestContext=" + toString(requestContext) + - ", actions=" + actions + ", resourceType=" + action.resourcePattern().resourceType()); - return denyAll(actions); - } - - RangerAccessRequestImpl rangerAccessRequest = createRangerAccessRequest( - userName, - userGroups, - ip, - eventTime, - resourceTypeKey, - action.resourcePattern().name(), - accessType); - rangerRequests.add(rangerAccessRequest); + + private static String toString(AuthorizableRequestContext requestContext) { + return requestContext == null ? null : + String.format("AuthorizableRequestContext{principal=%s, clientAddress=%s, clientId=%s}", + requestContext.principal(), requestContext.clientAddress(), requestContext.clientId()); } - Collection<RangerAccessResult> results = callRangerPlugin(rangerRequests); + private List<AuthorizationResult> wrappedAuthorization(AuthorizableRequestContext requestContext, List<Action> actions) { + if (CollectionUtils.isEmpty(actions)) { + return Collections.emptyList(); + } + String userName = requestContext.principal() == null ? null : requestContext.principal().getName(); + Set<String> userGroups = MiscUtil.getGroupsForRequestUser(userName); + String hostAddress = requestContext.clientAddress() == null ? 
null : requestContext.clientAddress().getHostAddress(); + String ip = StringUtils.isNotEmpty(hostAddress) && hostAddress.charAt(0) == '/' ? hostAddress.substring(1) : hostAddress; + Date eventTime = new Date(); + + List<RangerAccessRequest> rangerRequests = new ArrayList<>(); + for (Action action : actions) { + String accessType = mapToRangerAccessType(action.operation()); + if (accessType == null) { + MiscUtil.logErrorMessageByInterval(logger, "Unsupported access type, requestContext=" + toString(requestContext) + ", actions=" + actions + ", operation=" + action.operation()); + return denyAll(actions); + } + String resourceTypeKey = mapToResourceType(action.resourcePattern().resourceType()); + if (resourceTypeKey == null) { + MiscUtil.logErrorMessageByInterval(logger, "Unsupported resource type, requestContext=" + toString(requestContext) + ", actions=" + actions + ", resourceType=" + action.resourcePattern().resourceType()); + return denyAll(actions); + } + + RangerAccessRequestImpl rangerAccessRequest = createRangerAccessRequest( + userName, + userGroups, + ip, + eventTime, + resourceTypeKey, + action.resourcePattern().name(), + accessType); + rangerRequests.add(rangerAccessRequest); + } - List<AuthorizationResult> authorizationResults = mapResults(actions, results); + Collection<RangerAccessResult> results = callRangerPlugin(rangerRequests); - logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults); - return authorizationResults; - } + List<AuthorizationResult> authorizationResults = mapResults(actions, results); - private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) { - try { - return rangerPlugin.isAccessAllowed(rangerRequests); - } catch (Throwable t) { - logger.error("Error while calling isAccessAllowed(). 
requests={}", rangerRequests, t); - return null; - } finally { - auditHandler.flushAudit(); + logger.debug("rangerRequests={}, return={}", rangerRequests, authorizationResults); + return authorizationResults; + } + + private Collection<RangerAccessResult> callRangerPlugin(List<RangerAccessRequest> rangerRequests) { + try { + return rangerPlugin.isAccessAllowed(rangerRequests); + } catch (Throwable t) { + logger.error("Error while calling isAccessAllowed(). requests={}", rangerRequests, t); + return null; + } finally { + auditHandler.flushAudit(); + } } - } - - @Override - public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) { - logger.error("createAcls is not supported by Ranger for Kafka"); - - return aclBindings.stream() - .map(ab -> { - CompletableFuture<AclCreateResult> completableFuture = new CompletableFuture<>(); - completableFuture.completeExceptionally(new UnsupportedOperationException("createAcls is not supported by Ranger for Kafka")); - return completableFuture; - }) - .collect(Collectors.toList()); - } - - @Override - public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) { - logger.error("deleteAcls is not supported by Ranger for Kafka"); - return aclBindingFilters.stream() - .map(ab -> { - CompletableFuture<AclDeleteResult> completableFuture = new CompletableFuture<>(); - completableFuture.completeExceptionally(new UnsupportedOperationException("deleteAcls is not supported by Ranger for Kafka")); - return completableFuture; - }) - .collect(Collectors.toList()); - } - - // TODO: provide a real implementation (RANGER-3809) - // Currently we return a dummy implementation because KAFKA-13598 makes producers idempotent by default and this causes - // a failure in the InitProducerId API call on the broker side because of the missing acls() method implementation. 
- // Overriding this with a dummy impl will make Kafka return an authorization error instead of an exception if the - // IDEMPOTENT_WRITE permission wasn't set on the producer. - @Override - public AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) { - SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType); - - logger.debug("authorizeByResourceType call is not supported by Ranger for Kafka yet"); - return AuthorizationResult.DENIED; - } - - @Override - public Iterable<AclBinding> acls(AclBindingFilter filter) { - logger.error("(getting) acls is not supported by Ranger for Kafka"); - throw new UnsupportedOperationException("(getting) acls is not supported by Ranger for Kafka"); - } } diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java index 537333a3e..dc872f4d1 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/RangerServiceKafka.java @@ -19,11 +19,6 @@ package org.apache.ranger.services.kafka; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.commons.lang.StringUtils; import org.apache.ranger.plugin.model.RangerPolicy; import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyItem; @@ -37,107 +32,94 @@ import org.apache.ranger.services.kafka.client.ServiceKafkaConnectionMgr; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import static org.apache.ranger.plugin.policyengine.RangerPolicyEngine.GROUP_PUBLIC; public class RangerServiceKafka extends RangerBaseService { - private static final Logger LOG = 
LoggerFactory.getLogger(RangerServiceKafka.class); - public static final String ACCESS_TYPE_DESCRIBE = "describe"; - - public RangerServiceKafka() { - super(); - } - - @Override - public void init(RangerServiceDef serviceDef, RangerService service) { - super.init(serviceDef, service); - } - - @Override - public Map<String, Object> validateConfig() throws Exception { - Map<String, Object> ret = new HashMap<String, Object>(); - - if (LOG.isDebugEnabled()) { - LOG.debug("==> RangerServiceKafka.validateConfig(" + serviceName + ")"); - } - - if (configs != null) { - try { - ret = ServiceKafkaConnectionMgr.connectionTest(serviceName, configs); - } catch (Exception e) { - LOG.error("<== RangerServiceKafka.validateConfig Error:" + e); - throw e; - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== RangerServiceKafka.validateConfig(" + serviceName + "): ret=" + ret); - } - - return ret; - } - - @Override - public List<String> lookupResource(ResourceLookupContext context) throws Exception { - List<String> ret = null; - - if (LOG.isDebugEnabled()) { - LOG.debug("==> RangerServiceKafka.lookupResource(" + serviceName + ")"); - } - - if (configs != null) { - ServiceKafkaClient serviceKafkaClient = ServiceKafkaConnectionMgr.getKafkaClient(serviceName, configs); - - ret = serviceKafkaClient.getResources(context); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== RangerServiceKafka.lookupResource(" + serviceName + "): ret=" + ret); - } - - return ret; - } - - @Override - public List<RangerPolicy> getDefaultRangerPolicies() throws Exception { - - if (LOG.isDebugEnabled()) { - LOG.debug("==> RangerServiceKafka.getDefaultRangerPolicies() "); - } - - List<RangerPolicy> ret = super.getDefaultRangerPolicies(); - - String authType = getConfig().get(RANGER_AUTH_TYPE,"simple"); - - if (StringUtils.equalsIgnoreCase(authType, KERBEROS_TYPE)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Auth type is " + KERBEROS_TYPE); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Auth type 
is " + authType); - } - for (RangerPolicy defaultPolicy : ret) { - if(defaultPolicy.getName().contains("all")){ - for (RangerPolicy.RangerPolicyItem defaultPolicyItem : defaultPolicy.getPolicyItems()) { - defaultPolicyItem.addGroup(GROUP_PUBLIC); - } - } - } - } - for (RangerPolicy defaultPolicy : ret) { - if (defaultPolicy.getName().contains("all") && StringUtils.isNotBlank(lookUpUser)) { - RangerPolicyItem policyItemForLookupUser = new RangerPolicyItem(); - policyItemForLookupUser.setUsers(Collections.singletonList(lookUpUser)); - policyItemForLookupUser.setAccesses(Collections.singletonList( - new RangerPolicyItemAccess(ACCESS_TYPE_DESCRIBE))); - policyItemForLookupUser.setDelegateAdmin(false); - defaultPolicy.addPolicyItem(policyItemForLookupUser); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("<== RangerServiceKafka.getDefaultRangerPolicies() "); - } - return ret; - } + private static final Logger LOG = LoggerFactory.getLogger(RangerServiceKafka.class); + public static final String ACCESS_TYPE_DESCRIBE = "describe"; + + public RangerServiceKafka() { + super(); + } + + @Override + public void init(RangerServiceDef serviceDef, RangerService service) { + super.init(serviceDef, service); + } + + @Override + public Map<String, Object> validateConfig() throws Exception { + Map<String, Object> ret = new HashMap<>(); + + LOG.debug("==> RangerServiceKafka.validateConfig({})", serviceName); + + if (configs != null) { + try { + ret = ServiceKafkaConnectionMgr.connectionTest(serviceName, configs); + } catch (Exception e) { + LOG.error("<== RangerServiceKafka.validateConfig Error:{}", String.valueOf(e)); + throw e; + } + } + + LOG.debug("<== RangerServiceKafka.validateConfig({}): ret={}", serviceName, ret); + + return ret; + } + + @Override + public List<String> lookupResource(ResourceLookupContext context) throws Exception { + List<String> ret = null; + + LOG.debug("==> RangerServiceKafka.lookupResource({})", serviceName); + + if (configs != null) { + 
ServiceKafkaClient serviceKafkaClient = ServiceKafkaConnectionMgr.getKafkaClient(serviceName, configs); + + ret = serviceKafkaClient.getResources(context); + } + + LOG.debug("<== RangerServiceKafka.lookupResource({}): ret={}", serviceName, ret); + + return ret; + } + + @Override + public List<RangerPolicy> getDefaultRangerPolicies() throws Exception { + LOG.debug("==> RangerServiceKafka.getDefaultRangerPolicies() "); + + List<RangerPolicy> ret = super.getDefaultRangerPolicies(); + String authType = getConfig().get(RANGER_AUTH_TYPE, "simple"); + + if (StringUtils.equalsIgnoreCase(authType, KERBEROS_TYPE)) { + LOG.debug("Auth type is {}", KERBEROS_TYPE); + } else { + LOG.debug("Auth type is {}", authType); + for (RangerPolicy defaultPolicy : ret) { + if (defaultPolicy.getName().contains("all")) { + for (RangerPolicy.RangerPolicyItem defaultPolicyItem : defaultPolicy.getPolicyItems()) { + defaultPolicyItem.addGroup(GROUP_PUBLIC); + } + } + } + } + + for (RangerPolicy defaultPolicy : ret) { + if (defaultPolicy.getName().contains("all") && StringUtils.isNotBlank(lookUpUser)) { + RangerPolicyItem policyItemForLookupUser = new RangerPolicyItem(); + policyItemForLookupUser.setUsers(Collections.singletonList(lookUpUser)); + policyItemForLookupUser.setAccesses(Collections.singletonList(new RangerPolicyItemAccess(ACCESS_TYPE_DESCRIBE))); + policyItemForLookupUser.setDelegateAdmin(false); + defaultPolicy.addPolicyItem(policyItemForLookupUser); + } + } + + LOG.debug("<== RangerServiceKafka.getDefaultRangerPolicies() "); + return ret; + } } diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java index 82e869511..5eb0b872b 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaClient.java @@ -19,228 +19,198 @@ 
package org.apache.ranger.services.kafka.client; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.KafkaAdminClient; -import org.apache.kafka.clients.admin.TopicListing; import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.TopicListing; import org.apache.ranger.plugin.client.BaseClient; import org.apache.ranger.plugin.service.ResourceLookupContext; import org.apache.ranger.plugin.util.TimedEventUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ServiceKafkaClient { - private static final Logger LOG = LoggerFactory.getLogger(ServiceKafkaClient.class); - - enum RESOURCE_TYPE { - TOPIC - } - - String serviceName; - Map<String,String > configs; - private static final String errMessage = " You can still save the repository and start creating " - + "policies, but you would not be able to use autocomplete for " - + "resource names. 
Check server logs for more info."; - - private static final String TOPIC_KEY = "topic"; - private static final long LOOKUP_TIMEOUT_SEC = 5; - private static final String KEY_SASL_MECHANISM = "sasl.mechanism"; - private static final String KEY_SASL_JAAS_CONFIG = "sasl.jaas.config"; - private static final String KEY_KAFKA_KEYTAB = "kafka.keytab"; - private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal"; - private static final String JAAS_KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; - private static final String JAAS_USE_KEYTAB = "useKeyTab=true"; - private static final String JAAS_KEYTAB = "keyTab=\""; - private static final String JAAS_STOKE_KEY = "storeKey=true"; - private static final String JAAS_SERVICE_NAME = "serviceName=kafka"; - private static final String JAAS_USER_TICKET_CACHE = "useTicketCache=false"; - private static final String JAAS_PRINCIPAL = "principal=\""; - - public ServiceKafkaClient(String serviceName, Map<String,String> configs) { - this.serviceName = serviceName; - this.configs = configs; - } - - public Map<String, Object> connectionTest() { - String errMsg = errMessage; - Map<String, Object> responseData = new HashMap<String, Object>(); - try { - getTopicList(null); - // If it doesn't throw exception, then assume the instance is - // reachable - String successMsg = "ConnectionTest Successful"; - BaseClient.generateResponseDataMap(true, successMsg, - successMsg, null, null, responseData); - } catch (Exception e) { - LOG.error("Error connecting to Kafka. kafkaClient=" + this, e); - String failureMsg = "Unable to connect to Kafka instance." 
- + e.getMessage(); - BaseClient.generateResponseDataMap(false, failureMsg, - failureMsg + errMsg, null, null, responseData); - } - return responseData; - } - - private List<String> getTopicList(List<String> ignoreTopicList) throws Exception { - List<String> ret = new ArrayList<String>(); - - int sessionTimeout = 5000; - int connectionTimeout = 10000; - AdminClient adminClient = null; - - try { - Properties props = new Properties(); - props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); - props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG)); - props.put(KEY_SASL_MECHANISM, configs.get(KEY_SASL_MECHANISM)); - props.put(KEY_SASL_JAAS_CONFIG, getJAASConfig(configs)); - props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, getIntProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, sessionTimeout)); - props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getIntProperty(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connectionTimeout)); - adminClient = KafkaAdminClient.create(props); - ListTopicsResult listTopicsResult = adminClient.listTopics(); - if (listTopicsResult != null) { - Collection<TopicListing> topicListings = listTopicsResult.listings().get(); - for (TopicListing topicListing : topicListings) { - String topicName = topicListing.name(); - if (ignoreTopicList == null || !ignoreTopicList.contains(topicName)) { - ret.add(topicName); - } - } - } - } catch (Exception e) { - throw e; - } finally { - if (adminClient != null) { - adminClient.close(); - } - } - return ret; - } - - - - /** - * @param context - * @param context - * @return - */ - public List<String> getResources(ResourceLookupContext context) { - - String userInput = context.getUserInput(); - String resource = context.getResourceName(); - Map<String, List<String>> resourceMap = context.getResources(); - List<String> resultList = null; - List<String> topicList = null; - - 
RESOURCE_TYPE lookupResource = RESOURCE_TYPE.TOPIC; - - if (LOG.isDebugEnabled()) { - LOG.debug("<== getResources() UserInput: \"" + userInput - + "\" resource : " + resource + " resourceMap: " - + resourceMap); - } - - if (userInput != null && resource != null) { - if (resourceMap != null && !resourceMap.isEmpty()) { - topicList = resourceMap.get(TOPIC_KEY); - } - switch (resource.trim().toLowerCase()) { - case TOPIC_KEY: - lookupResource = RESOURCE_TYPE.TOPIC; - break; - default: - break; - } - } - - if (userInput != null) { - try { - Callable<List<String>> callableObj = null; - final String userInputFinal = userInput; - - final List<String> finalTopicList = topicList; - - if (lookupResource == RESOURCE_TYPE.TOPIC) { - // get the topic list for given Input - callableObj = new Callable<List<String>>() { - @Override - public List<String> call() { - List<String> retList = new ArrayList<String>(); - try { - List<String> list = getTopicList(finalTopicList); - if (userInputFinal != null - && !userInputFinal.isEmpty()) { - for (String value : list) { - if (value.startsWith(userInputFinal)) { - retList.add(value); - } - } - } else { - retList.addAll(list); - } - } catch (Exception ex) { - LOG.error("Error getting topic.", ex); - } - return retList; - }; - }; - } - // If we need to do lookup - if (callableObj != null) { - synchronized (this) { - resultList = TimedEventUtil.timedTask(callableObj, - LOOKUP_TIMEOUT_SEC, TimeUnit.SECONDS); - } - } - } catch (Exception e) { - LOG.error("Unable to get hive resources.", e); - } - } - - return resultList; - } - - @Override - public String toString() { - return "ServiceKafkaClient [serviceName=" + serviceName - + ", configs=" + configs + "]"; - } - - private Integer getIntProperty(String key, int defaultValue) { - if (key == null) { - return defaultValue; - } - String rtrnVal = configs.get(key); - if (rtrnVal == null) { - return defaultValue; - } - return Integer.valueOf(rtrnVal); - } - - private String 
getJAASConfig(Map<String,String> configs){ - String jaasConfig = new StringBuilder() - .append(JAAS_KRB5_MODULE).append(" ") - .append(JAAS_USE_KEYTAB).append(" ") - .append(JAAS_KEYTAB).append(configs.get(KEY_KAFKA_KEYTAB)).append("\"").append(" ") - .append(JAAS_STOKE_KEY).append(" ") - .append(JAAS_USER_TICKET_CACHE).append(" ") - .append(JAAS_SERVICE_NAME).append(" ") - .append(JAAS_PRINCIPAL).append(configs.get(KEY_KAFKA_PRINCIPAL)).append("\";") - .toString(); - if (LOG.isDebugEnabled()) { - LOG.debug("KafkaClient JAAS: " + jaasConfig); - } - return jaasConfig; - } +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +public class ServiceKafkaClient { + private static final Logger LOG = LoggerFactory.getLogger(ServiceKafkaClient.class); + + private static final String errMessage = " You can still save the repository and start creating policies, but you would not be able to use autocomplete for resource names. 
Check server logs for more info."; + private static final String TOPIC_KEY = "topic"; + private static final String KEY_SASL_MECHANISM = "sasl.mechanism"; + private static final String KEY_SASL_JAAS_CONFIG = "sasl.jaas.config"; + private static final String KEY_KAFKA_KEYTAB = "kafka.keytab"; + private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal"; + private static final String JAAS_KRB5_MODULE = "com.sun.security.auth.module.Krb5LoginModule required"; + private static final String JAAS_USE_KEYTAB = "useKeyTab=true"; + private static final String JAAS_KEYTAB = "keyTab=\""; + private static final String JAAS_STOKE_KEY = "storeKey=true"; + private static final String JAAS_SERVICE_NAME = "serviceName=kafka"; + private static final String JAAS_USER_TICKET_CACHE = "useTicketCache=false"; + private static final String JAAS_PRINCIPAL = "principal=\""; + private static final long LOOKUP_TIMEOUT_SEC = 5; + + String serviceName; + Map<String, String> configs; + + public ServiceKafkaClient(String serviceName, Map<String, String> configs) { + this.serviceName = serviceName; + this.configs = configs; + } + + public Map<String, Object> connectionTest() { + String errMsg = errMessage; + Map<String, Object> responseData = new HashMap<>(); + try { + getTopicList(null); + // If it doesn't throw exception, then assume the instance is reachable + String successMsg = "ConnectionTest Successful"; + BaseClient.generateResponseDataMap(true, successMsg, successMsg, null, null, responseData); + } catch (Exception e) { + LOG.error("Error connecting to Kafka. kafkaClient = {}", this, e); + String failureMsg = "Unable to connect to Kafka instance." 
+ e.getMessage(); + BaseClient.generateResponseDataMap(false, failureMsg, failureMsg + errMsg, null, null, responseData); + } + return responseData; + } + + public List<String> getResources(ResourceLookupContext context) { + String userInput = context.getUserInput(); + String resource = context.getResourceName(); + Map<String, List<String>> resourceMap = context.getResources(); + List<String> resultList = null; + List<String> topicList = null; + + ResourceType lookupResource = ResourceType.TOPIC; + + if (LOG.isDebugEnabled()) { + LOG.debug("<== getResources() UserInput: \"{}\" resource : {} resourceMap: {}", userInput, resource, resourceMap); + } + + if (userInput != null && resource != null) { + if (resourceMap != null && !resourceMap.isEmpty()) { + topicList = resourceMap.get(TOPIC_KEY); + } + if (resource.trim().equalsIgnoreCase(TOPIC_KEY)) { + lookupResource = ResourceType.TOPIC; + } + } + + if (userInput != null) { + try { + Callable<List<String>> callableObj = null; + final String userInputFinal = userInput; + + final List<String> finalTopicList = topicList; + + if (lookupResource == ResourceType.TOPIC) { + // get the topic list for given Input + callableObj = () -> { + List<String> retList = new ArrayList<>(); + try { + List<String> list = getTopicList(finalTopicList); + if (userInputFinal != null && !userInputFinal.isEmpty()) { + for (String value : list) { + if (value.startsWith(userInputFinal)) { + retList.add(value); + } + } + } else { + retList.addAll(list); + } + } catch (Exception ex) { + LOG.error("Error getting topic.", ex); + } + return retList; + }; + } + // If we need to do lookup + if (callableObj != null) { + synchronized (this) { + resultList = TimedEventUtil.timedTask(callableObj, LOOKUP_TIMEOUT_SEC, TimeUnit.SECONDS); + } + } + } catch (Exception e) { + LOG.error("Unable to get kafka resources.", e); + } + } + + return resultList; + } + + @Override + public String toString() { + return "ServiceKafkaClient [serviceName = " + serviceName + ", 
configs = " + configs + "]"; + } + + private List<String> getTopicList(List<String> ignoreTopicList) throws Exception { + List<String> ret = new ArrayList<>(); + + int sessionTimeout = 5000; + int connectionTimeout = 10000; + AdminClient adminClient = null; + + try { + Properties props = new Properties(); + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); + props.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG)); + props.put(KEY_SASL_MECHANISM, configs.get(KEY_SASL_MECHANISM)); + props.put(KEY_SASL_JAAS_CONFIG, getJAASConfig(configs)); + props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, getIntProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, sessionTimeout)); + props.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, getIntProperty(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, connectionTimeout)); + adminClient = KafkaAdminClient.create(props); + ListTopicsResult listTopicsResult = adminClient.listTopics(); + if (listTopicsResult != null) { + Collection<TopicListing> topicListings = listTopicsResult.listings().get(); + for (TopicListing topicListing : topicListings) { + String topicName = topicListing.name(); + if (ignoreTopicList == null || !ignoreTopicList.contains(topicName)) { + ret.add(topicName); + } + } + } + } finally { + if (adminClient != null) { + adminClient.close(); + } + } + return ret; + } + + private Integer getIntProperty(String key, int defaultValue) { + if (key == null) { + return defaultValue; + } + String returnVal = configs.get(key); + if (returnVal == null) { + return defaultValue; + } + return Integer.valueOf(returnVal); + } + + private String getJAASConfig(Map<String, String> configs) { + String jaasConfig = new StringBuilder() + .append(JAAS_KRB5_MODULE).append(" ") + .append(JAAS_USE_KEYTAB).append(" ") + .append(JAAS_KEYTAB).append(configs.get(KEY_KAFKA_KEYTAB)).append("\"").append(" ") + 
.append(JAAS_STOKE_KEY).append(" ") + .append(JAAS_USER_TICKET_CACHE).append(" ") + .append(JAAS_SERVICE_NAME).append(" ") + .append(JAAS_PRINCIPAL).append(configs.get(KEY_KAFKA_PRINCIPAL)).append("\";") + .toString(); + + LOG.debug("KafkaClient JAAS: {}", jaasConfig); + return jaasConfig; + } + + enum ResourceType { + TOPIC + } } diff --git a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java index 60c55cc13..0e6dd073c 100644 --- a/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java +++ b/plugin-kafka/src/main/java/org/apache/ranger/services/kafka/client/ServiceKafkaConnectionMgr.java @@ -21,81 +21,78 @@ package org.apache.ranger.services.kafka.client; import org.apache.commons.lang.StringUtils; import org.apache.kafka.clients.admin.AdminClientConfig; + import java.util.Map; public class ServiceKafkaConnectionMgr { - private static final String SEPARATOR = ","; - private static final String KEY_SASL_MECHANISM = "sasl.mechanism"; - private static final String KEY_KAFKA_KEYTAB = "kafka.keytab"; - private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal"; + private static final String SEPARATOR = ","; + private static final String KEY_SASL_MECHANISM = "sasl.mechanism"; + private static final String KEY_KAFKA_KEYTAB = "kafka.keytab"; + private static final String KEY_KAFKA_PRINCIPAL = "kafka.principal"; + + private ServiceKafkaConnectionMgr() { + // to block instantiation + } + + public static ServiceKafkaClient getKafkaClient(String serviceName, Map<String, String> configs) throws Exception { + String error = getServiceConfigValidationErrors(configs); - static public ServiceKafkaClient getKafkaClient(String serviceName, - Map<String, String> configs) throws Exception { - String error = getServiceConfigValidationErrors(configs); - if (StringUtils.isNotBlank(error)){ - 
error = "JAAS configuration missing or not correct in Ranger Kafka Service..." + error; - throw new Exception(error); - } - ServiceKafkaClient serviceKafkaClient = new ServiceKafkaClient(serviceName, configs); - return serviceKafkaClient; - } + if (StringUtils.isNotBlank(error)) { + error = "JAAS configuration missing or not correct in Ranger Kafka Service. " + error; + throw new Exception(error); + } + return new ServiceKafkaClient(serviceName, configs); + } - /** - * @param serviceName - * @param configs - * @return - */ - public static Map<String, Object> connectionTest(String serviceName, - Map<String, String> configs) throws Exception { - ServiceKafkaClient serviceKafkaClient = getKafkaClient(serviceName, - configs); - return serviceKafkaClient.connectionTest(); - } + public static Map<String, Object> connectionTest(String serviceName, Map<String, String> configs) throws Exception { + ServiceKafkaClient serviceKafkaClient = getKafkaClient(serviceName, configs); + return serviceKafkaClient.connectionTest(); + } - private static String getServiceConfigValidationErrors(Map<String, String> configs) { - StringBuilder ret = new StringBuilder(); + private static String getServiceConfigValidationErrors(Map<String, String> configs) { + StringBuilder ret = new StringBuilder(); - String bootstrap_servers = configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG); - String security_protocol = configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG); - String sasl_mechanism = configs.get(KEY_SASL_MECHANISM); - String kafka_keytab = configs.get(KEY_KAFKA_KEYTAB); - String kafka_principal = configs.get(KEY_KAFKA_PRINCIPAL); + String bootstrapServers = configs.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG); + String securityProtocol = configs.get(AdminClientConfig.SECURITY_PROTOCOL_CONFIG); + String saslMechanism = configs.get(KEY_SASL_MECHANISM); + String kafkaKeytab = configs.get(KEY_KAFKA_KEYTAB); + String kafkaPrincipal = configs.get(KEY_KAFKA_PRINCIPAL); - if 
(StringUtils.isEmpty(bootstrap_servers)) { - ret.append(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG); - } + if (StringUtils.isEmpty(bootstrapServers)) { + ret.append(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG); + } - if (StringUtils.isEmpty(security_protocol)) { - if (StringUtils.isNotBlank(ret.toString())) { - ret.append(SEPARATOR).append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG); - } else { - ret.append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG); - } - } + if (StringUtils.isEmpty(securityProtocol)) { + if (StringUtils.isNotBlank(ret.toString())) { + ret.append(SEPARATOR).append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG); + } else { + ret.append(AdminClientConfig.SECURITY_PROTOCOL_CONFIG); + } + } - if (StringUtils.isEmpty(sasl_mechanism)) { - if (StringUtils.isNotBlank(ret.toString())) { - ret.append(SEPARATOR).append(KEY_SASL_MECHANISM); - } else { - ret.append(KEY_SASL_MECHANISM); - } - } + if (StringUtils.isEmpty(saslMechanism)) { + if (StringUtils.isNotBlank(ret.toString())) { + ret.append(SEPARATOR).append(KEY_SASL_MECHANISM); + } else { + ret.append(KEY_SASL_MECHANISM); + } + } - if (StringUtils.isEmpty(kafka_keytab)) { - if (StringUtils.isNotBlank(ret.toString())) { - ret.append(SEPARATOR).append(KEY_KAFKA_KEYTAB); - } else { - ret.append(KEY_KAFKA_KEYTAB); - } - } + if (StringUtils.isEmpty(kafkaKeytab)) { + if (StringUtils.isNotBlank(ret.toString())) { + ret.append(SEPARATOR).append(KEY_KAFKA_KEYTAB); + } else { + ret.append(KEY_KAFKA_KEYTAB); + } + } - if (StringUtils.isEmpty(kafka_principal)) { - if (StringUtils.isNotBlank(ret.toString())) { - ret.append(SEPARATOR).append(KEY_KAFKA_PRINCIPAL); - } else { - ret.append(KEY_KAFKA_PRINCIPAL); - } - } - return ret.toString(); - } + if (StringUtils.isEmpty(kafkaPrincipal)) { + if (StringUtils.isNotBlank(ret.toString())) { + ret.append(SEPARATOR).append(KEY_KAFKA_PRINCIPAL); + } else { + ret.append(KEY_KAFKA_PRINCIPAL); + } + } + return ret.toString(); + } } diff --git 
a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java index 4ddf75818..ae467a389 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerGSSTest.java @@ -17,22 +17,8 @@ package org.apache.ranger.authorization.kafka.authorizer; -import java.io.File; -import java.net.ServerSocket; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.Future; - +import kafka.server.KafkaConfig; import kafka.server.KafkaServer; - import org.apache.commons.io.FileUtils; import org.apache.curator.test.InstanceSpec; import org.apache.curator.test.TestingServer; @@ -49,37 +35,49 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.utils.Time; import org.apache.kerby.kerberos.kerb.server.SimpleKdcServer; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import kafka.server.KafkaConfig; import scala.Some; +import java.io.File; +import java.net.ServerSocket; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Future; + /** * A simple test that starts a Kafka broker, creates "test" and "dev" topics, * sends a message to them and consumes it. * The RangerKafkaAuthorizer enforces the following authorization rules: - * - * - The "IT" group can do anything - * - The "public" group can "read/describe/write" on the "test" topic. - * + * <p> + * - The "IT" group can do anything + * - The "public" group can "read/describe/write" on the "test" topic. + * <p> * Policies available from admin via: - * + * <p> * http://localhost:6080/service/plugins/policies/download/cl1_kafka - * + * <p> * Authentication is done via Kerberos/GSS. */ public class KafkaRangerAuthorizerGSSTest { - private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerAuthorizerGSSTest.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaRangerAuthorizerGSSTest.class); - private static KafkaServer kafkaServer; - private static TestingServer zkServer; - private static int port; - private static Path tempDir; + private static KafkaServer kafkaServer; + private static TestingServer zkServer; + private static int port; + private static Path tempDir; private static SimpleKdcServer kerbyServer; @BeforeAll @@ -92,7 +90,7 @@ public class KafkaRangerAuthorizerGSSTest { configureKerby(basedir); // JAAS Config file - We need to point to the correct keytab files - Path path = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas"); + Path path = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas"); String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8); content = content.replaceAll("<basedir>", basedir); //content = content.replaceAll("zookeeper/localhost", "zookeeper/" + address); @@ -103,12 +101,12 @@ public class KafkaRangerAuthorizerGSSTest { System.setProperty("java.security.auth.login.config", path2.toString()); 
// Set up Zookeeper to require SASL - Map<String,Object> zookeeperProperties = new HashMap<>(); + Map<String, Object> zookeeperProperties = new HashMap<>(); zookeeperProperties.put("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); zookeeperProperties.put("requireClientAuthScheme", "sasl"); zookeeperProperties.put("jaasLoginRenew", "3600000"); - InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1,-1, -1, zookeeperProperties, "localhost"); + InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1, -1, -1, zookeeperProperties, "localhost"); zkServer = new TestingServer(instanceSpec, true); @@ -158,39 +156,6 @@ public class KafkaRangerAuthorizerGSSTest { KafkaTestUtils.createSomeTopics(adminProps); } - private static void configureKerby(String baseDir) throws Exception { - - //System.setProperty("sun.security.krb5.debug", "true"); - System.setProperty("java.security.krb5.conf", baseDir + "/target/krb5.conf"); - - kerbyServer = new SimpleKdcServer(); - - kerbyServer.setKdcRealm("kafka.apache.org"); - kerbyServer.setAllowUdp(false); - kerbyServer.setWorkDir(new File(baseDir + "/target")); - - kerbyServer.init(); - - // Create principals - String zookeeper = "zookeeper/[email protected]"; - String kafka = "kafka/[email protected]"; - String client = "[email protected]"; - - kerbyServer.createPrincipal(zookeeper, "zookeeper"); - File keytabFile = new File(baseDir + "/target/zookeeper.keytab"); - kerbyServer.exportPrincipal(zookeeper, keytabFile); - - kerbyServer.createPrincipal(kafka, "kafka"); - keytabFile = new File(baseDir + "/target/kafka.keytab"); - kerbyServer.exportPrincipal(kafka, keytabFile); - - kerbyServer.createPrincipal(client, "client"); - keytabFile = new File(baseDir + "/target/client.keytab"); - kerbyServer.exportPrincipal(client, keytabFile); - - kerbyServer.start(); - } - @AfterAll public static void cleanup() throws Exception { if (kafkaServer != null) { @@ -262,38 +227,9 @@ public class 
KafkaRangerAuthorizerGSSTest { } } - private void checkTopicExists(final KafkaConsumer<String, String> consumer) { - Map<String, List<PartitionInfo>> topics = consumer.listTopics(); - while (!topics.containsKey("test")) { - LOG.warn("Required topic is not available, only {} present", topics.keySet()); - sleep(); - topics = consumer.listTopics(); - } - LOG.warn("Available topics: {}", topics.keySet()); - } - - private void sendMessage(final Producer<String, String> producer) { - // Send a message - try { - LOG.info("Send a message to 'test'"); - producer.send(new ProducerRecord<>("test", "somekey", "somevalue")); - producer.flush(); - } catch (RuntimeException e) { - LOG.error("Unable to send message to topic 'test' ", e); - } - } - - private void sleep() { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - LOG.info("Interrupted sleep, nothing important"); - } - } - // The "public" group can't write to "dev" @Test - public void testUnauthorizedWrite() throws Exception { + public void testUnauthorizedWrite() { // Create the Producer Properties producerProps = new Properties(); producerProps.put("bootstrap.servers", "localhost:" + port); @@ -315,7 +251,6 @@ public class KafkaRangerAuthorizerGSSTest { } } - @Test public void testAuthorizedIdempotentWrite() throws Exception { // Create the Producer @@ -336,4 +271,64 @@ public class KafkaRangerAuthorizerGSSTest { record.get(); } } + + private static void configureKerby(String baseDir) throws Exception { + System.setProperty("java.security.krb5.conf", baseDir + "/target/krb5.conf"); + + kerbyServer = new SimpleKdcServer(); + + kerbyServer.setKdcRealm("kafka.apache.org"); + kerbyServer.setAllowUdp(false); + kerbyServer.setWorkDir(new File(baseDir + "/target")); + + kerbyServer.init(); + + // Create principals + String zookeeper = "zookeeper/[email protected]"; + String kafka = "kafka/[email protected]"; + String client = "[email protected]"; + + kerbyServer.createPrincipal(zookeeper, "zookeeper"); + File 
keytabFile = new File(baseDir + "/target/zookeeper.keytab"); + kerbyServer.exportPrincipal(zookeeper, keytabFile); + + kerbyServer.createPrincipal(kafka, "kafka"); + keytabFile = new File(baseDir + "/target/kafka.keytab"); + kerbyServer.exportPrincipal(kafka, keytabFile); + + kerbyServer.createPrincipal(client, "client"); + keytabFile = new File(baseDir + "/target/client.keytab"); + kerbyServer.exportPrincipal(client, keytabFile); + + kerbyServer.start(); + } + + private void checkTopicExists(final KafkaConsumer<String, String> consumer) { + Map<String, List<PartitionInfo>> topics = consumer.listTopics(); + while (!topics.containsKey("test")) { + LOG.warn("Required topic is not available, only {} present", topics.keySet()); + sleep(); + topics = consumer.listTopics(); + } + LOG.warn("Available topics: {}", topics.keySet()); + } + + private void sendMessage(final Producer<String, String> producer) { + // Send a message + try { + LOG.info("Send a message to 'test'"); + producer.send(new ProducerRecord<>("test", "somekey", "somevalue")); + producer.flush(); + } catch (RuntimeException e) { + LOG.error("Unable to send message to topic 'test' ", e); + } + } + + private void sleep() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.info("Interrupted sleep, nothing important"); + } + } } diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java index 4c777c7fc..1dacb988c 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerSASLSSLTest.java @@ -17,18 +17,8 @@ package org.apache.ranger.authorization.kafka.authorizer; -import java.io.File; -import java.io.OutputStream; -import 
java.math.BigInteger; -import java.net.ServerSocket; -import java.nio.file.Files; -import java.nio.file.Path; -import java.security.KeyStore; -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.Future; - +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.apache.hadoop.security.UserGroupInformation; @@ -48,38 +38,47 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; import scala.Some; +import java.io.File; +import java.io.OutputStream; +import java.math.BigInteger; +import java.net.ServerSocket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyStore; +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.Future; + /** - * A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a + * A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a * CustomAuthorizer that enforces some authorization rules: - * - * - The "IT" group can do anything - * - The "public" group can "read/describe/write" on the "test" topic. - * - The "public" group can only "read/describe" on the "dev" topic, but not write. - * + * <p> + * - The "IT" group can do anything + * - The "public" group can "read/describe/write" on the "test" topic. + * - The "public" group can only "read/describe" on the "dev" topic, but not write. 
+ * <p> * Policies available from admin via: - * + * <p> * http://localhost:6080/service/plugins/policies/download/cl1_kafka - * + * <p> * Clients and services authenticate to Kafka using the SASL SSL protocol as part of this test. */ @Disabled("Causing JVM to abort on some platforms") public class KafkaRangerAuthorizerSASLSSLTest { - private static KafkaServer kafkaServer; + private static KafkaServer kafkaServer; private static TestingServer zkServer; - private static int port; - private static String serviceKeystorePath; - private static String clientKeystorePath; - private static String truststorePath; - private static Path tempDir; + private static int port; + private static String serviceKeystorePath; + private static String clientKeystorePath; + private static String truststorePath; + private static Path tempDir; @BeforeAll public static void setup() throws Exception { - // JAAS Config file + // JAAS Config file String basedir = System.getProperty("basedir"); if (basedir == null) { basedir = new File(".").getCanonicalPath(); @@ -87,30 +86,30 @@ public class KafkaRangerAuthorizerSASLSSLTest { File f = new File(basedir + "/src/test/resources/kafka_plain.jaas"); System.setProperty("java.security.auth.login.config", f.getPath()); - - // Create keys - String serviceDN = "CN=Service,O=Apache,L=Dublin,ST=Leinster,C=IE"; - String clientDN = "CN=Client,O=Apache,L=Dublin,ST=Leinster,C=IE"; - - // Create a truststore - KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); - keystore.load(null, "security".toCharArray()); - - serviceKeystorePath = - KafkaTestUtils.createAndStoreKey(serviceDN, serviceDN, BigInteger.valueOf(30), - "sspass", "myservicekey", "skpass", keystore); - clientKeystorePath = - KafkaTestUtils.createAndStoreKey(clientDN, clientDN, BigInteger.valueOf(31), - "cspass", "myclientkey", "ckpass", keystore); - - File truststoreFile = File.createTempFile("kafkatruststore", ".jks"); - try (OutputStream output = 
Files.newOutputStream(truststoreFile.toPath())) { - keystore.store(output, "security".toCharArray()); - } - truststorePath = truststoreFile.getPath(); - + + // Create keys + String serviceDN = "CN=Service,O=Apache,L=Dublin,ST=Leinster,C=IE"; + String clientDN = "CN=Client,O=Apache,L=Dublin,ST=Leinster,C=IE"; + + // Create a truststore + KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); + keystore.load(null, "security".toCharArray()); + + serviceKeystorePath = + KafkaTestUtils.createAndStoreKey(serviceDN, serviceDN, BigInteger.valueOf(30), + "sspass", "myservicekey", "skpass", keystore); + clientKeystorePath = + KafkaTestUtils.createAndStoreKey(clientDN, clientDN, BigInteger.valueOf(31), + "cspass", "myclientkey", "ckpass", keystore); + + File truststoreFile = File.createTempFile("kafkatruststore", ".jks"); + try (OutputStream output = Files.newOutputStream(truststoreFile.toPath())) { + keystore.store(output, "security".toCharArray()); + } + truststorePath = truststoreFile.getPath(); + zkServer = new TestingServer(); - + // Get a random port ServerSocket serverSocket = new ServerSocket(0); port = serverSocket.getLocalPort(); @@ -144,10 +143,10 @@ public class KafkaRangerAuthorizerSASLSSLTest { // Plug in Apache Ranger authorizer props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"); - + // Create users for testing UserGroupInformation.createUserForTesting("alice", new String[] {"IT"}); - + KafkaConfig config = new KafkaConfig(props); kafkaServer = new KafkaServer(config, Time.SYSTEM, new Some<String>("KafkaRangerAuthorizerSASLSSLTest"), false); kafkaServer.startup(); @@ -165,7 +164,7 @@ public class KafkaRangerAuthorizerSASLSSLTest { adminProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security"); KafkaTestUtils.createSomeTopics(adminProps); } - + @AfterAll public static void cleanup() throws Exception { if (kafkaServer != null) { @@ -174,24 +173,24 @@ public class 
KafkaRangerAuthorizerSASLSSLTest { if (zkServer != null) { zkServer.stop(); } - + File clientKeystoreFile = new File(clientKeystorePath); if (clientKeystoreFile.exists()) { - FileUtils.forceDelete(clientKeystoreFile); + FileUtils.forceDelete(clientKeystoreFile); } File serviceKeystoreFile = new File(serviceKeystorePath); if (serviceKeystoreFile.exists()) { - FileUtils.forceDelete(serviceKeystoreFile); + FileUtils.forceDelete(serviceKeystoreFile); } File truststoreFile = new File(truststorePath); if (truststoreFile.exists()) { - FileUtils.forceDelete(truststoreFile); + FileUtils.forceDelete(truststoreFile); } if (tempDir != null) { FileUtils.deleteDirectory(tempDir.toFile()); } } - + @Test public void testAuthorizedRead() throws Exception { // Create the Producer @@ -202,7 +201,7 @@ public class KafkaRangerAuthorizerSASLSSLTest { producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); producerProps.put("sasl.mechanism", "PLAIN"); - + producerProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, serviceKeystorePath); producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "sspass"); @@ -256,7 +255,7 @@ public class KafkaRangerAuthorizerSASLSSLTest { } } } - + @Test public void testAuthorizedWrite() throws Exception { // Create the Producer @@ -284,5 +283,4 @@ public class KafkaRangerAuthorizerSASLSSLTest { record.get(); } } - } diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java index 2ff2c1083..346682851 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java +++ 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerAuthorizerTest.java @@ -17,19 +17,7 @@ package org.apache.ranger.authorization.kafka.authorizer; -import java.io.File; -import java.io.FileOutputStream; -import java.io.OutputStream; -import java.math.BigInteger; -import java.net.ServerSocket; -import java.nio.file.Files; -import java.nio.file.Path; -import java.security.KeyStore; -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.Future; - +import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; @@ -44,74 +32,84 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.utils.Time; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import kafka.server.KafkaConfig; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import scala.Some; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.math.BigInteger; +import java.net.ServerSocket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyStore; +import java.time.Duration; +import java.util.Arrays; +import java.util.Properties; +import java.util.concurrent.Future; + /** - * A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. We also plug in a + * A simple test that starts a Kafka broker, creates "test" and "dev" topics, sends a message to them and consumes it. 
We also plug in a * CustomAuthorizer that enforces some authorization rules: - * - * - The "IT" group can do anything - * - The "public" group can "read/describe/write" on the "test" topic. - * - The "public" group can only "read/describe" on the "dev" topic, but not write. - * + * <p> + * - The "IT" group can do anything + * - The "public" group can "read/describe/write" on the "test" topic. + * - The "public" group can only "read/describe" on the "dev" topic, but not write. + * <p> * In addition we have a TAG based policy, which grants "read/describe" access to the "public" group to the "messages" topic (which is associated * with the tag called "MessagesTag". A "kafka_topic" entity was created in Apache Atlas + then associated with the "MessagesTag". This was * then imported into Ranger using the TagSyncService. The policies were then downloaded locally and saved for testing off-line. - * + * <p> * Policies available from admin via: - * + * <p> * http://localhost:6080/service/plugins/policies/download/cl1_kafka */ public class KafkaRangerAuthorizerTest { - - private static KafkaServer kafkaServer; + private static KafkaServer kafkaServer; private static TestingServer zkServer; - private static int port; - private static String serviceKeystorePath; - private static String clientKeystorePath; - private static String truststorePath; - private static Path tempDir; - + private static int port; + private static String serviceKeystorePath; + private static String clientKeystorePath; + private static String truststorePath; + private static Path tempDir; + @BeforeAll public static void setup() throws Exception { - // Create keys + // Create keys String serviceDN = "CN=localhost,O=Apache,L=Dublin,ST=Leinster,C=IE"; - String clientDN = "CN=localhost,O=Apache,L=Dublin,ST=Leinster,C=IE"; - - // Create a truststore - KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); - keystore.load(null, "security".toCharArray()); - - serviceKeystorePath = - 
KafkaTestUtils.createAndStoreKey(serviceDN, serviceDN, BigInteger.valueOf(30), - "sspass", "myservicekey", "skpass", keystore); - clientKeystorePath = - KafkaTestUtils.createAndStoreKey(clientDN, clientDN, BigInteger.valueOf(31), - "cspass", "myclientkey", "ckpass", keystore); - - File truststoreFile = File.createTempFile("kafkatruststore", ".jks"); - try (OutputStream output = new FileOutputStream(truststoreFile)) { - keystore.store(output, "security".toCharArray()); - } - truststorePath = truststoreFile.getPath(); - + String clientDN = "CN=localhost,O=Apache,L=Dublin,ST=Leinster,C=IE"; + + // Create a truststore + KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); + keystore.load(null, "security".toCharArray()); + + serviceKeystorePath = + KafkaTestUtils.createAndStoreKey(serviceDN, serviceDN, BigInteger.valueOf(30), + "sspass", "myservicekey", "skpass", keystore); + clientKeystorePath = + KafkaTestUtils.createAndStoreKey(clientDN, clientDN, BigInteger.valueOf(31), + "cspass", "myclientkey", "ckpass", keystore); + + File truststoreFile = File.createTempFile("kafkatruststore", ".jks"); + try (OutputStream output = new FileOutputStream(truststoreFile)) { + keystore.store(output, "security".toCharArray()); + } + truststorePath = truststoreFile.getPath(); + zkServer = new TestingServer(); - zkServer.start() ; - + zkServer.start(); + // Get a random port try (ServerSocket serverSocket = new ServerSocket(0)) { - Assertions.assertNotNull(serverSocket) ; - port = serverSocket.getLocalPort() ; - Assertions.assertTrue(port > 0) ; - } catch (java.io.IOException e) { - throw new RuntimeException("Local socket port not available", e) ; - } + Assertions.assertNotNull(serverSocket); + port = serverSocket.getLocalPort(); + Assertions.assertTrue(port > 0); + } catch (java.io.IOException e) { + throw new RuntimeException("Local socket port not available", e); + } tempDir = Files.createTempDirectory("kafka"); @@ -137,11 +135,11 @@ public class 
KafkaRangerAuthorizerTest { // Plug in Apache Ranger authorizer props.put("authorizer.class.name", "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer"); - + // Create users for testing UserGroupInformation.createUserForTesting(clientDN, new String[] {"public"}); UserGroupInformation.createUserForTesting(serviceDN, new String[] {"IT"}); - + KafkaConfig config = new KafkaConfig(props); kafkaServer = new KafkaServer(config, Time.SYSTEM, new Some<String>("KafkaRangerAuthorizerTest"), false); kafkaServer.startup(); @@ -158,7 +156,7 @@ public class KafkaRangerAuthorizerTest { adminProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security"); KafkaTestUtils.createSomeTopics(adminProps); } - + @AfterAll public static void cleanup() throws Exception { if (kafkaServer != null) { @@ -167,24 +165,24 @@ public class KafkaRangerAuthorizerTest { if (zkServer != null) { zkServer.stop(); } - + File clientKeystoreFile = new File(clientKeystorePath); if (clientKeystoreFile.exists()) { - FileUtils.forceDelete(clientKeystoreFile); + FileUtils.forceDelete(clientKeystoreFile); } File serviceKeystoreFile = new File(serviceKeystorePath); if (serviceKeystoreFile.exists()) { - FileUtils.forceDelete(serviceKeystoreFile); + FileUtils.forceDelete(serviceKeystoreFile); } File truststoreFile = new File(truststorePath); if (truststoreFile.exists()) { - FileUtils.forceDelete(truststoreFile); + FileUtils.forceDelete(truststoreFile); } if (tempDir != null) { FileUtils.deleteDirectory(tempDir.toFile()); } } - + // The "public" group can read from "test" @Test public void testAuthorizedRead() throws Exception { @@ -201,7 +199,7 @@ public class KafkaRangerAuthorizerTest { producerProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "skpass"); producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststorePath); producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "security"); - + // Create the Consumer Properties consumerProps = new Properties(); 
consumerProps.put("bootstrap.servers", "localhost:" + port); @@ -244,7 +242,7 @@ public class KafkaRangerAuthorizerTest { Assertions.assertEquals("somevalue", record.value()); } } - + // The "IT" group can write to any topic @Test public void testAuthorizedWrite() throws Exception { @@ -269,7 +267,7 @@ public class KafkaRangerAuthorizerTest { record.get(); } } - + // The "public" group can write to "test" but not "dev" @Test public void testUnauthorizedWrite() throws Exception { @@ -290,7 +288,7 @@ public class KafkaRangerAuthorizerTest { try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) { // Send a message Future<RecordMetadata> record = - producer.send(new ProducerRecord<>("test", "somekey", "somevalue")); + producer.send(new ProducerRecord<>("test", "somekey", "somevalue")); producer.flush(); record.get(); @@ -363,5 +361,4 @@ public class KafkaRangerAuthorizerTest { Assertions.assertEquals("somevalue", record.value()); } } - } diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java index 6fb8ccc9f..e1420138b 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaRangerTopicCreationTest.java @@ -17,17 +17,8 @@ package org.apache.ranger.authorization.kafka.authorizer; -import java.io.File; -import java.net.ServerSocket; -import java.nio.charset.StandardCharsets; -import java.nio.file.FileSystems; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - +import kafka.server.KafkaConfig; +import kafka.server.KafkaServer; import org.apache.commons.io.FileUtils; import org.apache.curator.test.InstanceSpec; 
import org.apache.curator.test.TestingServer; @@ -46,19 +37,26 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; import scala.Some; +import java.io.File; +import java.net.ServerSocket; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; public class KafkaRangerTopicCreationTest { - private final static Logger LOG = LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class); + private static final Logger LOG = LoggerFactory.getLogger(KafkaRangerTopicCreationTest.class); - private static KafkaServer kafkaServer; - private static TestingServer zkServer; - private static int port; - private static Path tempDir; + private static KafkaServer kafkaServer; + private static TestingServer zkServer; + private static int port; + private static Path tempDir; private static SimpleKdcServer kerbyServer; @BeforeAll @@ -72,7 +70,7 @@ public class KafkaRangerTopicCreationTest { configureKerby(basedir); // JAAS Config file - We need to point to the correct keytab files - Path path = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas"); + Path path = FileSystems.getDefault().getPath(basedir, "/src/test/resources/kafka_kerberos.jaas"); String content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8); content = content.replaceAll("<basedir>", basedir); //content = content.replaceAll("zookeeper/localhost", "zookeeper/" + address); @@ -83,12 +81,12 @@ public class KafkaRangerTopicCreationTest { System.setProperty("java.security.auth.login.config", path2.toString()); // Set up Zookeeper to require SASL - Map<String,Object> zookeeperProperties = new HashMap<>(); + Map<String, Object> zookeeperProperties = 
new HashMap<>(); zookeeperProperties.put("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); zookeeperProperties.put("requireClientAuthScheme", "sasl"); zookeeperProperties.put("jaasLoginRenew", "3600000"); - InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1,-1, -1, zookeeperProperties, "localhost"); + InstanceSpec instanceSpec = new InstanceSpec(null, -1, -1, -1, true, 1, -1, -1, zookeeperProperties, "localhost"); zkServer = new TestingServer(instanceSpec, true); @@ -129,39 +127,6 @@ public class KafkaRangerTopicCreationTest { KafkaConfig config = new KafkaConfig(props); kafkaServer = new KafkaServer(config, Time.SYSTEM, new Some<String>("KafkaRangerTopicCreationTest"), false); kafkaServer.startup(); - } - - private static void configureKerby(String baseDir) throws Exception { - - //System.setProperty("sun.security.krb5.debug", "true"); - System.setProperty("java.security.krb5.conf", baseDir + "/target/krb5.conf"); - - kerbyServer = new SimpleKdcServer(); - - kerbyServer.setKdcRealm("kafka.apache.org"); - kerbyServer.setAllowUdp(false); - kerbyServer.setWorkDir(new File(baseDir + "/target")); - - kerbyServer.init(); - - // Create principals - String zookeeper = "zookeeper/[email protected]"; - String kafka = "kafka/[email protected]"; - String client = "[email protected]"; - - kerbyServer.createPrincipal(zookeeper, "zookeeper"); - File keytabFile = new File(baseDir + "/target/zookeeper.keytab"); - kerbyServer.exportPrincipal(zookeeper, keytabFile); - - kerbyServer.createPrincipal(kafka, "kafka"); - keytabFile = new File(baseDir + "/target/kafka.keytab"); - kerbyServer.exportPrincipal(kafka, keytabFile); - - kerbyServer.createPrincipal(client, "client"); - keytabFile = new File(baseDir + "/target/client.keytab"); - kerbyServer.exportPrincipal(client, keytabFile); - - kerbyServer.start(); } @AfterAll @@ -182,8 +147,8 @@ public class KafkaRangerTopicCreationTest { @Test public void testCreateTopic() throws Exception 
{ - final String topic = "test"; - Properties properties = new Properties(); + final String topic = "test"; + Properties properties = new Properties(); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + port); properties.put("client.id", "test-consumer-id"); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); @@ -192,10 +157,41 @@ public class KafkaRangerTopicCreationTest { result.values().get(topic).get(); for (Map.Entry<String, KafkaFuture<Void>> entry : result.values().entrySet()) { System.out.println("Create Topic : " + entry.getKey() + " " + - "isCancelled : " + entry.getValue().isCancelled() + " " + - "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " + - "isDone : " + entry.getValue().isDone()); + "isCancelled : " + entry.getValue().isCancelled() + " " + + "isCompletedExceptionally : " + entry.getValue().isCompletedExceptionally() + " " + + "isDone : " + entry.getValue().isDone()); } } } + + private static void configureKerby(String baseDir) throws Exception { + System.setProperty("java.security.krb5.conf", baseDir + "/target/krb5.conf"); + + kerbyServer = new SimpleKdcServer(); + + kerbyServer.setKdcRealm("kafka.apache.org"); + kerbyServer.setAllowUdp(false); + kerbyServer.setWorkDir(new File(baseDir + "/target")); + + kerbyServer.init(); + + // Create principals + String zookeeper = "zookeeper/[email protected]"; + String kafka = "kafka/[email protected]"; + String client = "[email protected]"; + + kerbyServer.createPrincipal(zookeeper, "zookeeper"); + File keytabFile = new File(baseDir + "/target/zookeeper.keytab"); + kerbyServer.exportPrincipal(zookeeper, keytabFile); + + kerbyServer.createPrincipal(kafka, "kafka"); + keytabFile = new File(baseDir + "/target/kafka.keytab"); + kerbyServer.exportPrincipal(kafka, keytabFile); + + kerbyServer.createPrincipal(client, "client"); + keytabFile = new File(baseDir + "/target/client.keytab"); + kerbyServer.exportPrincipal(client, 
keytabFile); + + kerbyServer.start(); + } } diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java index 70e62f8f7..b15d4ca3d 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/KafkaTestUtils.java @@ -17,6 +17,16 @@ package org.apache.ranger.authorization.kafka.authorizer; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.NewTopic; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x500.style.RFC4519Style; +import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; + import java.io.File; import java.io.OutputStream; import java.math.BigInteger; @@ -31,60 +41,47 @@ import java.util.Arrays; import java.util.Date; import java.util.Properties; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.NewTopic; -import org.bouncycastle.asn1.x500.X500Name; -import org.bouncycastle.asn1.x500.style.RFC4519Style; -import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo; -import org.bouncycastle.cert.X509v3CertificateBuilder; -import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; -import org.bouncycastle.operator.ContentSigner; -import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; - public final class KafkaTestUtils { - - public static String createAndStoreKey(String subjectName, String issuerName, BigInteger serial, String keystorePassword, - String keystoreAlias, String keyPassword, KeyStore trustStore) throws Exception { - - // Create 
KeyPair - KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA"); - keyPairGenerator.initialize(2048, new SecureRandom()); - KeyPair keyPair = keyPairGenerator.generateKeyPair(); - - Date currentDate = new Date(); - Date expiryDate = new Date(currentDate.getTime() + 365L * 24L * 60L * 60L * 1000L); - - // Create X509Certificate - X509v3CertificateBuilder certBuilder = - new X509v3CertificateBuilder(new X500Name(RFC4519Style.INSTANCE, issuerName), serial, currentDate, expiryDate, - new X500Name(RFC4519Style.INSTANCE, subjectName), SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded())); - ContentSigner contentSigner = new JcaContentSignerBuilder("SHA256WithRSAEncryption").build(keyPair.getPrivate()); - X509Certificate certificate = new JcaX509CertificateConverter().getCertificate(certBuilder.build(contentSigner)); - - // Store Private Key + Certificate in Keystore - KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); - keystore.load(null, keystorePassword.toCharArray()); - keystore.setKeyEntry(keystoreAlias, keyPair.getPrivate(), keyPassword.toCharArray(), new Certificate[] {certificate}); - - File keystoreFile = File.createTempFile("kafkakeystore", ".jks"); - - try (OutputStream output = Files.newOutputStream(keystoreFile.toPath())) { - keystore.store(output, keystorePassword.toCharArray()); - } - - // Now store the Certificate in the truststore - trustStore.setCertificateEntry(keystoreAlias, certificate); - - return keystoreFile.getPath(); - + private KafkaTestUtils(){ + // to block instantiation } - static void createSomeTopics(Properties adminProps) { - try (AdminClient adminClient = AdminClient.create(adminProps)) { - adminClient.createTopics(Arrays.asList( - new NewTopic("test", 1, (short) 1), - new NewTopic("dev", 1, (short) 1) - )); - } - } + public static String createAndStoreKey(String subjectName, String issuerName, BigInteger serial, String keystorePassword, String keystoreAlias, String keyPassword, KeyStore 
trustStore) throws Exception { + // Create KeyPair + KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA"); + keyPairGenerator.initialize(2048, new SecureRandom()); + KeyPair keyPair = keyPairGenerator.generateKeyPair(); + + Date currentDate = new Date(); + Date expiryDate = new Date(currentDate.getTime() + 365L * 24L * 60L * 60L * 1000L); + + // Create X509Certificate + X509v3CertificateBuilder certBuilder = + new X509v3CertificateBuilder(new X500Name(RFC4519Style.INSTANCE, issuerName), serial, currentDate, expiryDate, + new X500Name(RFC4519Style.INSTANCE, subjectName), SubjectPublicKeyInfo.getInstance(keyPair.getPublic().getEncoded())); + ContentSigner contentSigner = new JcaContentSignerBuilder("SHA256WithRSAEncryption").build(keyPair.getPrivate()); + X509Certificate certificate = new JcaX509CertificateConverter().getCertificate(certBuilder.build(contentSigner)); + + // Store Private Key + Certificate in Keystore + KeyStore keystore = KeyStore.getInstance(KeyStore.getDefaultType()); + keystore.load(null, keystorePassword.toCharArray()); + keystore.setKeyEntry(keystoreAlias, keyPair.getPrivate(), keyPassword.toCharArray(), new Certificate[] {certificate}); + + File keystoreFile = File.createTempFile("kafkakeystore", ".jks"); + + try (OutputStream output = Files.newOutputStream(keystoreFile.toPath())) { + keystore.store(output, keystorePassword.toCharArray()); + } + + // Now store the Certificate in the truststore + trustStore.setCertificateEntry(keystoreAlias, certificate); + + return keystoreFile.getPath(); + } + + static void createSomeTopics(Properties adminProps) { + try (AdminClient adminClient = AdminClient.create(adminProps)) { + adminClient.createTopics(Arrays.asList(new NewTopic("test", 1, (short) 1), new NewTopic("dev", 1, (short) 1))); + } + } } diff --git a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/RangerAdminClientImpl.java 
b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/RangerAdminClientImpl.java index 9117c6457..3693b2308 100644 --- a/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/RangerAdminClientImpl.java +++ b/plugin-kafka/src/test/java/org/apache/ranger/authorization/kafka/authorizer/RangerAdminClientImpl.java @@ -17,50 +17,48 @@ package org.apache.ranger.authorization.kafka.authorizer; +import org.apache.ranger.admin.client.AbstractRangerAdminClient; +import org.apache.ranger.plugin.util.ServicePolicies; +import org.apache.ranger.plugin.util.ServiceTags; + import java.io.File; import java.nio.file.FileSystems; import java.nio.file.Files; import java.util.List; -import org.apache.ranger.admin.client.AbstractRangerAdminClient; -import org.apache.ranger.plugin.util.ServicePolicies; -import org.apache.ranger.plugin.util.ServiceTags; - /** * A test implementation of the RangerAdminClient interface that just reads policies in from a file and returns them */ public class RangerAdminClientImpl extends AbstractRangerAdminClient { - private final static String cacheFilename = "kafka-policies.json"; - private final static String tagFilename = "kafka-policies-tag.json"; + private static final String cacheFilename = "kafka-policies.json"; + private static final String tagFilename = "kafka-policies-tag.json"; public ServicePolicies getServicePoliciesIfUpdated(long lastKnownVersion, long lastActivationTimeInMillis) throws Exception { - String basedir = System.getProperty("basedir"); if (basedir == null) { basedir = new File(".").getCanonicalPath(); } - java.nio.file.Path cachePath = FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + cacheFilename); - byte[] cacheBytes = Files.readAllBytes(cachePath); + java.nio.file.Path cachePath = FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + cacheFilename); + byte[] cacheBytes = Files.readAllBytes(cachePath); return gson.fromJson(new String(cacheBytes), 
ServicePolicies.class); } public ServiceTags getServiceTagsIfUpdated(long lastKnownVersion, long lastActivationTimeInMillis) throws Exception { String basedir = System.getProperty("basedir"); + if (basedir == null) { basedir = new File(".").getCanonicalPath(); } - java.nio.file.Path cachePath = FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + tagFilename); - byte[] cacheBytes = Files.readAllBytes(cachePath); + java.nio.file.Path cachePath = FileSystems.getDefault().getPath(basedir, "/src/test/resources/" + tagFilename); + byte[] cacheBytes = Files.readAllBytes(cachePath); return gson.fromJson(new String(cacheBytes), ServiceTags.class); } - public List<String> getTagTypes(String tagTypePattern) throws Exception { + public List<String> getTagTypes(String tagTypePattern) { return null; } - - -} \ No newline at end of file +}
