Abhishek Tiwari created GOBBLIN-57:
--------------------------------------
Summary: Kafka Converter ClassCastException
Key: GOBBLIN-57
URL: https://issues.apache.org/jira/browse/GOBBLIN-57
Project: Apache Gobblin
Issue Type: Bug
Reporter: Abhishek Tiwari
Hi,
I've implemented an instance of Converter to convert messages from a byte array
to an instance of String.
```
public class ByteArrayToStringConverter
    extends Converter<byte[], Class<String>, byte[], String> {
  @Override
  public Class<String> convertSchema(byte[] bytes, WorkUnitState workUnitState)
      throws SchemaConversionException {
    return String.class;
  }
  @Override
  public Iterable<String> convertRecord(Class<String> stringClass, byte[] bytes,
      WorkUnitState workUnitState) throws DataConversionException {
    return new SingleRecordIterable<>(new String(bytes));
  }
}
```
The configuration file I used to run the migration from Kafka to my
local file system is as follows:
```
job.name=GobblinKafkaQuickStart
job.group=GobblinKafka
job.description=Gobblin quick start job for Kafka
job.lock.enabled=false
kafka.brokers=localhost:9092
kafka.deserializer.type=BYTE_ARRAY
source.class=gobblin.source.extractor.extract.kafka.KafkaDeserializerSource
extract.namespace=gobblin.extract.kafka
writer.builder.class=gobblin.writer.SimpleDataWriterBuilder
writer.file.path.type=tablename
writer.destination.type=HDFS
writer.output.format=txt
converter.classes=gobblin.converter.string.ByteArrayToStringConverter
data.publisher.type=gobblin.publisher.BaseDataPublisher
mr.job.max.mappers=16
metrics.reporting.file.enabled=true
metrics.log.dir=${env:GOBBLIN_WORK_DIR}/metrics
metrics.reporting.file.suffix=txt
bootstrap.with.offset=earliest
```
However, when I start the job, I get an exception in the convertSchema
function of the ByteArrayToStringConverter class saying that String cannot be
cast to a byte array. I assume the issue occurs because Gobblin passes a String
to the convertSchema function instead of a byte array, although I have
specified otherwise in the job's configuration file by setting
kafka.deserializer.type.
Any hint as to what might be going on, and how I can use the
ByteArrayToStringConverter to convert a byte array to a String?
Thanks in advance!
Dominik
*Github Url* : https://github.com/linkedin/gobblin/issues/1557
*Github Reporter* : *dsafaric*
*Github Created At* : 2017-01-18T16:00:17Z
*Github Updated At* : 2017-04-14T23:14:10Z
h3. Comments
----
*ydai1124* wrote on 2017-04-14T23:13:20Z : Hi,
Did you solve your issue? Can you try making the following changes to your
converter:
```
public class ByteArrayToStringConverter
    extends Converter<String, String, byte[], String> {
  @Override
  public String convertSchema(String inputSchema, WorkUnitState workUnitState)
      throws SchemaConversionException {
    return inputSchema;
  }
  @Override
  public Iterable<String> convertRecord(String outputSchema, byte[] bytes,
      WorkUnitState workUnitState) throws DataConversionException {
    ...
```
*Github Url* :
https://github.com/linkedin/gobblin/issues/1557#issuecomment-294253812
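For anyone hitting the same exception: the suggestion above is consistent with the source handing the converter a String as the input schema while the records stay byte arrays. The sketch below is not Gobblin code, only a self-contained illustration of how that mismatch could surface as a ClassCastException inside convertSchema. `Converter` here is a hypothetical stand-in with the same four type parameters (the WorkUnitState argument and checked exceptions are left out), `run` imitates a framework calling a converter through raw types, and the schema value "my-topic" and the UTF-8 charset are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

public class ConverterSchemaSketch {

    // Hypothetical stand-in for a converter base class with four type
    // parameters (schema in, schema out, record in, record out).
    abstract static class Converter<SI, SO, DI, DO> {
        abstract SO convertSchema(SI inputSchema);
        abstract Iterable<DO> convertRecord(SO outputSchema, DI inputRecord);
    }

    // Mirrors the reporter's declaration: the input schema type is byte[].
    static class BytesSchemaConverter
            extends Converter<byte[], Class<String>, byte[], String> {
        @Override
        Class<String> convertSchema(byte[] inputSchema) {
            return String.class;
        }

        @Override
        Iterable<String> convertRecord(Class<String> outputSchema, byte[] inputRecord) {
            return Collections.singletonList(new String(inputRecord, StandardCharsets.UTF_8));
        }
    }

    // Mirrors the suggested declaration: the schema is a String on both sides,
    // only the record type changes from byte[] to String.
    static class StringSchemaConverter
            extends Converter<String, String, byte[], String> {
        @Override
        String convertSchema(String inputSchema) {
            return inputSchema;
        }

        @Override
        Iterable<String> convertRecord(String outputSchema, byte[] inputRecord) {
            // Explicit charset (an assumption) instead of the platform default.
            return Collections.singletonList(new String(inputRecord, StandardCharsets.UTF_8));
        }
    }

    // Imitates a framework that invokes converters through raw types, so the
    // compiler cannot check that the schema matches the declared type. The
    // cast happens at runtime inside the compiler-generated bridge method.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String run(Converter converter, Object schema, Object record) {
        Object outputSchema = converter.convertSchema(schema);
        Iterable<?> records = converter.convertRecord(outputSchema, record);
        return String.valueOf(records.iterator().next());
    }

    // Returns true if running the converter fails with a ClassCastException.
    static boolean failsWithClassCast(Converter<?, ?, ?, ?> converter,
                                      Object schema, Object record) {
        try {
            run(converter, schema, record);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        // String schema passed to a converter declared with a byte[] schema.
        System.out.println(failsWithClassCast(new BytesSchemaConverter(), "my-topic", payload));
        // String schema passed to a converter declared with a String schema.
        System.out.println(run(new StringSchemaConverter(), "my-topic", payload));
    }
}
```

In this sketch the first call fails before convertRecord is reached, because the String schema is cast to byte[] on entry to convertSchema. The second call succeeds, since only the record type, not the schema type, is declared as byte[].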