METRON-1427: Add support for storm 1.1 and hdp 2.6 (cstella via mmiklavc) 
closes apache/metron#907


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/644e951c
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/644e951c
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/644e951c

Branch: refs/heads/feature/METRON-1344-test-infrastructure
Commit: 644e951c8b2c33b5e52602a814d91012c3b325b1
Parents: 0874571
Author: cstella <ceste...@gmail.com>
Authored: Tue Jan 30 10:54:57 2018 -0700
Committer: Michael Miklavcic <michael.miklav...@gmail.com>
Committed: Tue Jan 30 10:54:57 2018 -0700

----------------------------------------------------------------------
 metron-analytics/metron-profiler/pom.xml        |  4 +-
 .../roles/ambari_common/defaults/main.yml       |  4 +-
 .../roles/ambari_config/defaults/main.yml       |  2 +-
 .../roles/ambari_config/vars/single_node_vm.yml |  7 ++
 .../roles/ambari_config/vars/small_cluster.yml  |  7 ++
 .../roles/ambari_gather_facts/defaults/main.yml | 19 ++++++
 .../roles/ambari_gather_facts/tasks/main.yml    | 67 ++++++++++++--------
 .../metron-mpack/src/main/resources/mpack.json  | 14 ++++
 .../apache/metron/rest/config/KafkaConfig.java  |  8 ++-
 .../rest/service/impl/StormCLIWrapper.java      |  3 +-
 .../apache/metron/common/utils/KafkaUtils.java  | 27 ++++++++
 metron-platform/metron-elasticsearch/pom.xml    |  4 +-
 .../parsers/topology/ParserTopologyBuilder.java |  3 +-
 metron-platform/metron-solr/pom.xml             | 12 +++-
 .../kafka/flux/SimpleStormKafkaBuilder.java     |  2 +
 .../apache/metron/writer/kafka/KafkaWriter.java |  1 +
 16 files changed, 144 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-analytics/metron-profiler/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler/pom.xml 
b/metron-analytics/metron-profiler/pom.xml
index d634cef..4d36782 100644
--- a/metron-analytics/metron-profiler/pom.xml
+++ b/metron-analytics/metron-profiler/pom.xml
@@ -341,8 +341,8 @@
                                     
<shadedPattern>org.apache.metron.guava.metron-profiler</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    
<pattern>com.fasterxml.jackson.core</pattern>
-                                    
<shadedPattern>com.fasterxml.jackson.core.metron.elasticsearch</shadedPattern>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    
<shadedPattern>org.apache.metron.jackson</shadedPattern>
                                 </relocation>
                             </relocations>
                             <artifactSet>

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_common/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/ansible/roles/ambari_common/defaults/main.yml 
b/metron-deployment/ansible/roles/ambari_common/defaults/main.yml
index c04400e..0614e0f 100644
--- a/metron-deployment/ansible/roles/ambari_common/defaults/main.yml
+++ b/metron-deployment/ansible/roles/ambari_common/defaults/main.yml
@@ -17,7 +17,7 @@
 ---
 hadoop_logrotate_frequency: daily
 hadoop_logrotate_retention: 30
-centos_ambari_install_url: 
http://public-repo-1.hortonworks.com/ambari/centos6/2.x/updates/2.4.2.0/ambari.repo
-ubuntu_ambari_repo: 
http://public-repo-1.hortonworks.com/ambari/ubuntu14/2.x/updates/2.4.2.0
+centos_ambari_install_url: 
http://public-repo-1.hortonworks.com/ambari/centos6/2.x/updates/2.5.2.0/ambari.repo
+ubuntu_ambari_repo: 
http://public-repo-1.hortonworks.com/ambari/ubuntu14/2.x/updates/2.5.2.0
 ubuntu_elasticsearch_packages_repo: 
