GitHub user mushketyk commented on the issue:
https://github.com/apache/flink/pull/2762
Hi @fhueske
Ok, I'll give it a try.
I've pushed an updated PR where Schema is now external. I'll work on the
transformation part tomorrow.
I had a few issues while trying to accommodate your comments.
1. When I try to use DateTime as the value of one of the fields of a
GenericRecord I receive the following exception:
```
0 [main] INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime is not a valid POJO type
java.lang.RuntimeException: java.lang.InstantiationException
    at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
    at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
    at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.readRecord(AvroRowDeserializationSchema.java:77)
    at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:70)
    at org.apache.flink.streaming.connectors.kafka.AvroDeserializationSchemaTest.deserializeRowWithComplexTypes(AvroDeserializationSchemaTest.java:117)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.lang.InstantiationException
    at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:314)
    ... 43 more
```
I am not sure how to serialize DateTime using Avro. Do you have any suggestions?
2. You noted that "creating a new ByteArrayInputStream and Decoder for
each record is quite expensive". I don't see a way to avoid this with the
standard classes. The problem is that to create a Decoder I need to pass an
instance of an InputStream, but once a ByteArrayInputStream is created its
content cannot be changed. Therefore a new instance of ByteArrayInputStream
has to be created for each message.
I think I can create a different implementation of InputStream that allows
to set a buffer (let's call it `MutableByteArrayInputStream`). Then we can
create one instance of `MutableByteArrayInputStream` and just call
`inputStream.setBuffer(message)` for every message that should be deserialized.
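A minimal sketch of what I have in mind, using only the JDK. The class name
and `setBuffer` are the hypothetical names from above, not an existing API,
and I have not yet checked how this interacts with the Avro Decoder (if the
Decoder buffers internally, it would probably have to be reset for each
message as well):

```java
import java.io.ByteArrayInputStream;

// Hypothetical helper: a ByteArrayInputStream whose backing array can be swapped,
// so one stream instance can be reused for every incoming message.
class MutableByteArrayInputStream extends ByteArrayInputStream {

    MutableByteArrayInputStream() {
        // Start with an empty buffer; real content is supplied via setBuffer().
        super(new byte[0]);
    }

    // Point the stream at a new message and rewind it to the beginning.
    // buf, pos, count and mark are protected fields of ByteArrayInputStream.
    void setBuffer(byte[] message) {
        this.buf = message;
        this.pos = 0;
        this.count = message.length;
        this.mark = 0;
    }
}
```

The array is not copied, so the caller must not modify `message` while it is
being read.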