Repository: metron Updated Branches: refs/heads/master a6b5eddd1 -> fd4a6d164
METRON-1291 Kafka produce REST endpoint does not work in a Kerberized cluster (merrimanr) closes apache/metron#826 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fd4a6d16 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fd4a6d16 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fd4a6d16 Branch: refs/heads/master Commit: fd4a6d16407476366afe339342c2ed0d6d88faf9 Parents: a6b5edd Author: merrimanr <[email protected]> Authored: Fri Nov 17 08:47:46 2017 -0600 Committer: merrimanr <[email protected]> Committed: Fri Nov 17 08:47:46 2017 -0600 ---------------------------------------------------------------------- .../apache/metron/rest/config/KafkaConfig.java | 5 +- .../src/main/resources/application.yml | 2 + .../metron/rest/config/KafkaConfigTest.java | 75 ++++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java index a2abbeb..a15c48f 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java @@ -86,7 +86,7 @@ public class KafkaConfig { props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) { - props.put("security.protocol", 
"SASL_PLAINTEXT"); + props.put("security.protocol", environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)); } return props; } @@ -108,6 +108,9 @@ public class KafkaConfig { producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producerConfig.put("request.required.acks", 1); + if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) { + producerConfig.put("security.protocol", environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)); + } return producerConfig; } http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/main/resources/application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml index 452a91f..426effa 100644 --- a/metron-interface/metron-rest/src/main/resources/application.yml +++ b/metron-interface/metron-rest/src/main/resources/application.yml @@ -35,6 +35,8 @@ zookeeper: connection: 10000 kafka: + security: + protocol: SASL_PLAINTEXT topics: escalation: escalation http://git-wip-us.apache.org/repos/asf/metron/blob/fd4a6d16/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java new file mode 100644 index 0000000..dab924f --- /dev/null +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/KafkaConfigTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.rest.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Map; +import org.apache.metron.rest.MetronRestConstants; +import org.junit.Before; +import org.junit.Test; +import org.springframework.core.env.Environment; + +public class KafkaConfigTest { + + private Environment environment; + private KafkaConfig kafkaConfig; + + @Before + public void setUp() throws Exception { + environment = mock(Environment.class); + kafkaConfig = new KafkaConfig(environment); + } + + @Test + public void kafkaConfigShouldProperlyReturnConsumerProperties() throws Exception { + when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("broker urls"); + when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false); + + Map<String, Object> consumerProperties = kafkaConfig.consumerProperties(); + assertEquals("broker urls", consumerProperties.get("bootstrap.servers")); + assertNull(consumerProperties.get("security.protocol")); + + 
when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(true); + when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka security protocol"); + + consumerProperties = kafkaConfig.consumerProperties(); + assertEquals("kafka security protocol", consumerProperties.get("security.protocol")); + } + + @Test + public void kafkaConfigShouldProperlyReturnProducerProperties() throws Exception { + when(environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)).thenReturn("broker urls"); + when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(false); + + Map<String, Object> producerProperties = kafkaConfig.producerProperties(); + assertEquals("broker urls", producerProperties.get("bootstrap.servers")); + assertNull(producerProperties.get("security.protocol")); + + when(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)).thenReturn(true); + when(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)).thenReturn("kafka security protocol"); + + producerProperties = kafkaConfig.producerProperties(); + assertEquals("kafka security protocol", producerProperties.get("security.protocol")); + } + + +}
