http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java index 9ac6d41..43cd7e0 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterCallback.java @@ -18,34 +18,31 @@ package org.apache.metron.spout.pcap; -import com.google.common.base.Function; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableList; -import org.apache.commons.collections.map.HashedMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.SequenceFile; import org.apache.log4j.Logger; import org.apache.storm.kafka.Callback; import org.apache.storm.kafka.EmitContext; -import org.apache.storm.kafka.PartitionManager; -import javax.annotation.Nullable; +import javax.xml.bind.DatatypeConverter; import java.io.Closeable; import java.io.IOException; -import java.util.EnumSet; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +/** + * A callback which gets executed as part of the spout to write pcap data to HDFS. + */ public class HDFSWriterCallback implements Callback { static final long serialVersionUID = 0xDEADBEEFL; private static final Logger LOG = Logger.getLogger(HDFSWriterCallback.class); + /** + * A topic+partition. We split the files up by topic+partition so the writers don't clobber each other + */ static class Partition { String topic; int partition; @@ -83,6 +80,24 @@ public class HDFSWriterCallback implements Callback { } } + /** + * This is a static container of threadlocal LongWritables and BytesWritables. This keeps us from having to create so + * many objects on the heap. The Deserializers update these for every packet. 
+ */ + private static class KeyValue { + static ThreadLocal<LongWritable> key = new ThreadLocal<LongWritable> () { + @Override + protected LongWritable initialValue() { + return new LongWritable(); + } + }; + static ThreadLocal<BytesWritable> value = new ThreadLocal<BytesWritable> () { + @Override + protected BytesWritable initialValue() { + return new BytesWritable(); + } + }; + } private HDFSWriterConfig config; private EmitContext context; private Map<Partition, PartitionHDFSWriter> writers = new HashMap<>(); @@ -96,16 +111,37 @@ public class HDFSWriterCallback implements Callback { this.config = config; return this; } + @Override public List<Object> apply(List<Object> tuple, EmitContext context) { - - List<Object> keyValue = (List<Object>) tuple.get(0); - LongWritable ts = (LongWritable) keyValue.get(0); - BytesWritable rawPacket = (BytesWritable)keyValue.get(1); + byte[] key = (byte[]) tuple.get(0); + byte[] value = (byte[]) tuple.get(1); + if(!config.getDeserializer().deserializeKeyValue(key, value, KeyValue.key.get(), KeyValue.value.get())) { + if(LOG.isDebugEnabled()) { + List<String> debugStatements = new ArrayList<>(); + if(key != null) { + debugStatements.add("Key length: " + key.length); + debugStatements.add("Key: " + DatatypeConverter.printHexBinary(key)); + } + else { + debugStatements.add("Key is null!"); + } + + if(value != null) { + debugStatements.add("Value length: " + value.length); + debugStatements.add("Value: " + DatatypeConverter.printHexBinary(value)); + } + else { + debugStatements.add("Value is null!"); + } + LOG.debug("Dropping malformed packet: " + Joiner.on(" / ").join(debugStatements)); + } + return tuple; + } try { getWriter(new Partition( topic , context.get(EmitContext.Type.PARTITION)) - ).handle(ts, rawPacket); + ).handle(KeyValue.key.get(), KeyValue.value.get()); } catch (IOException e) { LOG.error(e.getMessage(), e); //drop? not sure.. @@ -133,45 +168,15 @@ @Override public void initialize(EmitContext context) { this.context = context; - this.topic = context.get(EmitContext.Type.TOPIC); + Object topics = context.get(EmitContext.Type.TOPIC); + if(topics instanceof List) { + this.topic = Joiner.on(",").join((List<String>)topics); + } + else { + this.topic = "" + topics; + } } - /** - * Closes this resource, relinquishing any underlying resources. - * This method is invoked automatically on objects managed by the - * {@code try}-with-resources statement. - * - * <p>While this interface method is declared to throw {@code - * Exception}, implementers are <em>strongly</em> encouraged to - * declare concrete implementations of the {@code close} method to - * throw more specific exceptions, or to throw no exception at all - * if the close operation cannot fail. - * - * <p><em>Implementers of this interface are also strongly advised - * to not have the {@code close} method throw {@link - * InterruptedException}.</em> - * - * <p>This exception interacts with a thread's interrupted status, - * and runtime misbehavior is likely to occur if an {@code - * InterruptedException} is {@linkplain Throwable#addSuppressed - * suppressed}. - * - * <p>More generally, if it would cause problems for an - * exception to be suppressed, the {@code AutoCloseable.close} - * method should not throw it. - * - * <p>Note that unlike the {@link Closeable#close close} - * method of {@link Closeable}, this {@code close} method - * is <em>not</em> required to be idempotent.
In other words, - * calling this {@code close} method more than once may have some - * visible side effect, unlike {@code Closeable.close} which is - * required to have no effect if called more than once. - * - * <p>However, implementers of this interface are strongly encouraged - * to make their {@code close} methods idempotent. - * - * @throws Exception if this resource cannot be closed - */ @Override public void close() throws Exception { for(PartitionHDFSWriter writer : writers.values()) {
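A note on the KeyValue holder introduced above: both ThreadLocal Writables are overwritten for every packet, so they are only safe to consume synchronously. That holds here because SequenceFile.Writer.append serializes its arguments before returning, which is also why PartitionHDFSWriter (later in this patch) can append the BytesWritable without the old defensive copy. A minimal, self-contained sketch of the reuse pattern and its copying caveat (hypothetical class name, not Metron code):

import java.util.Arrays;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;

public class WritableReuseSketch {
  // One reusable pair per thread, mirroring the KeyValue holder above.
  private static final ThreadLocal<LongWritable> KEY = ThreadLocal.withInitial(LongWritable::new);
  private static final ThreadLocal<BytesWritable> VALUE = ThreadLocal.withInitial(BytesWritable::new);

  public static void main(String[] args) {
    LongWritable ts = KEY.get();
    BytesWritable packet = VALUE.get();
    ts.set(1469496307000000L);                       // e.g. a timestamp
    packet.set(new byte[] {0x0a, 0x0b, 0x0c}, 0, 3); // fill in place, no new objects

    // Safe: consumed before the next packet overwrites the pair.
    System.out.println(ts.get() + " -> " + packet.getLength() + " bytes");

    // Anything that must outlive the call needs an explicit copy. Note that
    // getBytes() exposes the padded internal buffer, so copy up to getLength().
    byte[] stable = Arrays.copyOf(packet.getBytes(), packet.getLength());
    System.out.println("copied " + stable.length + " bytes");
  }
}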
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java index b6efbc5..66bb359 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/HDFSWriterConfig.java @@ -20,33 +20,70 @@ package org.apache.metron.spout.pcap; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.spout.pcap.deserializer.Deserializers; +import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +/** + * Configure the HDFS writer for pcap. + */ public class HDFSWriterConfig implements Serializable { static final long serialVersionUID = 0xDEADBEEFL; private long numPackets; private long maxTimeNS; private String outputPath; private String zookeeperQuorum; + private KeyValueDeserializer deserializer; + /** + * Set the deserializer, the bit of logic that defines how the timestamp and packet are read. + * @param deserializer One of the Deserializers in org.apache.metron.spout.pcap.deserializer.Deserializers + * @param timestampConverter One of org.apache.metron.common.utils.timestamp.TimestampConverters. This defines the units of our timestamp. + * @return this config, for chaining + */ + public HDFSWriterConfig withDeserializer(String deserializer, String timestampConverter) { + this.deserializer = Deserializers.create(deserializer, timestampConverter); + return this; + } + + /** + * The output path in HDFS to write to. + * @param path The HDFS directory to write sequence files into + * @return this config, for chaining + */ public HDFSWriterConfig withOutputPath(String path) { outputPath = path; return this; } + /** + * The number of packets to write before a file is rolled. + * @param n The packet count threshold + * @return this config, for chaining + */ public HDFSWriterConfig withNumPackets(long n) { numPackets = n; return this; } + /** + * The total amount of time (in ms) to write before a file is rolled. + * @param t The maximum file age in milliseconds + * @return this config, for chaining + */ public HDFSWriterConfig withMaxTimeMS(long t) { maxTimeNS = TimestampConverters.MILLISECONDS.toNanoseconds(t); return this; } + /** + * The zookeeper quorum to use. + * @param zookeeperQuorum A comma-separated list of zookeeper host:port entries + * @return this config, for chaining + */ public HDFSWriterConfig withZookeeperQuorum(String zookeeperQuorum) { this.zookeeperQuorum = zookeeperQuorum; return this; @@ -75,6 +112,10 @@ return null; } + public KeyValueDeserializer getDeserializer() { + return deserializer; + } + public String getOutputPath() { return outputPath; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java index e4f1113..ddfd14a 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/KafkaToHDFSSpout.java @@ -18,14 +18,20 @@ package org.apache.metron.spout.pcap; +import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder; import org.apache.storm.kafka.Callback; import org.apache.storm.kafka.CallbackKafkaSpout; -public class KafkaToHDFSSpout extends CallbackKafkaSpout { +public class KafkaToHDFSSpout extends CallbackKafkaSpout<byte[], byte[]> { static final long serialVersionUID = 0xDEADBEEFL; HDFSWriterConfig config = null; - public KafkaToHDFSSpout(SpoutConfig spoutConfig, HDFSWriterConfig config) { - super(spoutConfig, HDFSWriterCallback.class); + public KafkaToHDFSSpout( SimpleStormKafkaBuilder<byte[], byte[]> spoutConfig + , HDFSWriterConfig config + ) + { + super(spoutConfig + , HDFSWriterCallback.class + ); this.config = config; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java index d99a594..f0ea1eb 100644 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java @@ -33,6 +33,9 @@ import org.apache.metron.pcap.PcapHelper; import java.io.*; import java.util.EnumSet; +/** + * This class is intended to handle the writing of an individual file. + */ public class PartitionHDFSWriter implements AutoCloseable, Serializable { static final long serialVersionUID = 0xDEADBEEFL; private static final Logger LOG = Logger.getLogger(PartitionHDFSWriter.class); @@ -42,6 +45,10 @@ void sync(FSDataOutputStream outputStream) throws IOException; } + /* + The sync handlers are FileSystem-specific implementations of sync'ing. The more often you sync, the more durable and + visible the data is. There is a natural tradeoff between sync'ing often and performance.
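Putting the pieces above together, a topology builds the config fluently and hands it to the new spout constructor. The sketch below uses only methods visible in this patch; the property values are illustrative, and the construction of the SimpleStormKafkaBuilder itself is elided because its definition is cut off at the end of this message:

import org.apache.metron.spout.pcap.HDFSWriterConfig;
import org.apache.metron.spout.pcap.KafkaToHDFSSpout;
import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;

public class PcapSpoutWiringSketch {
  public static KafkaToHDFSSpout build(SimpleStormKafkaBuilder<byte[], byte[]> kafkaBuilder) {
    HDFSWriterConfig writerConfig = new HDFSWriterConfig()
        .withDeserializer("FROM_KEY", "MICROSECONDS") // a Deserializers name and a TimestampConverters name
        .withOutputPath("/apps/metron/pcap")          // illustrative HDFS directory
        .withNumPackets(1000)                         // roll the file after 1000 packets...
        .withMaxTimeMS(300000)                        // ...or after 5 minutes, whichever comes first
        .withZookeeperQuorum("node1:2181");
    return new KafkaToHDFSSpout(kafkaBuilder, writerConfig);
  }
}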
+ */ public static enum SyncHandlers implements SyncHandler{ DEFAULT(new SyncHandler() { @@ -114,7 +121,7 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable { public void handle(LongWritable ts, BytesWritable value) throws IOException { turnoverIfNecessary(ts.get()); - writer.append(ts, new BytesWritable(value.getBytes())); + writer.append(ts, value); syncHandler.sync(outputStream); numWritten++; } http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java deleted file mode 100644 index 8a76548..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/SpoutConfig.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.spout.pcap; - -import org.apache.metron.common.utils.timestamp.TimestampConverters; -import org.apache.metron.spout.pcap.scheme.TimestampScheme; -import org.apache.storm.kafka.BrokerHosts; - -public class SpoutConfig extends org.apache.metron.common.spout.kafka.SpoutConfig{ - - public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { - super(hosts, topic, zkRoot, id); - } - - public SpoutConfig withTimestampScheme(String scheme, String granularity) { - super.scheme = TimestampScheme.getScheme(scheme, TimestampConverters.getConverter(granularity)); - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/Deserializers.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/Deserializers.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/Deserializers.java new file mode 100644 index 0000000..e139c27 --- /dev/null +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/Deserializers.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.spout.pcap.deserializer; + +import org.apache.metron.common.utils.timestamp.TimestampConverters; +import org.apache.metron.common.utils.timestamp.TimestampConverter; + +import java.util.function.Function; + +/** + * Deserializers take the raw bytes from kafka key and value and construct the timestamp and raw bytes for PCAP. + */ +public enum Deserializers { + /** + * Extract the timestamp from the key and the raw packet (global-headerless) from the value + */ + FROM_KEY( converter -> new FromKeyDeserializer(converter)) + /** + * Ignore the key and pull the timestamp directly from the packet itself. Also, assume that the packet isn't global-headerless. + */ + ,FROM_PACKET(converter -> new FromPacketDeserializer()); + ; + Function<TimestampConverter, KeyValueDeserializer> creator; + Deserializers(Function<TimestampConverter, KeyValueDeserializer> creator) + { + this.creator = creator; + } + + public static KeyValueDeserializer create(String scheme, TimestampConverter converter) { + try { + Deserializers ts = Deserializers.valueOf(scheme.toUpperCase()); + return ts.creator.apply(converter); + } + catch(IllegalArgumentException iae) { + return Deserializers.FROM_KEY.creator.apply(converter); + } + } + + public static KeyValueDeserializer create(String scheme, String converter) { + return create(scheme, TimestampConverters.getConverter(converter)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java new file mode 100644 index 0000000..de1e24b --- /dev/null +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromKeyDeserializer.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
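One behavior of the factory above is worth calling out: an unrecognized scheme name does not fail fast, it silently falls back to FROM_KEY. Since the integration test further down feeds these enum names through the kafka.pcap.ts_scheme property, a typo in that property changes behavior rather than raising an error. A short sketch of the lookup (the converter names are assumed to match the TimestampConverters constants used elsewhere in this patch):

import org.apache.metron.spout.pcap.deserializer.Deserializers;
import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;

public class DeserializerFactorySketch {
  public static void main(String[] args) {
    // Names are upper-cased before the enum lookup, so case does not matter.
    KeyValueDeserializer fromKey = Deserializers.create("from_key", "MICROSECONDS");
    KeyValueDeserializer fromPacket = Deserializers.create("FROM_PACKET", "MILLISECONDS");

    // An unknown scheme silently yields the FROM_KEY deserializer.
    KeyValueDeserializer fallback = Deserializers.create("NO_SUCH_SCHEME", "MICROSECONDS");
    System.out.println(fallback.getClass().getSimpleName()); // prints FromKeyDeserializer
  }
}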
+ */ + +package org.apache.metron.spout.pcap.deserializer; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.log4j.Logger; +import org.apache.metron.common.utils.timestamp.TimestampConverter; +import org.apache.metron.pcap.PcapHelper; +import org.apache.metron.spout.pcap.Endianness; + + +/** + * Extract the timestamp from the key and raw data from the packet. + */ +public class FromKeyDeserializer extends KeyValueDeserializer { + private static final Logger LOG = Logger.getLogger(FromKeyDeserializer.class); + private static Endianness endianness = Endianness.getNativeEndianness(); + + + public FromKeyDeserializer(TimestampConverter converter) { + super(converter); + } + + @Override + public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) { + Long ts = converter.toNanoseconds(fromBytes(key)); + outKey.set(ts); + byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, value, endianness); + byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness); + outValue.set(globalHeaderized, 0, globalHeaderized.length); + return true; + } + + /** + * Convert the byte array representation for a long into a proper long. + * @param data + * @return a long + */ + private static long fromBytes(byte[] data) { + long value = 0L; + int len = data.length; + + for(int i = 0; i < len; ++i) { + byte b = data[i]; + //make room in the long + value <<= 8; + //drop the byte in + value |= (long)(b & 255); + } + + return Long.valueOf(value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java new file mode 100644 index 0000000..6098904 --- /dev/null +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/FromPacketDeserializer.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.spout.pcap.deserializer; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.log4j.Logger; +import org.apache.metron.pcap.PcapHelper; +/** + * Extract the timestamp and raw data from the packet. 
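The fromBytes helper above folds the Kafka key into a long most-significant byte first, i.e. big-endian; for the usual eight-byte key this agrees with ByteBuffer's default byte order, as the self-contained check below demonstrates. Unlike the FromKeyScheme deleted later in this patch, which read the key via key.asLongBuffer().get() and therefore needed at least eight bytes, the loop also tolerates shorter keys.

import java.nio.ByteBuffer;

public class BigEndianKeySketch {
  // The same fold as fromBytes above: shift the accumulator left one byte,
  // then or in the next byte, most-significant first.
  static long fromBytes(byte[] data) {
    long value = 0L;
    for (byte b : data) {
      value = (value << 8) | (b & 0xFFL);
    }
    return value;
  }

  public static void main(String[] args) {
    long ts = 1469496307000000L; // e.g. a microsecond timestamp
    byte[] key = ByteBuffer.allocate(8).putLong(ts).array(); // big-endian by default
    System.out.println(fromBytes(key) == ts); // true
  }
}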
+ */ +public class FromPacketDeserializer extends KeyValueDeserializer { + private static final Logger LOG = Logger.getLogger(FromPacketDeserializer.class); + + @Override + public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) { + Long ts = PcapHelper.getTimestamp(value); + if(ts != null) { + outKey.set(ts); + outValue.set(value, 0, value.length); + return true; + } + else { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java new file mode 100644 index 0000000..48bea87 --- /dev/null +++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/deserializer/KeyValueDeserializer.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.spout.pcap.deserializer; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.metron.common.utils.timestamp.TimestampConverter; +import org.apache.metron.common.utils.timestamp.TimestampConverters; + +import java.io.Serializable; + +public abstract class KeyValueDeserializer implements Serializable { + protected TimestampConverter converter; + + public KeyValueDeserializer() { + this(TimestampConverters.MICROSECONDS); + } + + public KeyValueDeserializer(TimestampConverter converter) { + this.converter = converter; + } + + public abstract boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue); + +} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java deleted file mode 100644 index 625cc2d..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromKeyScheme.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
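The KeyValueDeserializer contract above is intentionally small, so plugging in another wire format is straightforward. A hypothetical implementation (not part of this patch) that expects the key to be an ASCII epoch-milliseconds string might look like this:

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.metron.common.utils.timestamp.TimestampConverters;
import org.apache.metron.spout.pcap.deserializer.KeyValueDeserializer;

public class AsciiMillisDeserializer extends KeyValueDeserializer {

  public AsciiMillisDeserializer() {
    super(TimestampConverters.MILLISECONDS);
  }

  @Override
  public boolean deserializeKeyValue(byte[] key, byte[] value, LongWritable outKey, BytesWritable outValue) {
    if (key == null || value == null) {
      return false; // signal a malformed packet; the callback drops and logs it
    }
    try {
      long millis = Long.parseLong(new String(key, StandardCharsets.US_ASCII));
      outKey.set(converter.toNanoseconds(millis)); // normalize to nanoseconds like the built-ins
      outValue.set(value, 0, value.length);
      return true;
    } catch (NumberFormatException nfe) {
      return false;
    }
  }
}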
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.spout.pcap.scheme; - -import org.apache.kafka.common.utils.Utils; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; -import com.google.common.collect.ImmutableList; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.log4j.Logger; -import org.apache.metron.common.utils.timestamp.TimestampConverter; -import org.apache.metron.common.utils.timestamp.TimestampConverters; -import org.apache.metron.pcap.PcapHelper; -import org.apache.metron.spout.pcap.Endianness; -import org.apache.storm.kafka.KeyValueScheme; - -import java.nio.ByteBuffer; -import java.util.List; - -public class FromKeyScheme implements KeyValueScheme, KeyConvertible { - private static final Logger LOG = Logger.getLogger(FromKeyScheme.class); - - private TimestampConverter converter = TimestampConverters.MICROSECONDS; - private static Endianness endianness = Endianness.getNativeEndianness(); - @Override - public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) { - Long ts = converter.toNanoseconds(key.asLongBuffer().get()); - byte[] packetHeaderized = PcapHelper.addPacketHeader(ts, Utils.toArray(value), endianness); - byte[] globalHeaderized= PcapHelper.addGlobalHeader(packetHeaderized, endianness); - return new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(globalHeaderized))); - } - - @Override - public List<Object> deserialize(ByteBuffer ser) { - throw new UnsupportedOperationException("Really only interested in deserializing a key and a value"); - } - - @Override - public Fields getOutputFields() { - return new Fields(TimestampScheme.KV_FIELD); - } - - @Override - public FromKeyScheme withTimestampConverter(TimestampConverter converter) { - try { - this.converter = converter; - } - catch(IllegalArgumentException iae) { - LOG.error(iae.getMessage(), iae); - } - return this; - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java deleted file mode 100644 index 236db0b..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/FromPacketScheme.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.spout.pcap.scheme; - -import org.apache.storm.spout.MultiScheme; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.Values; -import com.google.common.collect.ImmutableList; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.log4j.Logger; -import org.apache.metron.common.utils.timestamp.TimestampConverter; -import org.apache.metron.pcap.PcapHelper; -import org.apache.storm.utils.Utils; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -public class FromPacketScheme implements MultiScheme,KeyConvertible { - private static final Logger LOG = Logger.getLogger(FromPacketScheme.class); - @Override - public Iterable<List<Object>> deserialize(ByteBuffer rawValue) { - byte[] value = Utils.toByteArray(rawValue); - Long ts = PcapHelper.getTimestamp(value); - if(ts != null) { - return ImmutableList.of(new Values(ImmutableList.of(new LongWritable(ts), new BytesWritable(value)))); - } - else { - return ImmutableList.of(new Values(Collections.EMPTY_LIST)); - } - } - - @Override - public Fields getOutputFields() { - return new Fields(TimestampScheme.KV_FIELD); - } - - - @Override - public FromPacketScheme withTimestampConverter(TimestampConverter converter) { - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java deleted file mode 100644 index 54e52e8..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/KeyConvertible.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.metron.spout.pcap.scheme; - - -import org.apache.metron.common.utils.timestamp.TimestampConverter; - -public interface KeyConvertible { - KeyConvertible withTimestampConverter(TimestampConverter converter); -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java deleted file mode 100644 index 2be55a9..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampScheme.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.spout.pcap.scheme; - -import org.apache.storm.spout.MultiScheme; -import org.apache.metron.common.utils.timestamp.TimestampConverter; -import org.apache.storm.kafka.KeyValueSchemeAsMultiScheme; - -public enum TimestampScheme { - FROM_KEY( converter -> new KeyValueSchemeAsMultiScheme(new FromKeyScheme().withTimestampConverter(converter))) - ,FROM_PACKET(converter -> new FromPacketScheme().withTimestampConverter(converter)); - ; - public static final String KV_FIELD = "kv"; - TimestampSchemeCreator creator; - TimestampScheme(TimestampSchemeCreator creator) - { - this.creator = creator; - } - - public static MultiScheme getScheme(String scheme, TimestampConverter converter) { - try { - TimestampScheme ts = TimestampScheme.valueOf(scheme.toUpperCase()); - return ts.creator.create(converter); - } - catch(IllegalArgumentException iae) { - return TimestampScheme.FROM_KEY.creator.create(converter); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java deleted file mode 100644 index 78aa527..0000000 --- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/scheme/TimestampSchemeCreator.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.spout.pcap.scheme; - -import org.apache.storm.spout.MultiScheme; -import org.apache.metron.common.utils.timestamp.TimestampConverter; - -public interface TimestampSchemeCreator { - MultiScheme create(TimestampConverter converter); -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java index 00cc62d..8b292d7 100644 --- a/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java +++ b/metron-platform/metron-pcap-backend/src/test/java/org/apache/metron/pcap/integration/PcapTopologyIntegrationTest.java @@ -49,7 +49,7 @@ import org.apache.metron.pcap.filter.fixed.FixedPcapFilter; import org.apache.metron.pcap.filter.query.QueryPcapFilter; import org.apache.metron.pcap.mr.PcapJob; import org.apache.metron.spout.pcap.Endianness; -import org.apache.metron.spout.pcap.scheme.TimestampScheme; +import org.apache.metron.spout.pcap.deserializer.Deserializers; import org.apache.metron.test.utils.UnitTestHelper; import org.json.simple.JSONObject; import org.junit.Assert; @@ -136,7 +136,7 @@ public class PcapTopologyIntegrationTest { @Nullable @Override public Void apply(@Nullable Properties input) { - input.setProperty("kafka.pcap.ts_scheme", TimestampScheme.FROM_PACKET.toString()); + input.setProperty("kafka.pcap.ts_scheme", Deserializers.FROM_PACKET.toString()); return null; } }, (kafkaComponent, pcapEntries) -> kafkaComponent.writeMessages( KAFKA_TOPIC @@ -153,7 +153,7 @@ public class PcapTopologyIntegrationTest { @Nullable @Override public Void apply(@Nullable Properties input) { - input.setProperty("kafka.pcap.ts_scheme", TimestampScheme.FROM_KEY.toString()); + input.setProperty("kafka.pcap.ts_scheme", Deserializers.FROM_KEY.toString()); return null; } }, new SendEntries() { http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/pom.xml b/metron-platform/metron-pcap/pom.xml index 0c20ed8..07fdd46 100644 --- a/metron-platform/metron-pcap/pom.xml +++ b/metron-platform/metron-pcap/pom.xml @@ -116,15 +116,9 @@ </exclusions> </dependency> <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-kafka</artifactId> - <version>${global_storm_version}</version> - <exclusions> - <exclusion> - 
<artifactId>org.apache.curator</artifactId> - <groupId>curator-client</groupId> - </exclusion> - </exclusions> + <groupId>org.apache.metron</groupId> + <artifactId>metron-storm-kafka</artifactId> + <version>${project.parent.version}</version> </dependency> <dependency> <groupId>org.apache.metron</groupId> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java deleted file mode 100644 index 8e9622c..0000000 --- a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/Callback.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka; - -import java.io.Serializable; -import java.util.List; - -public interface Callback extends AutoCloseable, Serializable { - List<Object> apply(List<Object> tuple, EmitContext context); - void initialize(EmitContext context); -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java deleted file mode 100644 index 01b2f5c..0000000 --- a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackCollector.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.kafka; - -import org.apache.storm.spout.SpoutOutputCollector; - -import java.io.Serializable; -import java.util.List; - -public class CallbackCollector extends SpoutOutputCollector implements Serializable { - static final long serialVersionUID = 0xDEADBEEFL; - Callback _callback; - SpoutOutputCollector _delegate; - EmitContext _context; - public CallbackCollector(Callback callback, SpoutOutputCollector collector, EmitContext context) { - super(collector); - this._callback = callback; - this._delegate = collector; - this._context = context; - } - - - public static int getPartition(Object messageIdObj) { - PartitionManager.KafkaMessageId messageId = (PartitionManager.KafkaMessageId) messageIdObj; - return messageId.partition.partition; - } - - /** - * Emits a new tuple to the specified output stream with the given message ID. - * When Storm detects that this tuple has been fully processed, or has failed - * to be fully processed, the spout will receive an ack or fail callback respectively - * with the messageId as long as the messageId was not null. If the messageId was null, - * Storm will not track the tuple and no callback will be received. The emitted values must be - * immutable. - * - * @param streamId - * @param tuple - * @param messageId - * @return the list of task ids that this tuple was sent to - */ - @Override - public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { - List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId)) - .with(EmitContext.Type.STREAM_ID, streamId) - ); - return _delegate.emit(streamId, t, messageId); - } - - /** - * Emits a new tuple to the default output stream with the given message ID. - * When Storm detects that this tuple has been fully processed, or has failed - * to be fully processed, the spout will receive an ack or fail callback respectively - * with the messageId as long as the messageId was not null. If the messageId was null, - * Storm will not track the tuple and no callback will be received. The emitted values must be - * immutable. - * - * @param tuple - * @param messageId - * @return the list of task ids that this tuple was sent to - */ - @Override - public List<Integer> emit(List<Object> tuple, Object messageId) { - List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId))); - return _delegate.emit(t, messageId); - } - - /** - * Emits a tuple to the default output stream with a null message id. Storm will - * not track this message so ack and fail will never be called for this tuple. The - * emitted values must be immutable. - * - * @param tuple - */ - @Override - public List<Integer> emit(List<Object> tuple) { - List<Object> t = _callback.apply(tuple, _context.cloneContext()); - return _delegate.emit(t); - } - - /** - * Emits a tuple to the specified output stream with a null message id. Storm will - * not track this message so ack and fail will never be called for this tuple. The - * emitted values must be immutable. - * - * @param streamId - * @param tuple - */ - @Override - public List<Integer> emit(String streamId, List<Object> tuple) { - List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId)); - return _delegate.emit(streamId, t); - } - - /** - * Emits a tuple to the specified task on the specified output stream. 
This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. The emitted values must be - * immutable. - * - * @param taskId - * @param streamId - * @param tuple - * @param messageId - */ - @Override - public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) { - List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId) - .with(EmitContext.Type.PARTITION, getPartition(messageId)) - .with(EmitContext.Type.TASK_ID, taskId) - ); - _delegate.emitDirect(taskId, streamId, t, messageId); - } - - /** - * Emits a tuple to the specified task on the default output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. The emitted values must be - * immutable. - * - * @param taskId - * @param tuple - * @param messageId - */ - @Override - public void emitDirect(int taskId, List<Object> tuple, Object messageId) { - List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.PARTITION, getPartition(messageId)) - .with(EmitContext.Type.TASK_ID, taskId) - ); - _delegate.emitDirect(taskId, t, messageId); - } - - /** - * Emits a tuple to the specified task on the specified output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. The emitted values must be - * immutable. - * - * <p> Because no message id is specified, Storm will not track this message - * so ack and fail will never be called for this tuple. - * - * @param taskId - * @param streamId - * @param tuple - */ - @Override - public void emitDirect(int taskId, String streamId, List<Object> tuple) { - List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.STREAM_ID, streamId) - .with(EmitContext.Type.TASK_ID, taskId) - ); - _delegate.emitDirect(taskId, streamId, t); - } - - /** - * Emits a tuple to the specified task on the default output stream. This output - * stream must have been declared as a direct stream, and the specified task must - * use a direct grouping on this stream to receive the message. The emitted values must be - * immutable. - * - * <p> Because no message id is specified, Storm will not track this message - * so ack and fail will never be called for this tuple. - * - * @param taskId - * @param tuple - */ - @Override - public void emitDirect(int taskId, List<Object> tuple) { - - List<Object> t = _callback.apply(tuple, _context.cloneContext().with(EmitContext.Type.TASK_ID, taskId)); - _delegate.emitDirect(taskId, t); - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java deleted file mode 100644 index 21f675b..0000000 --- a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/CallbackKafkaSpout.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka; - -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; - -import java.lang.reflect.InvocationTargetException; -import java.util.*; - -public class CallbackKafkaSpout extends KafkaSpout { - static final long serialVersionUID = 0xDEADBEEFL; - Class<? extends Callback> callbackClazz; - Callback _callback; - EmitContext _context; - public CallbackKafkaSpout(SpoutConfig spoutConfig, String callbackClass) { - this(spoutConfig, toCallbackClass(callbackClass)); - } - - public CallbackKafkaSpout(SpoutConfig spoutConf, Class<? extends Callback> callback) { - super(spoutConf); - callbackClazz = callback; - } - - public void initialize(TopologyContext context) { - _callback = createCallback(callbackClazz); - _context = new EmitContext().with(EmitContext.Type.SPOUT_CONFIG, _spoutConfig) - .with(EmitContext.Type.UUID, context.getStormId()) - .with(EmitContext.Type.TOPIC, _spoutConfig.topic); - _callback.initialize(_context); - } - - - private static Class<? extends Callback> toCallbackClass(String callbackClass) { - try{ - return (Class<? extends Callback>) Callback.class.forName(callbackClass); - } - catch (ClassNotFoundException e) { - throw new RuntimeException(callbackClass + " not found", e); - } - } - - protected Callback createCallback(Class<? 
extends Callback> callbackClass) { - try { - return callbackClass.getConstructor().newInstance(); - } catch (InstantiationException | NoSuchMethodException | InvocationTargetException e) { - throw new RuntimeException("Unable to instantiate callback", e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Illegal access", e); - } - } - - @Override - public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) { - if(_callback == null) { - initialize(context); - } - super.open( conf, context - , new CallbackCollector(_callback, collector - ,_context.cloneContext().with(EmitContext.Type.OPEN_CONFIG, conf) - .with(EmitContext.Type.TOPOLOGY_CONTEXT, context) - ) - ); - } - - @Override - public void close() { - super.close(); - if(_callback != null) { - try { - _callback.close(); - } catch (Exception e) { - throw new IllegalStateException("Unable to close callback", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java b/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java deleted file mode 100644 index 434d884..0000000 --- a/metron-platform/metron-pcap/src/main/java/org/apache/storm/kafka/EmitContext.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.kafka; - -import org.apache.storm.task.TopologyContext; - -import java.io.Serializable; -import java.util.EnumMap; -import java.util.Map; - -public class EmitContext implements Cloneable,Serializable { - static final long serialVersionUID = 0xDEADBEEFL; - - public enum Type{ - STREAM_ID(String.class) - ,TOPIC(String.class) - ,PARTITION(Integer.class) - ,TASK_ID(Integer.class) - ,UUID(String.class) - ,SPOUT_CONFIG(SpoutConfig.class) - ,OPEN_CONFIG(Map.class) - ,TOPOLOGY_CONTEXT(TopologyContext.class) - ; - Class<?> clazz; - Type(Class<?> clazz) { - this.clazz= clazz; - } - - public Class<?> clazz() { - return clazz; - } - } - public EmitContext() { - this(new EnumMap<>(Type.class)); - } - public EmitContext(EnumMap<Type, Object> context) { - _context = context; - } - private EnumMap<Type, Object> _context; - - public <T> EmitContext with(Type t, T o ) { - _context.put(t, t.clazz().cast(o)); - return this; - } - public <T> void add(Type t, T o ) { - with(t, o); - } - - public <T> T get(Type t) { - Object o = _context.get(t); - if(o == null) { - return null; - } - else { - return (T) o; - } - } - - public EmitContext cloneContext() { - try { - return (EmitContext)this.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException("Unable to clone emit context.", e); - } - } - - /** - * Creates and returns a copy of this object. The precise meaning - * of "copy" may depend on the class of the object. The general - * intent is that, for any object {@code x}, the expression: - * <blockquote> - * <pre> - * x.clone() != x</pre></blockquote> - * will be true, and that the expression: - * <blockquote> - * <pre> - * x.clone().getClass() == x.getClass()</pre></blockquote> - * will be {@code true}, but these are not absolute requirements. - * While it is typically the case that: - * <blockquote> - * <pre> - * x.clone().equals(x)</pre></blockquote> - * will be {@code true}, this is not an absolute requirement. - * - * By convention, the returned object should be obtained by calling - * {@code super.clone}. If a class and all of its superclasses (except - * {@code Object}) obey this convention, it will be the case that - * {@code x.clone().getClass() == x.getClass()}. - * - * By convention, the object returned by this method should be independent - * of this object (which is being cloned). To achieve this independence, - * it may be necessary to modify one or more fields of the object returned - * by {@code super.clone} before returning it. Typically, this means - * copying any mutable objects that comprise the internal "deep structure" - * of the object being cloned and replacing the references to these - * objects with references to the copies. If a class contains only - * primitive fields or references to immutable objects, then it is usually - * the case that no fields in the object returned by {@code super.clone} - * need to be modified. - * - * The method {@code clone} for class {@code Object} performs a - * specific cloning operation. First, if the class of this object does - * not implement the interface {@code Cloneable}, then a - * {@code CloneNotSupportedException} is thrown. Note that all arrays - * are considered to implement the interface {@code Cloneable} and that - * the return type of the {@code clone} method of an array type {@code T[]} - * is {@code T[]} where T is any reference or primitive type. 
- * Otherwise, this method creates a new instance of the class of this - * object and initializes all its fields with exactly the contents of - * the corresponding fields of this object, as if by assignment; the - * contents of the fields are not themselves cloned. Thus, this method - * performs a "shallow copy" of this object, not a "deep copy" operation. - * - * The class {@code Object} does not itself implement the interface - * {@code Cloneable}, so calling the {@code clone} method on an object - * whose class is {@code Object} will result in throwing an - * exception at run time. - * - * @return a clone of this instance. - * @throws CloneNotSupportedException if the object's class does not - * support the {@code Cloneable} interface. Subclasses - * that override the {@code clone} method can also - * throw this exception to indicate that an instance cannot - * be cloned. - * @see Cloneable - */ - @Override - protected Object clone() throws CloneNotSupportedException { - EmitContext context = new EmitContext(_context.clone()); - return context; - } -} http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-solr/src/main/config/solr.properties ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/config/solr.properties b/metron-platform/metron-solr/src/main/config/solr.properties index d8dd25f..35f368c 100644 --- a/metron-platform/metron-solr/src/main/config/solr.properties +++ b/metron-platform/metron-solr/src/main/config/solr.properties @@ -22,7 +22,8 @@ indexing.executors=0 kafka.zk=node1:2181 kafka.broker=node1:6667 -kafka.start=WHERE_I_LEFT_OFF +# One of EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST +kafka.start=UNCOMMITTED_EARLIEST ##### Indexing ##### index.input.topic=indexing http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml new file mode 100644 index 0000000..62b844c --- /dev/null +++ b/metron-platform/metron-storm-kafka/pom.xml @@ -0,0 +1,128 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-platform</artifactId> + <version>0.3.1</version> + </parent> + <artifactId>metron-storm-kafka</artifactId> + <name>metron-storm-kafka</name> + <description>Components that extend the Storm/Kafka spout</description> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <commons.config.version>1.10</commons.config.version> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-kafka-client</artifactId> + <version>${global_storm_kafka_version}</version> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>${global_kafka_version}</version> + </dependency> + <dependency> + <groupId>org.apache.storm</groupId> + <artifactId>storm-core</artifactId> + <version>${global_storm_version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>servlet-api</artifactId> + <groupId>javax.servlet</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-over-slf4j</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-slf4j-impl</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.10</artifactId> + <version>${global_kafka_version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>com.sun.jmx</groupId> + <artifactId>jmxri</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jdmk</groupId> + <artifactId>jmxtools</artifactId> + </exclusion> + <exclusion> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-common</artifactId> + <version>${project.parent.version}</version> + </dependency> + </dependencies> + + <reporting> + <plugins> + <!-- Normally, dependency report takes time, skip it --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-project-info-reports-plugin</artifactId> + <version>2.7</version> + + <configuration> + <dependencyLocationsEnabled>false</dependencyLocationsEnabled> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>emma-maven-plugin</artifactId> + <version>1.0-alpha-3</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-pmd-plugin</artifactId> + <configuration> + <targetJdk>${global_java_version}</targetJdk> + </configuration> + </plugin> + </plugins> + </reporting> + <build> + <plugins> + </plugins> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java new file mode 100644 index 0000000..bf5250b --- /dev/null +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SimpleStormKafkaBuilder.java @@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.storm.kafka.flux;
+
+import com.google.common.base.Joiner;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.metron.common.utils.KafkaUtils;
+import org.apache.storm.kafka.spout.*;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * This is a convenience layer on top of the KafkaSpoutConfig.Builder available in storm-kafka-client.
+ * The justification for this class is three-fold.  First, there are a lot of moving parts and a simplified
+ * approach to constructing spouts is useful.  Second, and perhaps more importantly, the Builder pattern
+ * is decidedly unfriendly to use inside of Flux.  Third, we can make things a bit more friendly by requiring
+ * only zookeeper and automatically figuring out the brokers for the bootstrap server.
+ *
+ * @param <K> The kafka key type
+ * @param <V> The kafka value type
+ */
+public class SimpleStormKafkaBuilder<K, V> extends KafkaSpoutConfig.Builder<K, V> {
+  final static String STREAM = "default";
+
+  /**
+   * The fields exposed by the kafka consumer.  These will show up in the Storm tuple.
+   */
+  public enum FieldsConfiguration {
+    KEY("key", record -> record.key()),
+    VALUE("value", record -> record.value()),
+    PARTITION("partition", record -> record.partition()),
+    TOPIC("topic", record -> record.topic())
+    ;
+    String fieldName;
+    Function<ConsumerRecord,Object> recordExtractor;
+
+    FieldsConfiguration(String fieldName, Function<ConsumerRecord,Object> recordExtractor) {
+      this.recordExtractor = recordExtractor;
+      this.fieldName = fieldName;
+    }
+
+    /**
+     * Return a list of the enums from their string representations.
+     * @param configs
+     * @return
+     */
+    public static List<FieldsConfiguration> toList(String... configs) {
+      List<FieldsConfiguration> ret = new ArrayList<>();
+      for(String config : configs) {
+        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
+      }
+      return ret;
+    }
+
+    /**
+     * Return a list of the enums from their string representations.
+     * @param configs
+     * @return
+     */
+    public static List<FieldsConfiguration> toList(List<String> configs) {
+      List<FieldsConfiguration> ret = new ArrayList<>();
+      for(String config : configs) {
+        ret.add(FieldsConfiguration.valueOf(config.toUpperCase()));
+      }
+      return ret;
+    }
+
+    /**
+     * Construct a Fields object from an iterable of enums.  These fields are the fields
+     * exposed in the Storm tuple emitted from the spout.
+     * @param configs
+     * @return
+     */
+    public static Fields getFields(Iterable<FieldsConfiguration> configs) {
+      List<String> fields = new ArrayList<>();
+      for(FieldsConfiguration config : configs) {
+        fields.add(config.fieldName);
+      }
+      return new Fields(fields);
+    }
+  }
+
+  /**
+   * Build a tuple given the fields and the topic.  We want to use our FieldsConfiguration enum
+   * to define what this tuple looks like.
+   * @param <K> The key type in kafka
+   * @param <V> The value type in kafka
+   */
+  public static class TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+    private List<FieldsConfiguration> configurations;
+    private TupleBuilder(String topic, List<FieldsConfiguration> configurations) {
+      super(topic);
+      this.configurations = configurations;
+    }
+
+    /**
+     * Builds the values of a tuple from the ConsumerRecord specified as a parameter
+     *
+     * @param consumerRecord whose contents are used to build the tuple
+     * @return the list of tuple values
+     */
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+      Values ret = new Values();
+      for(FieldsConfiguration config : configurations) {
+        ret.add(config.recordExtractor.apply(consumerRecord));
+      }
+      return ret;
+    }
+  }
+
+  private String topic;
+
+  /**
+   * Create an object with the specified properties.  This will expose fields "key" and "value."
+   * @param kafkaProps The special kafka properties
+   * @param topic The kafka topic.  TODO: In the future, support multiple topics and regex patterns.
+   * @param zkQuorum The zookeeper quorum.  We will use this to pull the broker list.
+   */
+  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
+                                , String topic
+                                , String zkQuorum
+                                )
+  {
+    this(kafkaProps, topic, zkQuorum, Arrays.asList("key", "value"));
+  }
+
+  /**
+   * Create an object with the specified properties and exposing the specified fields.
+   * @param kafkaProps The special kafka properties
+   * @param topic The kafka topic.  TODO: In the future, support multiple topics and regex patterns.
+   * @param zkQuorum The zookeeper quorum.  We will use this to pull the broker list.
+   * @param fieldsConfiguration The fields to expose in the storm tuple emitted.
+   */
+  public SimpleStormKafkaBuilder( Map<String, Object> kafkaProps
+                                , String topic
+                                , String zkQuorum
+                                , List<String> fieldsConfiguration
+                                )
+  {
+    super( modifyKafkaProps(kafkaProps, zkQuorum)
+         , createStreams(fieldsConfiguration, topic)
+         , createTuplesBuilder(fieldsConfiguration, topic)
+         );
+    this.topic = topic;
+  }
+
+  /**
+   * Get the kafka topic.  TODO: In the future, support multiple topics and regex patterns.
+   * @return
+   */
+  public String getTopic() {
+    return topic;
+  }
+  /**
+   * Create a StormKafkaSpout from a given topic, zookeeper quorum and fields.  Also, configure the spout
+   * using a Map that configures both kafka and the spout (see the properties in SpoutConfiguration).
+   * @param topic
+   * @param zkQuorum
+   * @param fieldsConfiguration
+   * @param kafkaProps The aforementioned map.
+   * @return
+   */
+  public static <K, V> StormKafkaSpout<K, V> create( String topic
+                                                   , String zkQuorum
+                                                   , List<String> fieldsConfiguration
+                                                   , Map<String, Object> kafkaProps
+                                                   )
+  {
+    Map<String, Object> spoutConfig = SpoutConfiguration.separate(kafkaProps);
+
+    SimpleStormKafkaBuilder<K, V> builder = new SimpleStormKafkaBuilder<>(kafkaProps, topic, zkQuorum, fieldsConfiguration);
+    SpoutConfiguration.configure(builder, spoutConfig);
+    return new StormKafkaSpout<>(builder);
+  }
+
+  private static Map<String, Object> modifyKafkaProps(Map<String, Object> props, String zkQuorum) {
+    try {
+      if(!props.containsKey(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS)) {
+        //this isn't a putIfAbsent because I only want to pull the brokers from zk if it's absent.
+        List<String> brokers = KafkaUtils.INSTANCE.getBrokersFromZookeeper(zkQuorum);
+        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, Joiner.on(",").join(brokers));
+      }
+      props.putIfAbsent(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
+      props.putIfAbsent(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
+
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to retrieve brokers from zookeeper: " + e.getMessage(), e);
+    }
+    return props;
+  }
+
+  private static <K,V> KafkaSpoutTuplesBuilder<K, V> createTuplesBuilder(List<String> config, String topic) {
+    TupleBuilder<K, V> tb = new TupleBuilder<K, V>(topic, FieldsConfiguration.toList(config));
+    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(tb).build();
+  }
+
+
+  private static KafkaSpoutStreams createStreams(List<String> config, String topic) {
+    final Fields fields = FieldsConfiguration.getFields(FieldsConfiguration.toList(config));
+    return new KafkaSpoutStreamsNamedTopics.Builder(fields, STREAM, new String[] { topic} ).build();
+  }
+
+}
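For readers wiring this up outside of Flux, a minimal usage sketch follows. It uses only the API shown in the diff above; the group id is hypothetical, while the topic ("indexing") and zookeeper quorum ("node1:2181") simply mirror the solr.properties values earlier in this commit. The create() call splits the spout.* keys out via SpoutConfiguration and resolves the kafka bootstrap servers from zookeeper when they are not supplied.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class ExampleSpoutFactory {
  public static StormKafkaSpout<byte[], byte[]> build() {
    Map<String, Object> props = new HashMap<>();
    // kafka-level property: stays in the map and is handed to the kafka consumer
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "example_group");
    // spout-level properties: SpoutConfiguration.separate() strips these out inside create()
    props.put("spout.firstPollOffsetStrategy", "UNCOMMITTED_EARLIEST");
    props.put("spout.pollTimeoutMs", 200L);
    // "key" and "value" become the fields of the emitted tuple (see FieldsConfiguration)
    return SimpleStormKafkaBuilder.create("indexing", "node1:2181", Arrays.asList("key", "value"), props);
  }
}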
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java new file mode 100644 index 0000000..6c0f148 --- /dev/null +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/SpoutConfiguration.java @@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.storm.kafka.flux;
+
+import org.apache.metron.common.utils.ConversionUtils;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * To enable the configuration of spouts with a single map containing both kafka and spout properties,
+ * this enum exposes the spout-specific configurations along with utility functions to split the kafka
+ * bits of a configuration from the spout-specific bits.
+ */
+public enum SpoutConfiguration {
+  /**
+   * The poll timeout for the kafka consumer in milliseconds
+   */
+  POLL_TIMEOUT_MS("spout.pollTimeoutMs"
+  , container -> container.builder.setPollTimeoutMs(ConversionUtils.convert(container.value, Long.class))
+  )
+  /**
+   * The offset strategy to use.  This can be one of
+   * * EARLIEST,
+   * * LATEST,
+   * * UNCOMMITTED_EARLIEST,
+   * * UNCOMMITTED_LATEST
+   */
+  ,FIRST_POLL_OFFSET_STRATEGY("spout.firstPollOffsetStrategy"
+  , container -> container.builder.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.valueOf(container.value.toString()))
+  )
+  /**
+   * The maximum number of retries
+   */
+  ,MAX_RETRIES("spout.maxRetries"
+  , container -> container.builder.setMaxRetries(ConversionUtils.convert(container.value, Integer.class))
+  )
+  /**
+   * The maximum number of uncommitted offsets
+   */
+  ,MAX_UNCOMMITTED_OFFSETS("spout.maxUncommittedOffsets"
+  , container -> container.builder.setMaxUncommittedOffsets(ConversionUtils.convert(container.value, Integer.class))
+  )
+  /**
+   * The offset commit period in milliseconds
+   */
+  ,OFFSET_COMMIT_PERIOD_MS("spout.offsetCommitPeriodMs"
+  , container -> container.builder.setOffsetCommitPeriodMs(ConversionUtils.convert(container.value, Long.class))
+  )
+  ;
+  private static class Container {
+    Map<String, Object> config;
+    KafkaSpoutConfig.Builder builder;
+    Object value;
+    public Container(Map<String, Object> config, KafkaSpoutConfig.Builder builder, Object value) {
+      this.config = config;
+      this.builder = builder;
+      this.value = value;
+    }
+  }
+  Consumer<Container> consumer;
+  public String key;
+  SpoutConfiguration(String key, Consumer<Container> consumer) {
+    this.consumer = consumer;
+    this.key = key;
+  }
+
+  /**
+   * Split the spout-specific configuration from this Map.  NOTE: This mutates the parameter and removes the spout-specific config.
+   * @param config
+   * @return The spout-specific configuration
+   */
+  public static Map<String, Object> separate(Map<String, Object> config) {
+    Map<String, Object> ret = new HashMap<>();
+    for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) {
+      if(config.containsKey(spoutConfig.key)) {
+        Object val = config.get(spoutConfig.key);
+        config.remove(spoutConfig.key);
+        ret.put(spoutConfig.key, val);
+      }
+    }
+    return ret;
+  }
+  /**
+   * Configure a builder from a spout-specific configuration map (e.g. the output of separate()).
+   * @param builder
+   * @param config
+   * @param <K>
+   * @param <V>
+   * @return
+   */
+  public static <K, V> KafkaSpoutConfig.Builder configure( KafkaSpoutConfig.Builder<K, V> builder
+                                                         , Map<String, Object> config
+                                                         )
+  {
+    for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) {
+      if(config.containsKey(spoutConfig.key)) {
+        Container container = new Container(config, builder, config.get(spoutConfig.key));
+        spoutConfig.consumer.accept(container);
+      }
+    }
+    return builder;
+  }
+
+  /**
+   * List all of the spout-specific options plus the kafka options of interest.
+   * @return
+   */
+  public static List<String> allOptions() {
+    List<String> ret = new ArrayList<>();
+    for(SpoutConfiguration spoutConfig : SpoutConfiguration.values()) {
+      ret.add(spoutConfig.key);
+    }
+    ret.add(KafkaSpoutConfig.Consumer.GROUP_ID);
+    ret.add(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS);
+    ret.add(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT);
+    return ret;
+  }
+}
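As a quick illustration of the split (a sketch only; the property values are hypothetical, and the topic and quorum again mirror solr.properties), separate() mutates the mixed map, leaving only the kafka properties behind, and configure() then applies the extracted spout.* entries to a builder. Note that constructing the builder will reach out to the zookeeper quorum to resolve brokers.

import java.util.HashMap;
import java.util.Map;

import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.metron.storm.kafka.flux.SpoutConfiguration;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

public class SeparateExample {
  public static void main(String[] args) {
    Map<String, Object> mixed = new HashMap<>();
    mixed.put(KafkaSpoutConfig.Consumer.GROUP_ID, "example_group");  // kafka property: stays behind
    mixed.put("spout.maxUncommittedOffsets", 10000000);              // spout property: extracted
    mixed.put("spout.offsetCommitPeriodMs", 30000L);                 // spout property: extracted

    // separate() mutates 'mixed': afterwards it holds only the kafka properties
    Map<String, Object> spoutProps = SpoutConfiguration.separate(mixed);

    // the builder constructor pulls the broker list from the (hypothetical) zookeeper quorum
    SimpleStormKafkaBuilder<byte[], byte[]> builder =
        new SimpleStormKafkaBuilder<>(mixed, "indexing", "node1:2181");

    // apply the extracted spout.* entries to the builder
    SpoutConfiguration.configure(builder, spoutProps);
  }
}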
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/98dc7659/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java new file mode 100644 index 0000000..030348f --- /dev/null +++ b/metron-platform/metron-storm-kafka/src/main/java/org/apache/metron/storm/kafka/flux/StormKafkaSpout.java @@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.storm.kafka.flux;
+
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.log4j.Logger;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+/**
+ * A thin wrapper atop the KafkaSpout which allows us to pass in the Builder rather than the SpoutConfig,
+ * enabling a simplified interface for this spout suitable for use from Flux.
+ * @param <K>
+ * @param <V>
+ */
+public class StormKafkaSpout<K, V> extends KafkaSpout<K, V> {
+  private static final Logger LOG = Logger.getLogger(StormKafkaSpout.class);
+  protected KafkaSpoutConfig<K,V> _spoutConfig;
+  protected String _topic;
+  public StormKafkaSpout(SimpleStormKafkaBuilder<K,V> builder) {
+    super(builder.build());
+    this._topic = builder.getTopic();
+    this._spoutConfig = builder.build();
+  }
+
+  @Override
+  public void deactivate() {
+    try {
+      super.deactivate();
+    }
+    catch(WakeupException we) {
+      //see https://issues.apache.org/jira/browse/STORM-2184
+      LOG.warn("You can generally ignore these, as per https://issues.apache.org/jira/browse/STORM-2184 -- " + we.getMessage(), we);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      super.close();
+    }
+    catch(WakeupException we) {
+      //see https://issues.apache.org/jira/browse/STORM-2184
+      LOG.warn("You can generally ignore these, as per https://issues.apache.org/jira/browse/STORM-2184 -- " + we.getMessage(), we);
+    }
+  }
+}
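Putting the pieces together, a topology can consume from this spout on its "default" stream (the STREAM constant in SimpleStormKafkaBuilder). The following sketch wires the spout to a deliberately trivial bolt; the component names and the no-op bolt are hypothetical stand-ins, and the topic/quorum values again follow solr.properties.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.metron.storm.kafka.flux.SimpleStormKafkaBuilder;
import org.apache.metron.storm.kafka.flux.StormKafkaSpout;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;

public class ExampleTopology {
  // A no-op stand-in bolt; a real topology would do something with the "key"/"value" fields.
  public static class NoOpBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      // tuple.getBinaryByField("value") would yield the raw kafka value here
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }
  }

  public static TopologyBuilder build() {
    Map<String, Object> props = new HashMap<>();
    props.put("spout.firstPollOffsetStrategy", "UNCOMMITTED_EARLIEST");
    StormKafkaSpout<byte[], byte[]> spout =
        SimpleStormKafkaBuilder.create("indexing", "node1:2181", Arrays.asList("key", "value"), props);

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafkaSpout", spout, 1);
    // tuples arrive on the spout's "default" stream with fields "key" and "value"
    builder.setBolt("noOpBolt", new NoOpBolt(), 1).shuffleGrouping("kafkaSpout", "default");
    return builder;
  }
}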