Repository: incubator-metron
Updated Branches:
  refs/heads/master c13ee8265 -> 98dc7659a


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/Callback.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/Callback.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/Callback.java
new file mode 100644
index 0000000..8e9622c
--- /dev/null
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/Callback.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A hook applied to every tuple a spout emits through a {@link CallbackCollector}.
+ *
+ * <p>Implementations are {@link Serializable} so they can travel with the spout, and
+ * {@link AutoCloseable} so they can release resources when the spout is closed.
+ */
+public interface Callback extends AutoCloseable, Serializable {
+
+    /**
+     * Prepares the callback with the spout-level context before it is used.
+     *
+     * @param context the base context of the spout
+     */
+    void initialize(EmitContext context);
+
+    /**
+     * Transforms a tuple that is about to be emitted.
+     *
+     * @param tuple   the values the spout asked to emit
+     * @param context the context describing this particular emit
+     * @return the values that are actually emitted
+     */
+    List<Object> apply(List<Object> tuple, EmitContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackCollector.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackCollector.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackCollector.java
new file mode 100644
index 0000000..57f9f2d
--- /dev/null
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackCollector.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
+import org.apache.storm.spout.SpoutOutputCollector;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A {@link SpoutOutputCollector} that passes every emitted tuple through a {@link Callback}
+ * and forwards the callback's result to the wrapped collector.
+ *
+ * <p>Each emit hands the callback a fresh copy of the base {@link EmitContext}, enriched with
+ * the information available for that emit: the stream id, the target task id and, when the
+ * message id is a {@link KafkaSpoutMessageId}, the Kafka partition.  The base context itself
+ * is never modified.
+ */
+public class CallbackCollector extends SpoutOutputCollector implements Serializable {
+    static final long serialVersionUID = 0xDEADBEEFL;
+    Callback _callback;
+    SpoutOutputCollector _delegate;
+    EmitContext _context;
+
+    /**
+     * @param callback  applied to every tuple before it is emitted
+     * @param collector the collector the callback's output is forwarded to
+     * @param context   the base context, cloned on every emit
+     */
+    public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) {
+        super(collector);
+        this._callback = callback;
+        this._delegate = collector;
+        this._context = context;
+    }
+
+    /**
+     * Extracts the Kafka partition from a spout message id.
+     *
+     * @param messageIdObj a {@link KafkaSpoutMessageId}
+     * @return the partition of the message's topic-partition
+     * @throws ClassCastException   if {@code messageIdObj} is not a {@link KafkaSpoutMessageId}
+     * @throws NullPointerException if {@code messageIdObj} is null
+     */
+    public static int getPartition(Object messageIdObj) {
+        KafkaSpoutMessageId messageId = (KafkaSpoutMessageId) messageIdObj;
+        return messageId.getTopicPartition().partition();
+    }
+
+    /**
+     * Creates the per-emit copy of the base context for an emit that carries a message id.
+     * The partition is recorded only when {@code messageId} is a {@link KafkaSpoutMessageId}.
+     * A null message id (an untracked emit, which the emit contract allows) or an id of any
+     * other type leaves the partition unset, instead of failing the emit with a
+     * NullPointerException or ClassCastException.
+     */
+    private EmitContext contextFor(Object messageId) {
+        EmitContext context = _context.cloneContext();
+        if (messageId instanceof KafkaSpoutMessageId) {
+            context.with(EmitContext.Type.PARTITION, getPartition(messageId));
+        }
+        return context;
+    }
+
+    /**
+     * Emits a new tuple to the specified output stream with the given message ID.
+     * When Storm detects that this tuple has been fully processed, or has failed
+     * to be fully processed, the spout will receive an ack or fail callback respectively
+     * with the messageId as long as the messageId was not null. If the messageId was null,
+     * Storm will not track the tuple and no callback will be received. The emitted values must be
+     * immutable.
+     *
+     * @param streamId
+     * @param tuple
+     * @param messageId may be null; the PARTITION context entry is then left unset
+     * @return the list of task ids that this tuple was sent to
+     */
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+        List<Object> t = _callback.apply(tuple, contextFor(messageId)
+                                                 .with(EmitContext.Type.STREAM_ID, streamId));
+        return _delegate.emit(streamId, t, messageId);
+    }
+
+    /**
+     * Emits a new tuple to the default output stream with the given message ID.
+     * When Storm detects that this tuple has been fully processed, or has failed
+     * to be fully processed, the spout will receive an ack or fail callback respectively
+     * with the messageId as long as the messageId was not null. If the messageId was null,
+     * Storm will not track the tuple and no callback will be received. The emitted values must be
+     * immutable.
+     *
+     * @param tuple
+     * @param messageId may be null; the PARTITION context entry is then left unset
+     * @return the list of task ids that this tuple was sent to
+     */
+    @Override
+    public List<Integer> emit(List<Object> tuple, Object messageId) {
+        List<Object> t = _callback.apply(tuple, contextFor(messageId));
+        return _delegate.emit(t, messageId);
+    }
+
+    /**
+     * Emits a tuple to the default output stream with a null message id. Storm will
+     * not track this message so ack and fail will never be called for this tuple. The
+     * emitted values must be immutable.
+     *
+     * @param tuple
+     */
+    @Override
+    public List<Integer> emit(List<Object> tuple) {
+        List<Object> t = _callback.apply(tuple, _context.cloneContext());
+        return _delegate.emit(t);
+    }
+
+    /**
+     * Emits a tuple to the specified output stream with a null message id. Storm will
+     * not track this message so ack and fail will never be called for this tuple. The
+     * emitted values must be immutable.
+     *
+     * @param streamId
+     * @param tuple
+     */
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple) {
+        List<Object> t = _callback.apply(tuple, _context.cloneContext()
+                                                 .with(EmitContext.Type.STREAM_ID, streamId));
+        return _delegate.emit(streamId, t);
+    }
+
+    /**
+     * Emits a tuple to the specified task on the specified output stream. This output
+     * stream must have been declared as a direct stream, and the specified task must
+     * use a direct grouping on this stream to receive the message. The emitted values must be
+     * immutable.
+     *
+     * @param taskId
+     * @param streamId
+     * @param tuple
+     * @param messageId may be null; the PARTITION context entry is then left unset
+     */
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
+        List<Object> t = _callback.apply(tuple, contextFor(messageId)
+                                                 .with(EmitContext.Type.STREAM_ID, streamId)
+                                                 .with(EmitContext.Type.TASK_ID, taskId));
+        _delegate.emitDirect(taskId, streamId, t, messageId);
+    }
+
+    /**
+     * Emits a tuple to the specified task on the default output stream. This output
+     * stream must have been declared as a direct stream, and the specified task must
+     * use a direct grouping on this stream to receive the message. The emitted values must be
+     * immutable.
+     *
+     * @param taskId
+     * @param tuple
+     * @param messageId may be null; the PARTITION context entry is then left unset
+     */
+    @Override
+    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
+        List<Object> t = _callback.apply(tuple, contextFor(messageId)
+                                                 .with(EmitContext.Type.TASK_ID, taskId));
+        _delegate.emitDirect(taskId, t, messageId);
+    }
+
+    /**
+     * Emits a tuple to the specified task on the specified output stream. This output
+     * stream must have been declared as a direct stream, and the specified task must
+     * use a direct grouping on this stream to receive the message. The emitted values must be
+     * immutable.
+     *
+     * <p> Because no message id is specified, Storm will not track this message
+     * so ack and fail will never be called for this tuple.
+     *
+     * @param taskId
+     * @param streamId
+     * @param tuple
+     */
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+        List<Object> t = _callback.apply(tuple, _context.cloneContext()
+                                                 .with(EmitContext.Type.STREAM_ID, streamId)
+                                                 .with(EmitContext.Type.TASK_ID, taskId));
+        _delegate.emitDirect(taskId, streamId, t);
+    }
+
+    /**
+     * Emits a tuple to the specified task on the default output stream. This output
+     * stream must have been declared as a direct stream, and the specified task must
+     * use a direct grouping on this stream to receive the message. The emitted values must be
+     * immutable.
+     *
+     * <p> Because no message id is specified, Storm will not track this message
+     * so ack and fail will never be called for this tuple.
+     *
+     * @param taskId
+     * @param tuple
+     */
+    @Override
+    public void emitDirect(int taskId, List<Object> tuple) {
+        List<Object> t = _callback.apply(tuple, _context.cloneContext()
+                                                 .with(EmitContext.Type.TASK_ID, taskId));
+        _delegate.emitDirect(taskId, t);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
new file mode 100644
index 0000000..8592e13
--- /dev/null
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+
+import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
+import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+
+/**
+ * A kafka spout with a callback that is executed on message commit.
+ * @param <K> The Kafka key type
+ * @param <V> The Kafka value type
+ */
+public class CallbackKafkaSpout<K, V> extends StormKafkaSpout<K, V> {
+  static final long serialVersionUID = 0xDEADBEEFL;
+  Class<? extends Callback> callbackClazz;
+  Callback _callback;
+  EmitContext _context;
+  public CallbackKafkaSpout(SimpleStormKafkaBuilder<K, V> spoutConfig, String 
callbackClass) {
+    this(spoutConfig, toCallbackClass(callbackClass));
+  }
+
+  public CallbackKafkaSpout(SimpleStormKafkaBuilder<K, V> spoutConf, Class<? 
extends Callback> callback) {
+    super(spoutConf);
+    callbackClazz = callback;
+  }
+
+  public void initialize(TopologyContext context) {
+    _callback = createCallback(callbackClazz);
+    _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, 
_spoutConfig)
+            .with(EmitContext.Type.UUID, context.getStormId())
+            .with(EmitContext.Type.TOPIC, _topic);
+    _callback.initialize(_context);
+  }
+
+
+  private static Class<? extends Callback> toCallbackClass(String 
callbackClass)  {
+    try{
+      return (Class<? extends Callback>) Callback.class.forName(callbackClass);
+    }
+    catch (ClassNotFoundException e) {
+      throw new RuntimeException(callbackClass + " not found", e);
+    }
+  }
+
+  protected Callback createCallback(Class<? extends Callback> callbackClass)  {
+    try {
+      return callbackClass.getConstructor().newInstance();
+    } catch (InstantiationException | NoSuchMethodException | 
InvocationTargetException e) {
+      throw new RuntimeException("Unable to instantiate callback", e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Illegal access", e);
+    }
+  }
+
+  /**
+   * This overrides and wraps the SpoutOutputCollector so that the callback 
can operate upon emit.
+   * @param conf
+   * @param context
+   * @param collector
+   */
+  @Override
+  public void open(Map conf, final TopologyContext context, final 
SpoutOutputCollector collector) {
+    if(_callback == null) {
+      initialize(context);
+    }
+    super.open( conf, context
+            , new CallbackCollector(_callback, collector
+                    
,_context.cloneContext().with(EmitContext.Type.OPEN_CONFIG, conf)
+                    .with(EmitContext.Type.TOPOLOGY_CONTEXT, context)
+            )
+    );
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    if(_callback != null) {
+      try {
+        _callback.close();
+      } catch (Exception e) {
+        throw new IllegalStateException("Unable to close callback", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
new file mode 100644
index 0000000..eac3ea8
--- /dev/null
+++ 
b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.task.TopologyContext;
+
+import java.io.Serializable;
+import java.util.EnumMap;
+import java.util.Map;
+
+/**
+ * The context for the emit call.  This allows us to pass static information 
into the spout callback.
+ */
+public class EmitContext implements Cloneable,Serializable {
+    static final long serialVersionUID = 0xDEADBEEFL;
+
+    /**
+     * The static information to be tracked.
+     */
+    public enum Type{
+        STREAM_ID(String.class)
+        ,TOPIC(String.class) //TODO: This should be pulled from the message 
directly with the new spout when we want to support multiple topics.
+        ,PARTITION(Integer.class)
+        ,TASK_ID(Integer.class)
+        ,UUID(String.class)
+        ,SPOUT_CONFIG(KafkaSpoutConfig.class)
+        ,OPEN_CONFIG(Map.class)
+        ,TOPOLOGY_CONTEXT(TopologyContext.class)
+        ;
+        Class<?> clazz;
+        Type(Class<?> clazz) {
+            this.clazz=  clazz;
+        }
+
+        public Class<?> clazz() {
+           return clazz;
+        }
+    }
+    public EmitContext() {
+        this(new EnumMap<>(Type.class));
+    }
+    public EmitContext(EnumMap<Type, Object> context) {
+        _context = context;
+    }
+    private EnumMap<Type, Object> _context;
+
+    public <T> EmitContext with(Type t, T o ) {
+        _context.put(t, t.clazz().cast(o));
+        return this;
+    }
+    public <T> void add(Type t, T o ) {
+        with(t, o);
+    }
+
+    public <T> T get(Type t) {
+        Object o = _context.get(t);
+        if(o == null) {
+            return null;
+        }
+        else {
+            return (T) o;
+        }
+    }
+
+    public EmitContext cloneContext() {
+        try {
+            return (EmitContext)this.clone();
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException("Unable to clone emit context.", e);
+        }
+    }
+
+
+    @Override
+    protected Object clone() throws CloneNotSupportedException {
+        EmitContext context = new EmitContext(_context.clone());
+        return context;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
 
b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
new file mode 100644
index 0000000..fdef69d
--- /dev/null
+++ 
b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.storm.kafka.flux;
+
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link SpoutConfiguration}: separating spout options from Kafka consumer
+ * properties, and applying them to a spout config builder.
+ */
+public class SpoutConfigurationTest {
+
+  @Test
+  public void testSeparation() {
+    Map<String, Object> config = new HashMap<>();
+    config.put(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key, "UNCOMMITTED_EARLIEST");
+    config.put(SpoutConfiguration.MAX_RETRIES.key, "1000");
+    config.put("group.id", "foobar");
+
+    Map<String, Object> spoutConfig = SpoutConfiguration.separate(config);
+
+    // The spout-specific options end up in the returned map...
+    Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key));
+    Assert.assertEquals("UNCOMMITTED_EARLIEST", spoutConfig.get(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key));
+    Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.MAX_RETRIES.key));
+    Assert.assertEquals("1000", spoutConfig.get(SpoutConfiguration.MAX_RETRIES.key));
+    Assert.assertEquals(2, spoutConfig.size());
+    // ...and only the Kafka consumer property is left in the input map.
+    Assert.assertEquals(1, config.size());
+    Assert.assertEquals("foobar", config.get("group.id"));
+  }
+
+  @Test
+  public void testBuilderCreation() {
+    Map<String, Object> config = new HashMap<>();
+    config.put(SpoutConfiguration.MAX_RETRIES.key, "1000");
+    config.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "foo:1234");
+    config.put("group.id", "foobar");
+
+    Map<String, Object> spoutConfig = SpoutConfiguration.separate(config);
+    KafkaSpoutConfig.Builder<Object, Object> builder = new SimpleStormKafkaBuilder<>(config, "topic", null);
+    SpoutConfiguration.configure(builder, spoutConfig);
+    KafkaSpoutConfig<Object, Object> c = builder.build();
+    Assert.assertEquals(1000, c.getMaxTupleRetries());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml
index 18cff3f..73a3090 100644
--- a/metron-platform/pom.xml
+++ b/metron-platform/pom.xml
@@ -57,6 +57,7 @@
                <module>metron-hbase</module>
                <module>elasticsearch-shaded</module>
                <module>metron-elasticsearch</module>
+               <module>metron-storm-kafka</module>
        </modules>
        <dependencies>
                <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c9b0d10..7907435 100644
--- a/pom.xml
+++ b/pom.xml
@@ -67,20 +67,44 @@
         <!-- base project versions -->
         <base_storm_version>1.0.1</base_storm_version>
         <base_flux_version>1.0.1</base_flux_version>
-        <base_kafka_version>0.10.0.1</base_kafka_version>
+        <base_kafka_version>0.10.0</base_kafka_version>
         <base_hadoop_version>2.7.1</base_hadoop_version>
         <base_hbase_version>1.1.1</base_hbase_version>
-        <global_accumulo_version>1.8.0</global_accumulo_version>
         <base_flume_version>1.5.2</base_flume_version>
         <!-- full dependency versions -->
+        <global_accumulo_version>1.8.0</global_accumulo_version>
         <global_antlr_version>4.5</global_antlr_version>
         <global_opencsv_version>3.7</global_opencsv_version>
         <global_curator_version>2.7.1</global_curator_version>
         <global_classindex_version>3.3</global_classindex_version>
-        <global_storm_version>${base_storm_version}</global_storm_version>
+        <global_storm_version>1.0.3</global_storm_version>
+        <!--
+             This bears some explanation.  storm-kafka-client is our Kafka spout.
+             If we ever hope to support Kerberos, it provides the capability to do so
+             in the Apache release.  Unfortunately, as of Storm 1.0.x, it does not
+             support Kafka 0.10.x (see https://issues.apache.org/jira/browse/STORM-2091).
+             The consumer libraries (not to be confused with the protocol) on the JVM
+             are binary incompatible.  As noted in the discussion on
+             https://issues.apache.org/jira/browse/KAFKA-3006, the main issue is the
+             switch to Collection over List.  While this would be a non-issue
+             polymorphically, it would require recompiling storm-kafka-client against
+             Kafka 0.10.x.
+
+             Since a targeted platform is HDP 2.5.x, which ships only Kafka 0.10.x, we
+             need to support Kafka 0.10.x.  Therefore, if we were to use the Apache
+             storm-kafka-client, we would need to support both Kafka 0.9.x and 0.10.x.
+             Unfortunately, this would require us to fork some of the internal projects,
+             because the 0.9.x API has shifted (e.g. the Admin functions have different
+             parameters) and behaves differently than 0.10.x in subtle ways (e.g.
+             KAFKA_GET doesn't work as implemented).
+
+             Rather than do this, we chose to depend on the HDP version of storm-kafka,
+             because it is compiled against 0.10.x and therefore lets us avoid forking
+             our Kafka support.  I do not like this bleeding of the HDP profile
+             dependency into the default, but I justify it by noting that it should be
+             removable once we migrate to Storm 1.1.x, which properly supports
+             Kafka 0.10.x.
+          -->
+        
<global_storm_kafka_version>1.0.1.2.5.0.0-1245</global_storm_kafka_version>
         <global_flux_version>${base_flux_version}</global_flux_version>
         <global_pcap_version>1.7.1</global_pcap_version>
-        <global_kafka_version>${base_kafka_version}</global_kafka_version>
+        <global_kafka_version>0.10.0.1</global_kafka_version>
         <global_hadoop_version>${base_hadoop_version}</global_hadoop_version>
         <global_hbase_version>${base_hbase_version}</global_hbase_version>
         <global_flume_version>${base_flume_version}</global_flume_version>
@@ -109,6 +133,7 @@
                 <hdp_version>2.5.0.0</hdp_version>
                 <build_number>1245</build_number>
                 
<global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
+                
<global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
             </properties>
         </profile>
     </profiles>

Reply via email to