https://artifacts.elastic.co/packages/5.x/apt
 ubuntu_elasticsearch_curator_repo: https://packages.elastic.co/curator/5/debian

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_config/defaults/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/ansible/roles/ambari_config/defaults/main.yml 
b/metron-deployment/ansible/roles/ambari_config/defaults/main.yml
index e0de145..ad7ca9e 100644
--- a/metron-deployment/ansible/roles/ambari_config/defaults/main.yml
+++ b/metron-deployment/ansible/roles/ambari_config/defaults/main.yml
@@ -34,5 +34,5 @@ mapred_reduce_java_opts : -Xmx1024m
 mapred_map_mem_mb : 1229
 mapred_reduce_mem_mb : 1229
 topology_classpath: '/etc/hbase/conf:/etc/hadoop/conf'
-hdp_stack: "2.5"
+hdp_stack: "2.6"
 elasticsearch_network_interface: _site_

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml 
b/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml
index 6a60902..bf54fe0 100644
--- a/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml
+++ b/metron-deployment/ansible/roles/ambari_config/vars/single_node_vm.yml
@@ -87,6 +87,13 @@ configurations:
       supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704, 6705]"
       storm.local.dir: '{{ storm_local_dir }}'
       topology.classpath: '{{ topology_classpath }}'
