STORM-697: Review feedback: Removed tupleMetaData from SpoutConfig, used instanceof check on spout scheme to determine if tuples should be generated with meta data
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4d79d9b3 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4d79d9b3 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4d79d9b3 Branch: refs/heads/master Commit: 4d79d9b3b66bd381d6654e68fb418d2efad8e922 Parents: 47f4a3c Author: Matt Tieman <matt.tie...@inin.com> Authored: Fri Oct 23 23:47:13 2015 -0400 Committer: Matt Tieman <matt.tie...@inin.com> Committed: Fri Oct 23 23:47:13 2015 -0400 ---------------------------------------------------------------------- .../src/jvm/storm/kafka/KafkaConfig.java | 4 ++-- .../src/jvm/storm/kafka/KafkaUtils.java | 11 ++------- .../MessageMetadataSchemeAsMultiScheme.java | 1 - .../src/jvm/storm/kafka/PartitionManager.java | 8 +++++-- .../kafka/trident/TridentKafkaEmitter.java | 4 ++-- .../src/test/storm/kafka/KafkaUtilsTest.java | 25 ++------------------ 6 files changed, 14 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java index ea0e421..49c7526 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java @@ -23,7 +23,8 @@ import backtype.storm.spout.RawMultiScheme; import java.io.Serializable; public class KafkaConfig implements Serializable { - + private static final long serialVersionUID = 5276718734571623855L; + public final BrokerHosts hosts; public final String topic; public final String clientId; @@ -38,7 +39,6 @@ public class KafkaConfig implements Serializable { public long maxOffsetBehind = Long.MAX_VALUE; public boolean useStartOffsetTimeIfOffsetOutOfRange = true; public int metricsTimeBucketSizeInSecs = 60; - public boolean tupleMetaData = false; // True to generate tuples from MessageAndRealOffset, false to generate only from the message public KafkaConfig(BrokerHosts hosts, String topic) { this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId()); http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java index 340f629..2e047b3 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java @@ -208,19 +208,12 @@ public class KafkaUtils { return tups; } - public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, long offset) { - Iterable<List<Object>> tups; + public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) { ByteBuffer payload = msg.payload(); if (payload == null) { return null; } - - if (kafkaConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) { - tups = ((MessageMetadataSchemeAsMultiScheme) kafkaConfig.scheme).deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset); - } else { - tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload)); - } - return tups; + return scheme.deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset); } http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java index dcdbf8b..e89e391 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java +++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java @@ -12,7 +12,6 @@ public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme { super(scheme); } - @SuppressWarnings("unchecked") public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) { List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset); if (o == null) { http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 980ed8b..39e42ed 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -23,12 +23,16 @@ import backtype.storm.metric.api.CountMetric; import backtype.storm.metric.api.MeanReducer; import backtype.storm.metric.api.ReducedMetric; import backtype.storm.spout.SpoutOutputCollector; + import com.google.common.collect.ImmutableMap; + import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.MessageAndOffset; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import storm.kafka.KafkaSpout.EmitState; import storm.kafka.KafkaSpout.MessageAndRealOffset; import storm.kafka.trident.MaxMetric; @@ -137,8 +141,8 @@ public class PartitionManager { } Iterable<List<Object>> tups; - if (_spoutConfig.tupleMetaData) { - tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition, toEmit.offset); + if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) { + tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset); } else { tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); } http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java index 5ac5709..39aac1a 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java @@ -181,8 +181,8 @@ public class TridentKafkaEmitter { private void emit(TridentCollector collector, Message msg, Partition partition, long offset) { Iterable<List<Object>> values; - if (_config.tupleMetaData) { - values = KafkaUtils.generateTuples(_config, msg, partition, offset); + if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) { + values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset); } else { values = KafkaUtils.generateTuples(_config, msg); } http://git-wip-us.apache.org/repos/asf/storm/blob/4d79d9b3/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java index 8f4343e..65e8d2b 100644 --- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java @@ -180,39 +180,18 @@ public class KafkaUtilsTest { mockPartition.partition = 0; long offset = 0L; - config.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme()); - config.tupleMetaData = true; + MessageMetadataSchemeAsMultiScheme scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme()); createTopicAndSendMessage(null, value); ByteBufferMessageSet messageAndOffsets = getLastMessage(); for (MessageAndOffset msg : messageAndOffsets) { - Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset); + Iterable<List<Object>> lists = KafkaUtils.generateTuples(scheme, msg.message(), mockPartition, offset); List<Object> values = lists.iterator().next(); assertEquals("Message is incorrect", value, values.get(0)); assertEquals("Partition is incorrect", mockPartition.partition, values.get(1)); assertEquals("Offset is incorrect", offset, values.get(2)); } } - - @Test - public void generateTuplesWithValueSchemeAndMessageAndMetadata() { - String value = "value"; - Partition mockPartition = Mockito.mock(Partition.class); - mockPartition.partition = 0; - Long offset = 0L; - - config.scheme = new SchemeAsMultiScheme(new StringScheme()); - config.tupleMetaData = true; - - createTopicAndSendMessage(null, value); - ByteBufferMessageSet messageAndOffsets = getLastMessage(); - for (MessageAndOffset msg : messageAndOffsets) { - Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset); - List<Object> values = lists.iterator().next(); - assertEquals("Incorrect number of tuple values", 1, values.size()); - assertEquals("Message is incorrect", value, values.get(0)); - } - } private ByteBufferMessageSet getLastMessage() { long offsetOfLastMessage = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime()) - 1;