Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2762
  
    Hi @fhueske,
    OK, I'll give it a try.
    
    I've pushed an updated PR where the Schema is now external. I'll work on the transformation part tomorrow.
    
    I ran into a few issues while trying to address your comments.
    1. When I try to use a DateTime as the value of one of the fields of a GenericRecord, I get the following exception:
    
    ```
    0    [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class org.joda.time.DateTime is not a valid POJO type
    
    java.lang.RuntimeException: java.lang.InstantiationException
        at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:316)
        at org.apache.avro.specific.SpecificData.newRecord(SpecificData.java:332)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
        at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.readRecord(AvroRowDeserializationSchema.java:77)
        at org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema.deserialize(AvroRowDeserializationSchema.java:70)
        at org.apache.flink.streaming.connectors.kafka.AvroDeserializationSchemaTest.deserializeRowWithComplexTypes(AvroDeserializationSchemaTest.java:117)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
    Caused by: java.lang.InstantiationException
        at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:48)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
        at org.apache.avro.specific.SpecificData.newInstance(SpecificData.java:314)
        ... 43 more
    ```
    
    I am not sure how to serialize a DateTime using Avro. Do you have any suggestions?
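    One possible workaround, sketched under assumptions and not tested against this PR: instead of putting a DateTime object into the record and letting Avro reflect over it, store the instant as a plain Avro `long` of epoch milliseconds, which is what Avro's `timestamp-millis` logical type describes. The `TimestampMillisCodec` class is hypothetical, and it uses `java.time.Instant` only to keep the example dependency-free; with Joda-Time the equivalent calls would be `dateTime.getMillis()` and `new DateTime(millis)`.
    
    ```java
    import java.time.Instant;
    
    // Hypothetical helper, not part of Flink or Avro: stores a point in time as a
    // plain Avro "long" (epoch milliseconds) instead of as a reflected object.
    final class TimestampMillisCodec {
        // Avro schema fragment for a long annotated with the timestamp-millis
        // logical type (milliseconds since 1970-01-01T00:00:00Z).
        static final String FIELD_SCHEMA =
            "{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}";
    
        private TimestampMillisCodec() {}
    
        // Converts the instant to the long that would be put into the GenericRecord.
        static long encode(Instant instant) {
            return instant.toEpochMilli();
        }
    
        // Rebuilds the instant from the long read back out of the GenericRecord.
        static Instant decode(long epochMillis) {
            return Instant.ofEpochMilli(epochMillis);
        }
    }
    ```
    
    Note that only the instant survives this round trip: a DateTime's time zone and chronology are not stored, so they would have to be reapplied (or fixed to UTC) after reading.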
    
    2. You mentioned that "creating a new ByteArrayInputStream and Decoder for each record is quite expensive". I don't see a way to avoid this. To create a Decoder I need to pass it an InputStream, but once a ByteArrayInputStream is created its content cannot be changed, so a new ByteArrayInputStream has to be created for each message.
    
    I think I could write a different InputStream implementation that allows setting the buffer (let's call it `MutableByteArrayInputStream`). We could then create a single instance of `MutableByteArrayInputStream` and call `inputStream.setBuffer(message)` for every message that needs to be deserialized.


