Repository: incubator-metron
Updated Branches:
  refs/heads/master c13ee8265 -> 98dc7659a
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/Callback.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/Callback.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/Callback.java new file mode 100644 index 0000000..8e9622c --- /dev/null +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/Callback.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import java.io.Serializable; +import java.util.List; + +public interface Callback extends AutoCloseable, Serializable { + List<Object> apply(List<Object> tuple, EmitContext context); + void initialize(EmitContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackCollector.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackCollector.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackCollector.java new file mode 100644 index 0000000..57f9f2d --- /dev/null +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackCollector.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.kafka; + +import org.apache.storm.kafka.spout.KafkaSpoutMessageId; +import org.apache.storm.spout.SpoutOutputCollector; + +import java.io.Serializable; +import java.util.List; + +public class CallbackCollector extends SpoutOutputCollector implements Serializable { + static final long serialVersionUID = 0xDEADBEEFL; + Callback _callback; + SpoutOutputCollector _delegate; + EmitContext _context; + public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) { + super(collector); + this._callback = callback; + this._delegate = collector; + this._context = context; + } + + + public static int getPartition(Object messageIdObj) { + KafkaSpoutMessageId messageId = (KafkaSpoutMessageId)messageIdObj; + return messageId.getTopicPartition().partition(); + } + + /** + * Emits a new tuple to the specified output stream with the given message ID. + * When Storm detects that this tuple has been fully processed, or has failed + * to be fully processed, the spout will receive an ack or fail callback respectively + * with the messageId as long as the messageId was not null. If the messageId was null, + * Storm will not track the tuple and no callback will be received. The emitted values must be + * immutable. + * + * @param streamId + * @param tuple + * @param messageId + * @return the list of task ids that this tuple was sent to + */ + @Override + public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { + List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId)) + .with(EmitContext.Type.STREAM_ID, streamId) + ); + return _delegate.emit(streamId, t, messageId); + } + + /** + * Emits a new tuple to the default output stream with the given message ID. + * When Storm detects that this tuple has been fully processed, or has failed + * to be fully processed, the spout will receive an ack or fail callback respectively + * with the messageId as long as the messageId was not null. If the messageId was null, + * Storm will not track the tuple and no callback will be received. The emitted values must be + * immutable. + * + * @param tuple + * @param messageId + * @return the list of task ids that this tuple was sent to + */ + @Override + public List<Integer> emit(List<Object> tuple, Object messageId) { + List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))); + return _delegate.emit(t, messageId); + } + + /** + * Emits a tuple to the default output stream with a null message id. Storm will + * not track this message so ack and fail will never be called for this tuple. The + * emitted values must be immutable. + * + * @param tuple + */ + @Override + public List<Integer> emit(List<Object> tuple) { + List<Object> t = _callback.apply(tuple, _context.cloneContext()); + return _delegate.emit(t); + } + + /** + * Emits a tuple to the specified output stream with a null message id. Storm will + * not track this message so ack and fail will never be called for this tuple. The + * emitted values must be immutable. + * + * @param streamId + * @param tuple + */ + @Override + public List<Integer> emit(String streamId, List<Object> tuple) { + List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)); + return _delegate.emit(streamId, t); + } + + /** + * Emits a tuple to the specified task on the specified output stream. 
This output + * stream must have been declared as a direct stream, and the specified task must + * use a direct grouping on this stream to receive the message. The emitted values must be + * immutable. + * + * @param taskId + * @param streamId + * @param tuple + * @param messageId + */ + @Override + public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { + List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId) + .with(EmitContext.Type.PARTITION, getPartition(messageId)) + .with(EmitContext.Type.TASK_ID, taskId) + ); + _delegate.emitDirect(taskId, streamId, t, messageId); + } + + /** + * Emits a tuple to the specified task on the default output stream. This output + * stream must have been declared as a direct stream, and the specified task must + * use a direct grouping on this stream to receive the message. The emitted values must be + * immutable. + * + * @param taskId + * @param tuple + * @param messageId + */ + @Override + public void emitDirect(int taskId, List<Object> tuple, Object messageId) { + List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId)) + .with(EmitContext.Type.TASK_ID, taskId) + ); + _delegate.emitDirect(taskId, t, messageId); + } + + /** + * Emits a tuple to the specified task on the specified output stream. This output + * stream must have been declared as a direct stream, and the specified task must + * use a direct grouping on this stream to receive the message. The emitted values must be + * immutable. + * + * <p> Because no message id is specified, Storm will not track this message + * so ack and fail will never be called for this tuple. + * + * @param taskId + * @param streamId + * @param tuple + */ + @Override + public void emitDirect(int taskId, String streamId, List<Object> tuple) { + List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId) + .with(EmitContext.Type.TASK_ID, taskId) + ); + _delegate.emitDirect(taskId, streamId, t); + } + + /** + * Emits a tuple to the specified task on the default output stream. This output + * stream must have been declared as a direct stream, and the specified task must + * use a direct grouping on this stream to receive the message. The emitted values must be + * immutable. + * + * <p> Because no message id is specified, Storm will not track this message + * so ack and fail will never be called for this tuple. + * + * @param taskId + * @param tuple + */ + @Override + public void emitDirect(int taskId, List<Object> tuple) { + + List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.TASK_ID, taskId)); + _delegate.emitDirect(taskId, t); + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java new file mode 100644 index 0000000..8592e13 --- /dev/null +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + + +import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; +import org.apache.metron.storm.kafka.flux.StormKafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; + +import java.lang.reflect.InvocationTargetException; +import java.util.*; + +/** + * A kafka spout with a callback that is executed on message commit. + * @param <K> The Kafka key type + * @param <V> The Kafka value type + */ +public class CallbackKafkaSpout<K, V> extends StormKafkaSpout<K, V> { + static final long serialVersionUID = 0xDEADBEEFL; + Class<? extends Callback> callbackClazz; + Callback _callback; + EmitContext _context; + public CallbackKafkaSpout(SimpleStormKafkaBuilder<K, V> spoutConfig, String callbackClass) { + this(spoutConfig, toCallbackClass(callbackClass)); + } + + public CallbackKafkaSpout(SimpleStormKafkaBuilder<K, V> spoutConf, Class<? extends Callback> callback) { + super(spoutConf); + callbackClazz = callback; + } + + public void initialize(TopologyContext context) { + _callback = createCallback(callbackClazz); + _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig) + .with(EmitContext.Type.UUID, context.getStormId()) + .with(EmitContext.Type.TOPIC, _topic); + _callback.initialize(_context); + } + + + private static Class<? extends Callback> toCallbackClass(String callbackClass) { + try{ + return (Class<? extends Callback>) Callback.class.forName(callbackClass); + } + catch (ClassNotFoundException e) { + throw new RuntimeException(callbackClass + " not found", e); + } + } + + protected Callback createCallback(Class<? extends Callback> callbackClass) { + try { + return callbackClass.getConstructor().newInstance(); + } catch (InstantiationException | NoSuchMethodException | InvocationTargetException e) { + throw new RuntimeException("Unable to instantiate callback", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("Illegal access", e); + } + } + + /** + * This overrides and wraps the SpoutOutputCollector so that the callback can operate upon emit. 
+ * @param conf + * @param context + * @param collector + */ + @Override + public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { + if(_callback == null) { + initialize(context); + } + super.open( conf, context + , new CallbackCollector(_callback, collector + ,_context.cloneContext().with(EmitContext.Type.OPEN_CONFIG, conf) + .with(EmitContext.Type.TOPOLOGY_CONTEXT, context) + ) + ); + } + + @Override + public void close() { + super.close(); + if(_callback != null) { + try { + _callback.close(); + } catch (Exception e) { + throw new IllegalStateException("Unable to close callback", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java new file mode 100644 index 0000000..eac3ea8 --- /dev/null +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/storm/kafka/EmitContext.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.kafka; + +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.task.TopologyContext; + +import java.io.Serializable; +import java.util.EnumMap; +import java.util.Map; + +/** + * The context for the emit call. This allows us to pass static information into the spout callback. + */ +public class EmitContext implements Cloneable,Serializable { + static final long serialVersionUID = 0xDEADBEEFL; + + /** + * The static information to be tracked. + */ + public enum Type{ + STREAM_ID(String.class) + ,TOPIC(String.class) //TODO: This should be pulled from the message directly with the new spout when we want to support multiple topics. 
+ ,PARTITION(Integer.class) + ,TASK_ID(Integer.class) + ,UUID(String.class) + ,SPOUT_CONFIG(KafkaSpoutConfig.class) + ,OPEN_CONFIG(Map.class) + ,TOPOLOGY_CONTEXT(TopologyContext.class) + ; + Class<?> clazz; + Type(Class<?> clazz) { + this.clazz= clazz; + } + + public Class<?> clazz() { + return clazz; + } + } + public EmitContext() { + this(new EnumMap<>(Type.class)); + } + public EmitContext(EnumMap<Type, Object> context) { + _context = context; + } + private EnumMap<Type, Object> _context; + + public <T> EmitContext with(Type t, T o ) { + _context.put(t, t.clazz().cast(o)); + return this; + } + public <T> void add(Type t, T o ) { + with(t, o); + } + + public <T> T get(Type t) { + Object o = _context.get(t); + if(o == null) { + return null; + } + else { + return (T) o; + } + } + + public EmitContext cloneContext() { + try { + return (EmitContext)this.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException("Unable to clone emit context.", e); + } + } + + + @Override + protected Object clone() throws CloneNotSupportedException { + EmitContext context = new EmitContext(_context.clone()); + return context; + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java new file mode 100644 index 0000000..fdef69d --- /dev/null +++ b/metron-platform/metron-storm-kafka/src/test/java/org/apache/metron/storm/kafka/flux/SpoutConfigurationTest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.storm.kafka.flux; + +import org.apache.metron.common.utils.KafkaUtils; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; + +public class SpoutConfigurationTest { + + @Test + public void testSeparation() { + Map<String, Object> config = new HashMap<String, Object>() {{ + put(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key, "UNCOMMITTED_EARLIEST"); + put(SpoutConfiguration.MAX_RETRIES.key, "1000"); + put("group.id", "foobar"); + }}; + Map<String, Object> spoutConfig = SpoutConfiguration.separate(config); + Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key)); + Assert.assertEquals(spoutConfig.get(SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key), "UNCOMMITTED_EARLIEST"); + Assert.assertTrue(spoutConfig.containsKey(SpoutConfiguration.MAX_RETRIES.key)); + Assert.assertEquals(spoutConfig.get(SpoutConfiguration.MAX_RETRIES.key), "1000"); + Assert.assertEquals(2, spoutConfig.size()); + Assert.assertEquals(1, config.size()); + Assert.assertEquals(config.get("group.id"), "foobar"); + } + + @Test + public void testBuilderCreation() { + Map<String, Object> config = new HashMap<String, Object>() {{ + put(SpoutConfiguration.MAX_RETRIES.key, "1000"); + put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "foo:1234"); + put("group.id", "foobar"); + }}; + Map<String, Object> spoutConfig = SpoutConfiguration.separate(config); + KafkaSpoutConfig.Builder<Object, Object> builder = new SimpleStormKafkaBuilder(config, "topic", null); + SpoutConfiguration.configure(builder, spoutConfig); + KafkaSpoutConfig c = builder.build(); + Assert.assertEquals(1000, c.getMaxTupleRetries() ); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/pom.xml b/metron-platform/pom.xml index 18cff3f..73a3090 100644 --- a/metron-platform/pom.xml +++ b/metron-platform/pom.xml @@ -57,6 +57,7 @@ <module>metron-hbase</module> <module>elasticsearch-shaded</module> <module>metron-elasticsearch</module> + <module>metron-storm-kafka</module> </modules> <dependencies> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c9b0d10..7907435 100644 --- a/pom.xml +++ b/pom.xml @@ -67,20 +67,44 @@ <!-- base project versions --> <base_storm_version>1.0.1</base_storm_version> <base_flux_version>1.0.1</base_flux_version> - <base_kafka_version>0.10.0.1</base_kafka_version> + <base_kafka_version>0.10.0</base_kafka_version> <base_hadoop_version>2.7.1</base_hadoop_version> <base_hbase_version>1.1.1</base_hbase_version> - <global_accumulo_version>1.8.0</global_accumulo_version> <base_flume_version>1.5.2</base_flume_version> <!-- full dependency versions --> + <global_accumulo_version>1.8.0</global_accumulo_version> <global_antlr_version>4.5</global_antlr_version> <global_opencsv_version>3.7</global_opencsv_version> <global_curator_version>2.7.1</global_curator_version> <global_classindex_version>3.3</global_classindex_version> - <global_storm_version>${base_storm_version}</global_storm_version> + <global_storm_version>1.0.3</global_storm_version> + <!-- + This bears some explanation. storm-kafka-client is our kafka spout. 
+ If we ever hope to support kerberos, this provides the capability to do so + within Apache. Unfortunately, as of Storm 1.0.x it does not support + Kafka 0.10.x (see https://issues.apache.org/jira/browse/STORM-2091). + The consumer libraries (not to be confused with the protocol) on the JVM + are binary incompatible. Note the discussion on https://issues.apache.org/jira/browse/KAFKA-3006; + the main issue is the move from List to Collection. While this would be + a non-issue polymorphically, it would require a recompile of storm-kafka-client against Kafka 0.10.x. + + Since a target platform is HDP 2.5.x, which ships only Kafka 0.10.x, we need + to support Kafka 0.10.x. Therefore, if we are to use this spout, we would need + to support both Kafka 0.9.x and 0.10.x. Unfortunately, this would require us + to fork some of the internal projects because the 0.9.x API has shifted + (e.g. the Admin functions have different parameters) and behaves + differently than 0.10.x in subtle ways (e.g. KAFKA_GET doesn't work as implemented). + + Rather than do this, we chose to depend on the HDP version of storm-kafka because + it is compiled against 0.10.x and therefore lets us avoid forking our Kafka + support. I do not like this bleeding of the HDP profile dependency into the default, + but I justify it by noting that this should be removable when we migrate to + Storm 1.1.x, which properly supports Kafka 0.10.x. + --> + <global_storm_kafka_version>1.0.1.2.5.0.0-1245</global_storm_kafka_version> <global_flux_version>${base_flux_version}</global_flux_version> <global_pcap_version>1.7.1</global_pcap_version> - <global_kafka_version>${base_kafka_version}</global_kafka_version> + <global_kafka_version>0.10.0.1</global_kafka_version> <global_hadoop_version>${base_hadoop_version}</global_hadoop_version> <global_hbase_version>${base_hbase_version}</global_hbase_version> <global_flume_version>${base_flume_version}</global_flume_version> @@ -109,6 +133,7 @@ <hdp_version>2.5.0.0</hdp_version> <build_number>1245</build_number> <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version> + <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version> </properties> </profile> </profiles>
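For readers new to the callback API added above, the following is a minimal sketch of a Callback implementation and how it would be wired into a CallbackKafkaSpout. The package and class names (org.apache.metron.example, PartitionTaggingCallback) are hypothetical and not part of this commit; the sketch simply appends the Kafka partition carried in the EmitContext to each emitted tuple.

package org.apache.metron.example;  // hypothetical package, for illustration only

import org.apache.storm.kafka.Callback;
import org.apache.storm.kafka.EmitContext;

import java.util.ArrayList;
import java.util.List;

public class PartitionTaggingCallback implements Callback {

  @Override
  public void initialize(EmitContext context) {
    // Spout-level context (SPOUT_CONFIG, UUID, TOPIC) is available here.
  }

  @Override
  public List<Object> apply(List<Object> tuple, EmitContext context) {
    // Per-emit context; PARTITION may be null for emits carrying no message id.
    Integer partition = context.get(EmitContext.Type.PARTITION);
    // Return a new list rather than mutating the emitted values.
    List<Object> out = new ArrayList<>(tuple);
    out.add(partition);
    return out;
  }

  @Override
  public void close() {
    // Release any resources acquired in initialize().
  }
}

Wiring is then a matter of handing the callback class to the spout, e.g. new CallbackKafkaSpout<>(builder, PartitionTaggingCallback.class), where builder is an existing SimpleStormKafkaBuilder for the topic.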