[ 
https://issues.apache.org/jira/browse/KAFKA-5799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Juhong NamGung updated KAFKA-5799:
----------------------------------
    Description: 
I am trying to integrate Kafka with Apache Storm.
I want to get data from Kafka using KafkaSpout in Apache Storm.

To get data from Kafka using KafkaSpout, the SpoutConfig scheme must be set.
(A Scheme is an interface that dictates how the ByteBuffer consumed from Kafka 
gets transformed into a Storm tuple.)
I want to get both the key and the value from Kafka, so I used the scheme 
‘KeyValueSchemeAsMultiScheme’.
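For context, wiring that scheme into a storm-kafka SpoutConfig looks roughly like this; the ZooKeeper address, topic, zkRoot, and spout id are placeholder values, not taken from the issue:

{code:java}
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.KeyValueSchemeAsMultiScheme;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringKeyValueScheme;
import org.apache.storm.kafka.ZkHosts;

// Placeholder connection settings for illustration only.
ZkHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "spout-id");

// Wrap a KeyValueScheme so the spout emits both key and value.
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
KafkaSpout spout = new KafkaSpout(spoutConfig);
{code}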

KeyValueSchemeAsMultiScheme’s constructor is as follows:
[^2.JPG]
But as you can see in the picture, the only class implementing the 
KeyValueScheme interface is StringKeyValueScheme.
[^1.JPG]

Using StringKeyValueScheme causes problems when importing Integer data from 
Kafka, because StringKeyValueScheme deserializes the ByteBuffer to a String.

So I implemented ByteArrayKeyValueScheme, which deserializes the ByteBuffer to 
a byte array.
ByteArrayKeyValueScheme imports data as a byte array.
If you use ByteArrayKeyValueScheme, you can import data from Kafka without 
error, regardless of the data type.
(However, you then have to convert the byte array to the type you want, e.g. 
String, Integer, ...)

[^bakvs.JPG]
{code:java}
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.storm.kafka.KeyValueScheme;
import org.apache.storm.spout.RawScheme;
import org.apache.storm.tuple.Values;

import com.google.common.collect.ImmutableMap;

public class ByteArrayKeyValueScheme extends RawScheme implements KeyValueScheme {

    @Override
    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
        // A null key means the record was produced without one:
        // emit just the value, deserialized to byte[] by RawScheme.
        if (key == null) {
            return deserialize(value);
        }
        // Both key and value stay as raw byte arrays; the downstream
        // bolt decides how to interpret them.
        Object keyTuple = deserialize(key).get(0);
        Object valueTuple = deserialize(value).get(0);
        return new Values(ImmutableMap.of(keyTuple, valueTuple));
    }
}
{code}
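Downstream of the spout, a bolt then has to convert the raw byte arrays back to the producer's original types. A minimal sketch of that conversion, assuming the producer wrote ints as 4 big-endian bytes and strings as UTF-8 (the class and helper names are mine, not part of Storm):

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteConversionExample {

    // Reverse an int that the producer serialized as 4 big-endian bytes.
    public static int toInt(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    // Reverse a UTF-8 encoded String.
    public static String toUtf8String(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] intBytes = ByteBuffer.allocate(4).putInt(42).array();
        System.out.println(toInt(intBytes));                                      // prints 42
        System.out.println(toUtf8String("key".getBytes(StandardCharsets.UTF_8))); // prints key
    }
}
{code}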




> New KafkaSpoutConfig(Scheme)-ByteArrayKeyValueScheme
> ----------------------------------------------------
>
>                 Key: KAFKA-5799
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5799
>             Project: Kafka
>          Issue Type: New Feature
>    Affects Versions: 0.11.0.0
>         Environment: apache-storm 1.1.0
>            Reporter: Juhong NamGung
>            Priority: Minor
>         Attachments: 1.JPG, 2.JPG, bakvs.JPG
>
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
