METRON-976 KafkaUtils doesn't handle SASL_PLAINTEXT (justinleet via leet) closes apache/metron#600
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/61cbab46 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/61cbab46 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/61cbab46 Branch: refs/heads/Metron_0.4.0 Commit: 61cbab46e3e262e006e59c28eaa9192d0f7aaaab Parents: 5b72da7 Author: justinleet <[email protected]> Authored: Thu Jun 8 08:47:27 2017 -0400 Committer: YOUR NAME as In Apache <[email protected]> Committed: Thu Jun 8 08:47:27 2017 -0400 ---------------------------------------------------------------------- .../apache/metron/common/utils/KafkaUtils.java | 15 +- .../common/utils/KafkaUtilsEndpointTest.java | 64 -------- .../metron/common/utils/KafkaUtilsTest.java | 162 +++++++++++++++++++ 3 files changed, 173 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java index 04c1389..bbd8b30 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java @@ -44,6 +44,7 @@ public enum KafkaUtils { framework.close(); } } + public List<String> getBrokersFromZookeeper(CuratorFramework client) throws Exception { List<String> ret = new ArrayList<>(); for(String id : client.getChildren().forPath("/brokers/ids")) { @@ -68,12 +69,18 @@ public enum KafkaUtils { return ret; } - public List<String> fromEndpoint(String url) throws URISyntaxException { + /* + The URL accepted is NOT a general URL, and is assumed to follow the format used by the Kafka structures in Zookeeper. + See: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper + */ + List<String> fromEndpoint(String url){ List<String> ret = new ArrayList<>(); if(url != null) { - URI uri = new URI(url); - int port = uri.getPort(); - ret.add(uri.getHost() + ((port > 0)?(":" + port):"")); + Iterable<String> splits = Splitter.on("//").split(url); + if(Iterables.size(splits) == 2) { + String hostPort = Iterables.getLast(splits); + ret.add(hostPort); + } } return ret; } http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java deleted file mode 100644 index 14a9e41..0000000 --- a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsEndpointTest.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.common.utils; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -@RunWith(Parameterized.class) -public class KafkaUtilsEndpointTest { - static String[] hostnames = new String[] { "node1", "localhost", "192.168.0.1", "my.domain.com" }; - static String[] schemes = new String[] { "SSL", "PLAINTEXTSASL", "PLAINTEXT"}; - static String[] ports = new String[] { "6667", "9091", null}; - private String endpoint; - private String expected; - - public KafkaUtilsEndpointTest(String endpoint, String expected) { - this.endpoint = endpoint; - this.expected = expected; - } - - @Parameterized.Parameters - public static Collection<Object[]> data() { - List<Object[]> ret = new ArrayList<>(); - for(String scheme : schemes) { - for(String hostname : hostnames) { - for(String port : ports) { - port = port != null?(":" + port):""; - String expected = hostname + port; - ret.add(new Object[]{scheme + "://" + expected, expected }); - } - } - } - return ret; - } - - @Test - public void testEndpointParsing() throws URISyntaxException { - Assert.assertEquals(expected, KafkaUtils.INSTANCE.fromEndpoint(endpoint).get(0)); - } - -} http://git-wip-us.apache.org/repos/asf/metron/blob/61cbab46/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java new file mode 100644 index 0000000..72ac51e --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/utils/KafkaUtilsTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.common.utils; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; + +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.GetChildrenBuilder; +import org.apache.curator.framework.api.GetDataBuilder; +import org.junit.Test; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(Enclosed.class) +public class KafkaUtilsTest { + @RunWith(MockitoJUnitRunner.class) + public static class ZkMockedUtils { + @Mock + CuratorFramework client; + @Mock + GetChildrenBuilder childrenBuilder; + @Mock + GetDataBuilder dataBuilder; + + /** + * { + * "host": "192.168.1.148", + * "port": 9092 + * } + */ + @Multiline + public static String brokerWithHostPort; + + @Test + public void testGetEndpointsFromZookeeperHostPort() throws Exception { + ArrayList<String> brokerIds = new ArrayList<>(); + brokerIds.add("1"); + + when(client.getChildren()).thenReturn(childrenBuilder); + when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds); + when(client.getData()).thenReturn(dataBuilder); + when(dataBuilder.forPath("/brokers/ids/1")).thenReturn(brokerWithHostPort.getBytes()); + + ArrayList<String> expected = new ArrayList<>(); + expected.add("192.168.1.148:9092"); + assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client))); + } + + /** + * { + * "endpoints": ["PLAINTEXT://host1:9092", "SSL://host1:9093", "SASL_PLAINTEXT://host1:9094", "PLAINTEXTSASL://host1:9095"] + * } + */ + @Multiline + public static String brokerWithEndpoints; + + @Test + public void testGetEndpointsFromZookeeperEndpoints() throws Exception { + ArrayList<String> brokerIds = new ArrayList<>(); + brokerIds.add("1"); + + when(client.getChildren()).thenReturn(childrenBuilder); + when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds); + when(client.getData()).thenReturn(dataBuilder); + when(dataBuilder.forPath("/brokers/ids/1")).thenReturn(brokerWithEndpoints.getBytes()); + + ArrayList<String> expected = new ArrayList<>(); + expected.add("host1:9092"); + expected.add("host1:9093"); + expected.add("host1:9094"); + expected.add("host1:9095"); + assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client))); + } + + /** + * { + * "host": "192.168.1.148", + * "port": 9092, + * "endpoints": ["PLAINTEXT://host1:9092", "SSL://host1:9093"] + * } + */ + @Multiline + public static String brokerWithHostPortAndEndpoints; + + @Test + public void testGetEndpointsFromZookeeperHostPortAndEndpoints() throws Exception { + ArrayList<String> brokerIds = new ArrayList<>(); + brokerIds.add("1"); + + when(client.getChildren()).thenReturn(childrenBuilder); + when(childrenBuilder.forPath("/brokers/ids")).thenReturn(brokerIds); + when(client.getData()).thenReturn(dataBuilder); + when(dataBuilder.forPath("/brokers/ids/1")) + .thenReturn(brokerWithHostPortAndEndpoints.getBytes()); + + ArrayList<String> expected = new ArrayList<>(); + expected.add("192.168.1.148:9092"); + assertEquals(expected, (KafkaUtils.INSTANCE.getBrokersFromZookeeper(client))); + } + } + + @RunWith(Parameterized.class) + public static class ParameterizedEndPointParsing { + static String[] hostnames = new String[]{"node1", "localhost", "192.168.0.1", "my.domain.com"}; + static String[] schemes = new String[]{"SSL", "PLAINTEXTSASL", "PLAINTEXT", "SASL_PLAINTEXT"}; + static String[] ports = new String[]{"6667", "9091", null}; + + private String endpoint; + private String expected; + + public ParameterizedEndPointParsing(String endpoint, String expected) { + this.endpoint = endpoint; + this.expected = expected; + } + + @Parameters(name = "{index}:endpoint({0}={1})") + public static Collection<Object[]> data() { + List<Object[]> ret = new ArrayList<>(); + for (String scheme : schemes) { + for (String hostname : hostnames) { + for (String port : ports) { + port = port != null ? (":" + port) : ""; + String expected = hostname + port; + ret.add(new Object[]{scheme + "://" + expected, expected}); + } + } + } + return ret; + } + + @Test + public void testEndpointParsing() throws URISyntaxException { + assertEquals(expected, KafkaUtils.INSTANCE.fromEndpoint(endpoint).get(0)); + } + } +}
