Repository: incubator-sentry Updated Branches: refs/heads/kafka b675b9a53 -> 0a9fb9f8d
SENTRY-1014: Add end-to-end tests for Kafka (Ashish K Singh, Reviewed by: Hao Hao, Anne Yu and Dapeng Sun) Change-Id: I4398811da2f80e56ab9fa6ae7a9967e4c22d0558 Project: http://git-wip-us.apache.org/repos/asf/incubator-sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sentry/commit/0a9fb9f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-sentry/tree/0a9fb9f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-sentry/diff/0a9fb9f8 Branch: refs/heads/kafka Commit: 0a9fb9f8d7174654392e945e87fed546fc77ae06 Parents: b675b9a Author: hahao <hao....@cloudera.com> Authored: Wed Mar 9 23:24:15 2016 -0800 Committer: hahao <hao....@cloudera.com> Committed: Wed Mar 9 23:24:15 2016 -0800 ---------------------------------------------------------------------- .../sentry/tests/e2e/kafka/KafkaTestServer.java | 4 +- .../sentry/tests/e2e/kafka/TestAuthorize.java | 297 +++++++++++++++++++ 2 files changed, 299 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0a9fb9f8/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java index 129191a..85e7d21 100644 --- a/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java +++ b/sentry-tests/sentry-tests-kafka/src/main/java/org/apache/sentry/tests/e2e/kafka/KafkaTestServer.java @@ -118,7 +118,7 @@ public class KafkaTestServer { } } - public String getBootstrapServers() { - return "localhost:" + kafkaPort; + public String getBootstrapServers() throws UnknownHostException { + return InetAddress.getLocalHost().getHostAddress() + ":" + kafkaPort; } } http://git-wip-us.apache.org/repos/asf/incubator-sentry/blob/0a9fb9f8/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java new file mode 100644 index 0000000..a5cd3da --- /dev/null +++ b/sentry-tests/sentry-tests-kafka/src/test/java/org/apache/sentry/tests/e2e/kafka/TestAuthorize.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.sentry.tests.e2e.kafka; + +import com.google.common.collect.Sets; +import junit.framework.Assert; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.sentry.core.model.kafka.Cluster; +import org.apache.sentry.core.model.kafka.ConsumerGroup; +import org.apache.sentry.core.model.kafka.KafkaActionConstant; +import org.apache.sentry.core.model.kafka.Host; +import org.apache.sentry.core.model.kafka.Topic; +import org.apache.sentry.provider.db.generic.service.thrift.SentryGenericServiceClient; +import org.apache.sentry.provider.db.generic.service.thrift.TAuthorizable; +import org.apache.sentry.provider.db.generic.service.thrift.TSentryPrivilege; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +public class TestAuthorize extends AbstractKafkaSentryTestBase { + private static final Logger LOGGER = LoggerFactory.getLogger(TestAuthorize.class); + + @Test + public void testProduceConsumeForSuperuser() { + try { + final String SuperuserName = "test"; + testProduce(SuperuserName); + testConsume(SuperuserName); + } catch (Exception ex) { + Assert.fail("Superuser must have been allowed to perform any and all actions. \nException: \n" + ex); + } + } + + @Test + public void testProduceConsumeCycle() throws Exception { + final String localhost = InetAddress.getLocalHost().getHostAddress(); + + // START TESTING PRODUCER + try { + testProduce("user1"); + Assert.fail("user1 must not have been authorized to create topic t1."); + } catch (ExecutionException ex) { + assertCausedMessage(ex, "Not authorized to access topics: [t1]"); + } + + final String role = StaticUserGroupRole.ROLE_1; + final String group = StaticUserGroupRole.GROUP_1; + + // Allow HOST=localhost->Topic=t1->action=describe + ArrayList<TAuthorizable> authorizables = new ArrayList<TAuthorizable>(); + Host host = new Host(localhost); + authorizables.add(new TAuthorizable(host.getTypeName(), host.getName())); + Topic topic = new Topic("t1"); + authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName())); + addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables); + try { + testProduce("user1"); + Assert.fail("user1 must not have been authorized to create topic t1."); + } catch (ExecutionException ex) { + assertCausedMessage(ex, "Not authorized to access topics: [t1]"); + } + + // Allow HOST=localhost->Cluster=kafka-cluster->action=create + authorizables = new ArrayList<TAuthorizable>(); + authorizables.add(new TAuthorizable(host.getTypeName(), host.getName())); + Cluster cluster = new Cluster(); + authorizables.add(new TAuthorizable(cluster.getTypeName(), cluster.getName())); + addPermissions(role, group, KafkaActionConstant.CREATE, authorizables); + try { + testProduce("user1"); + Assert.fail("user1 must not have been authorized to create topic t1."); + } catch (ExecutionException ex) { + assertCausedMessage(ex, "Not authorized to access topics: [t1]"); + } + + // Allow HOST=localhost->Topic=t1->action=write + authorizables = new ArrayList<TAuthorizable>(); + authorizables.add(new TAuthorizable(host.getTypeName(), host.getName())); + authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName())); + addPermissions(role, group, KafkaActionConstant.WRITE, authorizables); + try { + testProduce("user1"); + } catch (Exception ex) { + Assert.fail("user1 should have been able to successfully produce to topic t1. \n Exception: " + ex); + } + + // START TESTING CONSUMER + try { + testConsume("user1"); + Assert.fail("user1 must not have been authorized to describe consumer group sentrykafkaconsumer."); + } catch (Exception ex) { + assertCausedMessage(ex, "Not authorized to access group: sentrykafkaconsumer"); + } + + // HOST=localhost->Group=SentryKafkaConsumer->action=describe + authorizables = new ArrayList<TAuthorizable>(); + authorizables.add(new TAuthorizable(host.getTypeName(), host.getName())); + ConsumerGroup consumerGroup = new ConsumerGroup("sentrykafkaconsumer"); + authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), consumerGroup.getName())); + addPermissions(role, group, KafkaActionConstant.DESCRIBE, authorizables); + try { + testConsume("user1"); + Assert.fail("user1 must not have been authorized to read consumer group sentrykafkaconsumer."); + } catch (Exception ex) { + assertCausedMessage(ex, "Not authorized to access group: sentrykafkaconsumer"); + } + + // HOST=localhost->Group=SentryKafkaConsumer->action=read + authorizables = new ArrayList<TAuthorizable>(); + authorizables.add(new TAuthorizable(host.getTypeName(), host.getName())); + authorizables.add(new TAuthorizable(consumerGroup.getTypeName(), consumerGroup.getName())); + addPermissions(role, group, KafkaActionConstant.READ, authorizables); + try { + testConsume("user1"); + Assert.fail("user1 must not have been authorized to read from topic t1."); + } catch (Exception ex) { + assertCausedMessage(ex, "Not authorized to access topics: [t1]"); + } + + // HOST=localhost->Topic=t1->action=read + authorizables = new ArrayList<TAuthorizable>(); + authorizables.add(new TAuthorizable(host.getTypeName(), host.getName())); + authorizables.add(new TAuthorizable(topic.getTypeName(), topic.getName())); + addPermissions(role, group, KafkaActionConstant.READ, authorizables); + testConsume("user1"); + } + + private void addPermissions(String role, String group, String action, ArrayList<TAuthorizable> authorizables) throws Exception { + SentryGenericServiceClient sentryClient = getSentryClient(); + try { + sentryClient.createRoleIfNotExist(ADMIN_USER, role, COMPONENT); + sentryClient.addRoleToGroups(ADMIN_USER, role, COMPONENT, Sets.newHashSet(group)); + + sentryClient.grantPrivilege(ADMIN_USER, role, COMPONENT, + new TSentryPrivilege(COMPONENT, "kafka", authorizables, + action)); + } finally { + if (sentryClient != null) { + sentryClient.close(); + sentryClient = null; + } + } + } + + private void testProduce(String producerUser) throws Exception { + final KafkaProducer<String, String> kafkaProducer = createKafkaProducer(producerUser); + try { + final String topic = "t1"; + final String msg = "message1"; + ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, msg); + kafkaProducer.send(producerRecord).get(); + LOGGER.debug("Sent message: " + producerRecord); + } finally { + kafkaProducer.close(); + } + } + + private void testConsume(String consumerUser) throws Exception { + final KafkaConsumer<String, String> kafkaConsumer = createKafkaConsumer(consumerUser); + try { + final String topic = "t1"; + final String msg = "message1"; + kafkaConsumer.subscribe(Collections.singletonList(topic), new CustomRebalanceListener(kafkaConsumer)); + waitTillTrue("Did not receive expected message.", 60, 2, new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + ConsumerRecords<String, String> records = kafkaConsumer.poll(1000); + if (records.isEmpty()) + LOGGER.debug("No record received from consumer."); + for (ConsumerRecord<String, String> record : records) { + if (record.value().equals(msg)) { + LOGGER.debug("Received message: " + record); + return true; + } + } + return false; + } + }); + } finally { + kafkaConsumer.close(); + } + } + + private KafkaProducer<String, String> createKafkaProducer(String user) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "SentryKafkaProducer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".keystore.jks").getPath()); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, user + "-ks-passwd"); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, user + "-key-passwd"); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".truststore.jks").getPath()); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, user + "-ts-passwd"); + + return new KafkaProducer<String, String>(props); + } + + private KafkaConsumer<String, String> createKafkaConsumer(String user) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "sentrykafkaconsumer"); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); + props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS"); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".keystore.jks").getPath()); + props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, user + "-ks-passwd"); + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, user + "-key-passwd"); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, KafkaTestServer.class.getResource("/" + user + ".truststore.jks").getPath()); + props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, user + "-ts-passwd"); + + return new KafkaConsumer<String, String>(props); + } + + /** + * Wait for a condition to succeed up to specified time. + * + * @param failureMessage Message to be displayed on failure. + * @param maxWaitTime Max waiting time for success in seconds. + * @param loopInterval Wait time between checks in seconds. + * @param testFunc Check to be performed for success, should return boolean. + * @throws Exception + */ + private void waitTillTrue( + String failureMessage, long maxWaitTime, long loopInterval, Callable<Boolean> testFunc) + throws Exception { + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime <= maxWaitTime * 1000L) { + if (testFunc.call()) { + return; // Success + } + Thread.sleep(loopInterval * 1000L); + } + + Assert.fail(failureMessage); + } + + class CustomRebalanceListener implements ConsumerRebalanceListener { + + KafkaConsumer consumer = null; + + CustomRebalanceListener(KafkaConsumer kafkaConsumer) { + consumer = kafkaConsumer; + } + + @Override + public void onPartitionsRevoked(Collection<TopicPartition> collection) { + + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> collection) { + for (TopicPartition tp : collection) { + consumer.seekToBeginning(tp); + } + } + } +} \ No newline at end of file