STORM-697: Fixed incorrect typing for offset
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11194653 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11194653 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11194653 Branch: refs/heads/master Commit: 11194653f43ce0d4f65d0051270cd86f2191cbc5 Parents: 2f119c6 Author: matt.tieman <matt.tie...@inin.com> Authored: Tue Mar 3 16:42:31 2015 -0500 Committer: matt.tieman <matt.tie...@inin.com> Committed: Tue Mar 3 16:42:31 2015 -0500 ---------------------------------------------------------------------- external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 2 +- .../src/jvm/storm/kafka/MessageMetadataScheme.java | 2 +- .../jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java | 2 +- .../storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 9 ++++++++- .../src/jvm/storm/kafka/StringMessageAndMetadataScheme.java | 2 +- .../storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java | 8 ++++---- 6 files changed, 16 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java index 9af49fe..17d0fb7 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java @@ -211,7 +211,7 @@ public class KafkaUtils { return tups; } - public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, int offset) { + public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, long offset) { Iterable<List<Object>> tups; ByteBuffer payload = msg.payload(); if (payload == null) { http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java index d0dd2be..da7acbf 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java +++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java @@ -21,5 +21,5 @@ import backtype.storm.spout.Scheme; * limitations under the License. */ public interface MessageMetadataScheme extends Scheme { - public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, int offset); + public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset); } http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java index 6226676..5eb20b5 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java +++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java @@ -14,7 +14,7 @@ public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme { } @SuppressWarnings("unchecked") - public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, int offset) { + public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) { List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset); if (o == null) { return null; http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 63e70cf..e1186e3 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -134,7 +134,14 @@ public class PartitionManager { if (toEmit == null) { return EmitState.NO_EMITTED; } - Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); + + Iterable<List<Object>> tups; + if (_spoutConfig.tupleMetaData) { + tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition, toEmit.offset); + } else { + tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); + } + if (tups != null) { for (List<Object> tup : tups) { collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java index 262a27c..2dc4c02 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java +++ b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java @@ -12,7 +12,7 @@ public class StringMessageAndMetadataScheme extends StringScheme implements Mess public static final String STRING_SCHEME_OFFSET = "offset"; @Override - public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, int offset) { + public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) { String stringMessage = StringScheme.deserializeString(message); return new Values(stringMessage, partition.partition, offset); } http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java index a7c9b2b..362d721 100644 --- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java @@ -176,7 +176,7 @@ public class KafkaUtilsTest { String value = "value"; Partition mockPartition = Mockito.mock(Partition.class); mockPartition.partition = 0; - int offset = 0; + long offset = 0L; config.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme()); config.tupleMetaData = true; @@ -187,8 +187,8 @@ public class KafkaUtilsTest { Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset); List<Object> values = lists.iterator().next(); assertEquals("Message is incorrect", value, values.get(0)); - assertEquals("Offset is incorrect", offset, values.get(1)); - assertEquals("Partition is incorrect", mockPartition.partition, values.get(2)); + assertEquals("Partition is incorrect", mockPartition.partition, values.get(1)); + assertEquals("Offset is incorrect", offset, values.get(2)); } } @@ -197,7 +197,7 @@ public class KafkaUtilsTest { String value = "value"; Partition mockPartition = Mockito.mock(Partition.class); mockPartition.partition = 0; - int offset = 0; + Long offset = 0L; config.scheme = new SchemeAsMultiScheme(new StringScheme()); config.tupleMetaData = true;