+      # Storm expects ambari metrics to be available in 2.6.  We do *not* 
install ambari metrics in full-dev, so we need to revert to the old consumer
+      storm.cluster.metrics.consumer.register: '[{"class": 
"org.apache.storm.metric.LoggingMetricsConsumer"}]'
+      topology.metrics.consumer.register: '[{"class": 
"org.apache.storm.metric.LoggingMetricsConsumer", "parallelism.hint": 1, 
"whitelist": ["kafkaOffset\\..+/", "__complete-latency", "__process-latency", 
"__receive\\.population$", "__sendqueue\\.population$", "__execute-count", 
"__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", 
"memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]'
+      # Storm expects ambari metrics to be available in 2.6 and ambari metrics 
pulls data via JMX, but since we don't use ambari metrics here, we don't have 
the javaagent around to use and thus that must be removed from nimbus, 
supervisor and worker properties 
+      nimbus.childopts: '-Xmx1024m _JAAS_PLACEHOLDER'
+      supervisor.childopts: '-Xmx256m _JAAS_PLACEHOLDER'
+      worker.childopts: "-Xmx768m _JAAS_PLACEHOLDER"
   - kafka-env:
       content: "{% raw %}\n#!/bin/bash\n\n# Set KAFKA specific environment 
variables here.\n\n# The java implementation to use.\nexport 
KAFKA_HEAP_OPTS=\"-Xms256M -Xmx256M\"\nexport 
KAFKA_JVM_PERFORMANCE_OPTS=\"-server -XX:+UseG1GC -XX:+DisableExplicitGC 
-Djava.awt.headless=true\"\nexport JAVA_HOME={{java64_home}}\nexport 
PATH=$PATH:$JAVA_HOME/bin\nexport PID_DIR={{kafka_pid_dir}}\nexport 
LOG_DIR={{kafka_log_dir}}\nexport 
KAFKA_KERBEROS_PARAMS={{kafka_kerberos_params}}\n# Add kafka sink to classpath 
and related depenencies\nif [ -e 
\"/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\" ]; then\n  
export 
CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/ambari-metrics-kafka-sink.jar\n
  export CLASSPATH=$CLASSPATH:/usr/lib/ambari-metrics-kafka-sink/lib/*\nfi\nif 
[ -f /etc/kafka/conf/kafka-ranger-env.sh ]; then\n   . 
/etc/kafka/conf/kafka-ranger-env.sh\nfi{% endraw %}"
   - kafka-broker:

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml 
b/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml
index 4ec8458..218e267 100644
--- a/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml
+++ b/metron-deployment/ansible/roles/ambari_config/vars/small_cluster.yml
@@ -85,6 +85,13 @@ configurations:
       supervisor.slots.ports: "[6700, 6701, 6702, 6703, 6704, 6705]"
       storm.local.dir: '{{ storm_local_dir | default("/hadoop/storm") }}'
       topology.classpath: '{{ topology_classpath }}'
+      # Storm expects ambari metrics to be available in 2.6.  We do *not* 
install ambari metrics in full-dev, so we need to revert to the old consumer
+      storm.cluster.metrics.consumer.register: '[{"class": 
"org.apache.storm.metric.LoggingMetricsConsumer"}]'
+      topology.metrics.consumer.register: '[{"class": 
"org.apache.storm.metric.LoggingMetricsConsumer", "parallelism.hint": 1, 
"whitelist": ["kafkaOffset\\..+/", "__complete-latency", "__process-latency", 
"__receive\\.population$", "__sendqueue\\.population$", "__execute-count", 
"__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", 
"memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]'
+      # Storm expects ambari metrics to be available in 2.6 and ambari metrics 
pulls data via JMX, but since we don't use ambari metrics here, we don't have 
the javaagent around to use and thus that must be removed from nimbus, 
supervisor and worker properties 
+      nimbus.childopts: '-Xmx1024m _JAAS_PLACEHOLDER'
+      supervisor.childopts: '-Xmx256m _JAAS_PLACEHOLDER'
+      worker.childopts: "-Xmx768m _JAAS_PLACEHOLDER"
   - kafka-broker:
       log.dirs: '{{ kafka_log_dirs | default("/kafka-log") }}'
   - metron-rest-env:

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_gather_facts/defaults/main.yml
----------------------------------------------------------------------
diff --git 
a/metron-deployment/ansible/roles/ambari_gather_facts/defaults/main.yml 
b/metron-deployment/ansible/roles/ambari_gather_facts/defaults/main.yml
new file mode 100644
index 0000000..5351a60
--- /dev/null
+++ b/metron-deployment/ansible/roles/ambari_gather_facts/defaults/main.yml
@@ -0,0 +1,19 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+curl: "curl -s -u {{ ambari_user }}:{{ ambari_password }} -X GET -H 
\"X-Requested-By: ambari\""
+parse_json: "import sys, json; print json.load(sys.stdin)"

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml 
b/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml
index 2b37eec..25f0982 100644
--- a/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml
+++ b/metron-deployment/ansible/roles/ambari_gather_facts/tasks/main.yml
@@ -32,55 +32,55 @@
     cluster_name: "{{ (cluster_name_response.content | 
from_json)['items'][0].Clusters.cluster_name }}"
   when: cluster_name is undefined
 
+- set_fact:
+    base_url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name }}"
+
 #
 # namenode_host
 #
 - name: "Ask Ambari: namenode_host"
-  uri:
-    url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name }}/services/HDFS/components/NAMENODE"
-    user: "{{ ambari_user }}"
-    password: "{{ ambari_password }}"
-    force_basic_auth: yes
-    return_content: yes
-  register: namenode_hosts_response
+  shell: >
+    {{ curl }} '{{ base_url }}/services/HDFS/components/NAMENODE' \
+      | python -c '{{ parse_json 
}}["host_components"][0]["HostRoles"]["host_name"]'
+  args:
+    warn: false
+  register: namenode_host_response
   when: namenode_host is undefined
 
 - set_fact:
-    namenode_host: "{{ (namenode_hosts_response.content | 
from_json).host_components[0].HostRoles.host_name }}"
+    namenode_host: "{{ namenode_host_response.stdout_lines[0] }}"
   when: namenode_host is undefined
 
 #
 # core_site_tag
 #
 - name: "Ask Ambari: core_site_tag"
-  uri:
-    url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name }}/hosts/{{ namenode_host 
}}/host_components/NAMENODE"
-    user: "{{ ambari_user }}"
-    password: "{{ ambari_password }}"
-    force_basic_auth: yes
-    return_content: yes
+  shell: >
+    {{ curl }} '{{ base_url }}/hosts/{{ namenode_host 
}}/host_components/NAMENODE' \
+      | python -c '{{ parse_json 
}}["HostRoles"]["actual_configs"]["core-site"]["default"]'
+  args:
+    warn: false
   register: core_site_tag_response
   when: core_site_tag is undefined
 
 - set_fact:
-    core_site_tag: "{{ (core_site_tag_response.content | 
from_json).HostRoles.actual_configs['core-site'].default }}"
+    core_site_tag: "{{ core_site_tag_response.stdout_lines[0] }}"
   when: core_site_tag is undefined
 
 #
 # hdfs_url
 #
 - name: "Ask Ambari: hdfs_url"
-  uri:
-    url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name }}/configurations?type=core-site&tag={{ 
core_site_tag }}"
-    user: "{{ ambari_user }}"
-    password: "{{ ambari_password }}"
-    force_basic_auth: yes
-    return_content: yes
-  register: core_site_response
+  shell: >
+    {{ curl }} '{{ base_url }}/configurations?type=core-site&tag={{ 
core_site_tag }}' \
+      | python -c '{{ parse_json }}["items"][0]["properties"]["fs.defaultFS"]'
+  args:
+    warn: false
+  register: hdfs_url_response
   when: hdfs_url is undefined
 
 - set_fact:
-    hdfs_url: "{{ (core_site_response.content | 
from_json)['items'][0].properties['fs.defaultFS'] }}"
+    hdfs_url: "{{ hdfs_url_response.stdout_lines[0] }}"
   when: hdfs_url is undefined
 
 #
@@ -88,7 +88,7 @@
 #
 - name: "Ask Ambari: kafka_broker_hosts"
   uri:
-    url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name }}/services/KAFKA/components/KAFKA_BROKER"
+    url: "{{ base_url }}/services/KAFKA/components/KAFKA_BROKER"
     user: "{{ ambari_user }}"
     password: "{{ ambari_password }}"
     force_basic_auth: yes
@@ -105,7 +105,7 @@
 #
 - name: "Ask Ambari: kafka_broker_tag"
   uri:
-    url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name }}/hosts/{{ kafka_broker_hosts[0] 
}}/host_components/KAFKA_BROKER"
+    url: "{{ base_url }}/hosts/{{ kafka_broker_hosts[0] 
}}/host_components/KAFKA_BROKER"
     user: "{{ ambari_user }}"
     password: "{{ ambari_password }}"
     force_basic_auth: yes
@@ -122,7 +122,8 @@
 #
 - name: "Ask Ambari: kafka_broker_port"
   shell: >
-    curl -s -u {{ ambari_user }}:{{ ambari_password }} -X GET -H 
"X-Requested-By: ambari" "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name }}/configurations?type=kafka-broker&tag={{ 
kafka_broker_tag }}" | python -c 'import sys, json; print 
json.load(sys.stdin)["items"][0]["properties"]["listeners"]'
+    {{ curl }} '{{ base_url }}/configurations?type=kafka-broker&tag={{ 
kafka_broker_tag }}' \
+      | python -c '{{ parse_json }}["items"][0]["properties"]["listeners"]'
   args:
     warn: false
   register: kafka_broker_port_response
@@ -191,6 +192,9 @@
     zookeeper_url: "{% for host in zookeeper_hosts %}{% if loop.index != 1 
%},{% endif %}{{ host }}:{{ zookeeper_port }}{% endfor %}"
   when: zookeeper_url is undefined
 
+#
+# metron_hosts
+#
 - name: "Ask Ambari: metron_hosts"
   uri:
     url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name 
}}/services/METRON/components/METRON_INDEXING"
@@ -205,6 +209,9 @@
     metron_hosts: "{{ (metron_hosts_response.content | 
from_json).host_components | map(attribute='HostRoles.host_name') | list }}"
   when: metron_hosts is undefined
 
+#
+# kibana hosts
+#
 - name: "Ask Ambari: kibana_hosts"
   uri:
     url: "http://{{ groups.ambari_master[0] }}:{{ ambari_port 
}}/api/v1/clusters/{{ cluster_name }}/services/KIBANA/components/KIBANA_MASTER"
@@ -225,10 +232,14 @@
 #
 - name: debug
   debug:
-    msg: "zookeeper_port = {{ zookeeper_port }},
+    msg: "cluster_name = {{ cluster_name }},
+          namenode_host = {{ namenode_host }},
+          hdfs_url = {{ hdfs_url }},
+          zookeeper_port = {{ zookeeper_port }},
           zookeeper_hosts = {{ zookeeper_hosts }},
           zookeeper_url = {{ zookeeper_url }},
           kafka_broker_port = {{ kafka_broker_port }},
           kafka_broker_hosts = {{ kafka_broker_hosts }},
           kafka_broker_url = {{ kafka_broker_url }},
-          metron_hosts = {{ metron_hosts }}"
+          metron_hosts = {{ metron_hosts }},
+          kibana_hosts = {{ kibana_hosts }}"

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json
index 7a9d892..3946881 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/mpack.json
@@ -38,7 +38,12 @@
             {
               "stack_name" : "HDP",
               "stack_version" : "2.5"
+            },
+            {
+              "stack_name" : "HDP",
+              "stack_version" : "2.6"
             }
+
           ]
         },
         {
@@ -56,8 +61,13 @@
             {
               "stack_name" : "HDP",
               "stack_version" : "2.5"
+            },
+            {
+              "stack_name" : "HDP",
+              "stack_version" : "2.6"
             }
 
+
           ]
         },
         {
@@ -75,6 +85,10 @@
             {
               "stack_name" : "HDP",
               "stack_version" : "2.5"
+            },
+            {
+              "stack_name" : "HDP",
+              "stack_version" : "2.6"
             }
 
           ]

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
index a15c48f..7e9b468 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java
@@ -22,6 +22,8 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.metron.rest.MetronRestConstants;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
@@ -86,7 +88,7 @@ public class KafkaConfig {
     props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
     if 
(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, 
Boolean.class, false)) {
-      props.put("security.protocol", 
environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY));
+      props.put("security.protocol", 
KafkaUtils.INSTANCE.normalizeProtocol(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)));
     }
     return props;
   }
@@ -109,11 +111,13 @@ public class KafkaConfig {
     producerConfig.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
     producerConfig.put("request.required.acks", 1);
     if 
(environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, 
Boolean.class, false)) {
-      producerConfig.put("security.protocol", 
environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY));
+      producerConfig.put("security.protocol", 
KafkaUtils.INSTANCE.normalizeProtocol(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)));
     }
     return producerConfig;
   }
 
+
+
   @Bean
   public KafkaProducer kafkaProducer() {
     return new KafkaProducer<>(producerProperties());

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
index 463c925..fff7390 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormCLIWrapper.java
@@ -18,6 +18,7 @@
 package org.apache.metron.rest.service.impl;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.metron.rest.MetronRestConstants;
 import org.apache.metron.rest.RestException;
 import org.slf4j.Logger;
@@ -117,7 +118,7 @@ public class StormCLIWrapper {
 
     // kafka security protocol
     command.add( "-ksp");
-    command.add( 
environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY));
+    
command.add(KafkaUtils.INSTANCE.normalizeProtocol(environment.getProperty(MetronRestConstants.KAFKA_SECURITY_PROTOCOL_SPRING_PROPERTY)));
 
     // extra topology options
     boolean kerberosEnabled = 
environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, 
Boolean.class, false);

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
index d54e2b8..796bc42 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KafkaUtils.java
@@ -25,6 +25,7 @@ import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -32,6 +33,7 @@ import java.util.Map;
 
 public enum KafkaUtils {
   INSTANCE;
+  public static final String SECURITY_PROTOCOL = "security.protocol";
   public List<String> getBrokersFromZookeeper(String zkQuorum) throws 
Exception {
     RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
     CuratorFramework framework = CuratorFrameworkFactory.newClient(zkQuorum, 
retryPolicy);
@@ -66,6 +68,31 @@ public enum KafkaUtils {
     return ret;
   }
 
+  public Map<String, Object> normalizeProtocol(Map<String, Object> configs) {
+    if(configs.containsKey(SECURITY_PROTOCOL)) {
+      String protocol = 
normalizeProtocol((String)configs.get(SECURITY_PROTOCOL));
+      configs.put(SECURITY_PROTOCOL, protocol);
+    }
+    return configs;
+  }
+
+  public String normalizeProtocol(String protocol) {
+    if(protocol.equalsIgnoreCase("PLAINTEXTSASL") || 
protocol.equalsIgnoreCase("SASL_PLAINTEXT")) {
+      if(SecurityProtocol.getNames().contains("PLAINTEXTSASL")) {
+        return "PLAINTEXTSASL";
+      }
+      else if(SecurityProtocol.getNames().contains("SASL_PLAINTEXT")) {
+        return "SASL_PLAINTEXT";
+      }
+      else {
+        throw new IllegalStateException("Unable to find the appropriate SASL 
protocol, " +
+                "viable options are: " + 
Joiner.on(",").join(SecurityProtocol.getNames()));
+      }
+    }
+    else {
+      return protocol.trim();
+    }
+  }
   /*
   The URL accepted is NOT a general URL, and is assumed to follow the format 
used by the Kafka structures in Zookeeper.
   See: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml 
b/metron-platform/metron-elasticsearch/pom.xml
index 97f4062..141d8aa 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -264,8 +264,8 @@
                                     
<shadedPattern>org.apache.metron.guava.metron-elasticsearch</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    
<pattern>com.fasterxml.jackson.core</pattern>
-                                    
<shadedPattern>com.fasterxml.jackson.core.metron.elasticsearch</shadedPattern>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    
<shadedPattern>org.apache.metron.jackson</shadedPattern>
                                 </relocation>
                             </relocations>
                             <artifactSet>

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index c918703..1039e56 100644
--- 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ 
b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.metron.parsers.topology;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.metron.common.utils.KafkaUtils;
 import org.apache.metron.parsers.topology.config.ValueSupplier;
 import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
 import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
@@ -163,7 +164,7 @@ public class ParserTopologyBuilder {
             , inputTopic + "_parser"
     );
     if(securityProtocol.isPresent()) {
-      kafkaSpoutConfigOptions.putIfAbsent("security.protocol", 
securityProtocol.get());
+      kafkaSpoutConfigOptions.putIfAbsent("security.protocol", 
KafkaUtils.INSTANCE.normalizeProtocol(securityProtocol.get()));
     }
     return SimpleStormKafkaBuilder.create( inputTopic
                                          , zkQuorum

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/pom.xml 
b/metron-platform/metron-solr/pom.xml
index a2eee71..9c9c7fb 100644
--- a/metron-platform/metron-solr/pom.xml
+++ b/metron-platform/metron-solr/pom.xml
@@ -261,7 +261,17 @@
                                   <exclude>META-INF/*.RSA</exclude>
                                 </excludes>
                               </filter>
-                            </filters> 
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    
<shadedPattern>org.apache.metron.guava</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    
<shadedPattern>org.apache.metron.jackson</shadedPattern>
+                                </relocation>
+                            </relocations>
                             <artifactSet>
                                 <excludes>
                                     <exclude>storm:storm-core:*</exclude>

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
index 1bcee9a..f99e549 100644
--- 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java
@@ -203,6 +203,8 @@ public class SimpleStormKafkaBuilder<K, V> extends 
KafkaSpoutConfig.Builder<K, V
          , 
createDeserializer(Optional.ofNullable((String)kafkaProps.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)),
 DEFAULT_DESERIALIZER)
          , subscription
     );
+
+    kafkaProps = KafkaUtils.INSTANCE.normalizeProtocol(kafkaProps);
     setProp(kafkaProps);
     setRecordTranslator(new 
SpoutRecordTranslator<>(FieldsConfiguration.toList(fieldsConfiguration)));
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/644e951c/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
index 7ce9b9b..f73e0f4 100644
--- 
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
+++ 
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
@@ -158,6 +158,7 @@ public class KafkaWriter extends AbstractWriter implements 
MessageWriter<JSONObj
     producerConfig.put("value.serializer", valueSerializer);
     producerConfig.put("request.required.acks", requiredAcks);
     producerConfig.putAll(producerConfigs == null?new 
HashMap<>():producerConfigs);
+    producerConfig = KafkaUtils.INSTANCE.normalizeProtocol(producerConfig);
     return producerConfig;
   }
 

Reply via email to