Hi,
Unfortunately I found another mysterious issue. If I compile the generated
code using Janino, the readObject method will not triggered after an object
of that class is deserialized. In case I am compiling the class using
Oracle JDK, the readObject method will be triggered as expected. Did any of
you see similar phenomenon? What would be the best way to debug this?
I have attached the generated Java code.
Thanks in advance,
Gábor
On 9 May 2016 at 20:41, Gábor Horváth <[email protected]> wrote:
> Hi,
>
> Thank you for your support, I was able to solve this issue :)
>
> Gábor
>
> On 9 May 2016 at 12:15, Aljoscha Krettek <[email protected]> wrote:
>
>> Hi Gábor,
>> I used it, yes, but I never encountered such a problem. Let's hope that
>> the
>> error message Márton discovered helps. :-)
>>
>> Cheers,
>> Aljoscha
>>
>> On Mon, 9 May 2016 at 11:38 Márton Balassi <[email protected]>
>> wrote:
>>
>> > Hi Gabor,
>> >
>> > I have checked out your branch and tried debugging WordCountPojo to
>> > reproduce the behaviour. I am on a Mac with jdk1.8.0_91. I have received
>> > the following error when trying to access the constructors of the class
>> in
>> > question:
>> >
>> > Exception in thread "main" java.lang.VerifyError: (class:
>> >
>> >
>> org/apache/flink/api/java/typeutils/runtime/generated/Word_GeneratedSerializer,
>> > method: deserialize signature:
>> >
>> >
>> (Lorg/apache/flink/core/memory/DataInputView;)Lorg/apache/flink/examples/java/wordcount/WordCountPojo$Word;)
>> > Register 3 contains wrong type
>> > at java.lang.Class.getDeclaredConstructors0(Native Method)
>> > at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
>> > at java.lang.Class.getConstructors(Class.java:1651)
>> > at
>> >
>> >
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializerGenerator.createSerializer(PojoSerializerGenerator.java:57)
>> > at
>> >
>> >
>> org.apache.flink.api.java.typeutils.PojoTypeInfo.createSerializer(PojoTypeInfo.java:306)
>> > at org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
>> > at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>> > at
>> >
>> >
>> org.apache.flink.examples.java.wordcount.WordCountPojo.main(WordCountPojo.java:119)
>> > Disconnected from the target VM, address: '127.0.0.1:52140', transport:
>> > 'socket'
>> >
>> > I hope this helps.
>> >
>> > Marton
>> >
>> > On Sun, May 8, 2016 at 2:59 PM, Gábor Horváth <[email protected]>
>> wrote:
>> >
>> > > Hi!
>> > >
>> > > I have created a proof of concept implementation of my GSoC project
>> [1]:
>> > > introducing code generation to the serializers. The code is available
>> > here
>> > > [2]. Unfortunately I have run into a problem that I am unable to
>> debug. I
>> > > generated some code that I compiled using the Janino compiler [4]. I
>> did
>> > > not get any exception during the compilation, and I was able to get
>> the
>> > > Class from the compiler. Unfortunately I am unable to get the list of
>> the
>> > > constructors from the Class and can not debug what is the source of
>> the
>> > > problem. There are no exceptions thrown, no errors displayed in the
>> event
>> > > log, no fatal error logfile generated by the JVM, but the process
>> > > terminates (at line [3]). I suspect that the Class generated by the
>> > Janino
>> > > compiler is invalid, but it does not emit any exceptions or warnings.
>> I
>> > am
>> > > using Oracle JDK 8, on Arch Linux. Have you seen any similar problem?
>> Is
>> > > there a way to debug a situation like this? Is there a way to get
>> extra
>> > > diagnostics from the Janino compiler?
>> > >
>> > > @aljoscha: you have some experience with the Janino compiler, what do
>> you
>> > > think?
>> > >
>> > > Thanks in advance,
>> > > Gábor
>> > >
>> > > [1] https://issues.apache.org/jira/browse/FLINK-3599
>> > > [2] https://github.com/Xazax-hun/flink/commits/serializer_codegen
>> > > [3]
>> > >
>> > >
>> >
>> https://github.com/Xazax-hun/flink/commit/af499d5bebe5c1dba6b970977852318346636a8f#diff-7a2080515bac95cec58032655867d6cfR57
>> > > [4] http://unkrig.de/w/Janino
>> > >
>> >
>>
>
>
package org.apache.flink.api.java.typeutils.runtime.generated;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import static org.apache.flink.util.Preconditions.checkNotNull;
public class Word_GeneratedSerializer extends TypeSerializer<org.apache.flink.examples.java.wordcount.WordCountPojo.Word> {
private static byte IS_NULL = 1;
private static byte NO_SUBCLASS = 2;
private static byte IS_SUBCLASS = 4;
private static byte IS_TAGGED_SUBCLASS = 8;
private static final long serialVersionUID = 1L;
private int numFields;
private transient Field[] fields;
private ExecutionConfig executionConfig;
private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
private transient ClassLoader cl;
Class<org.apache.flink.examples.java.wordcount.WordCountPojo.Word> clazz;
final org.apache.flink.api.common.typeutils.base.IntSerializer f0;
final org.apache.flink.api.common.typeutils.base.StringSerializer f1;
public Word_GeneratedSerializer(Class<org.apache.flink.examples.java.wordcount.WordCountPojo.Word> clazz, TypeSerializer<?>[] serializerFields, Field[] reflectiveFields, ExecutionConfig e) {
this.clazz = clazz;
executionConfig = e;
cl = Thread.currentThread().getContextClassLoader();
this.numFields = reflectiveFields.length;
this.fields = reflectiveFields;
f0 = (org.apache.flink.api.common.typeutils.base.IntSerializer)serializerFields[0];
f1 = (org.apache.flink.api.common.typeutils.base.StringSerializer)serializerFields[1];
for (int i = 0; i < numFields; i++) {
this.fields[i].setAccessible(true);
}
}
public boolean isImmutableType() { return false; }
private void writeObject(ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeInt(fields.length);
for (Field field: fields) {
out.writeObject(field.getDeclaringClass());
out.writeUTF(field.getName());
}
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
cl = Thread.currentThread().getContextClassLoader();
in.defaultReadObject();
int numFields = in.readInt();
fields = new Field[numFields];
for (int i = 0; i < numFields; i++) {
Class<?> clazz = (Class<?>)in.readObject();
String fieldName = in.readUTF();
fields[i] = null;
while(clazz != null) {
try {
fields[i] = clazz.getDeclaredField(fieldName);
fields[i].setAccessible(true);
break;
} catch (NoSuchFieldException e) {
clazz = clazz.getSuperclass();
}
}
}
}
public Word_GeneratedSerializer duplicate() {
return this;
}
public org.apache.flink.examples.java.wordcount.WordCountPojo.Word createInstance() {
if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
return null;
}
try {
org.apache.flink.examples.java.wordcount.WordCountPojo.Word t = (org.apache.flink.examples.java.wordcount.WordCountPojo.Word)clazz.newInstance();
fields[0].set(t, f0.createInstance());
fields[1].set(t, f1.createInstance());
return t;
}
catch (Exception e) {
throw new RuntimeException("Cannot instantiate class.", e);
}
}
public org.apache.flink.examples.java.wordcount.WordCountPojo.Word copy(Object from) {
if (from == null) return null;
Class<?> actualType = from.getClass();
org.apache.flink.examples.java.wordcount.WordCountPojo.Word target;
try {
target = (org.apache.flink.examples.java.wordcount.WordCountPojo.Word) from.getClass().newInstance();
}
catch (Throwable t) {
throw new RuntimeException("Cannot instantiate class.", t);
}
try {
Object value;
value = fields[0].get(from);
if (value != null) {
fields[0].set(target, f0.copy(value));
} else {
fields[0].set(target, null);
}
value = fields[1].get(from);
if (value != null) {
fields[1].set(target, f1.copy(value));
} else {
fields[1].set(target, null);
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Error during POJO copy.");
}
return target;
}
public org.apache.flink.examples.java.wordcount.WordCountPojo.Word copy(Object from, Object resuse) {
if (from == null) return null;
return copy(from);
}
public int getLength() {
return -1;
}
public void serialize(Object value, DataOutputView target) throws IOException {
int flags = 0;
if (value == null) {
flags |= IS_NULL;
target.writeByte(flags);
return;
}
Integer subclassTag = -1;
Class<?> actualClass = value.getClass();
TypeSerializer subclassSerializer = null;
if (clazz != actualClass) {
// TODO
} else {
flags |= NO_SUBCLASS;
}
target.writeByte(flags);
if ((flags & IS_SUBCLASS) != 0) {
target.writeUTF(actualClass.getName());
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
target.writeByte(subclassTag);
}
if ((flags & NO_SUBCLASS) != 0) {
try {
Object o;
o = fields[0].get(value);
if (o == null) {
target.writeBoolean(true);
} else {
target.writeBoolean(false);
f0.serialize(o, target);
}
o = fields[1].get(value);
if (o == null) {
target.writeBoolean(true);
} else {
target.writeBoolean(false);
f1.serialize(o, target);
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Error during POJO copy, this should" +
"not happen since we check the fields before.");
}
} else {
// TOOD
}
}
public org.apache.flink.examples.java.wordcount.WordCountPojo.Word deserialize(DataInputView source) throws IOException {
int flags = source.readByte();
if((flags & IS_NULL) != 0) {
return null;
}
org.apache.flink.examples.java.wordcount.WordCountPojo.Word target = null;
if ((flags & IS_SUBCLASS) != 0) {
// TODO
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
// TODO
} else {
target = createInstance();
}
if ((flags & NO_SUBCLASS) != 0) {
try {
boolean isNull;
isNull = source.readBoolean();
if (isNull) {
fields[0].set(target, null);
} else {
fields[0].set(target, f0.deserialize(source));
}
isNull = source.readBoolean();
if (isNull) {
fields[1].set(target, null);
} else {
fields[1].set(target, f1.deserialize(source));
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Error during POJO copy, this should not happen" +
"since we check the fieldsbefore.");
}
} else {
//TODO
}
return target;
}
public org.apache.flink.examples.java.wordcount.WordCountPojo.Word deserialize(Object reuse, DataInputView source) throws IOException {
return deserialize(source);
}
public void copy(DataInputView source, DataOutputView target) throws IOException {
int flags = source.readByte();
target.writeByte(flags);
if ((flags & IS_NULL) != 0) {
return;
}
if ((flags & IS_SUBCLASS) != 0) {
// TODO
} else if ((flags & IS_TAGGED_SUBCLASS) != 0) {
// TODO
}
if ((flags & NO_SUBCLASS) != 0) {
boolean isNull;
isNull = source.readBoolean();
target.writeBoolean(isNull);
if (!isNull) {
f0.copy(source, target);
}
isNull = source.readBoolean();
target.writeBoolean(isNull);
if (!isNull) {
f1.copy(source, target);
}
} else {
// TODO
}
}
public boolean equals(Object obj) {
if (obj instanceof Word_GeneratedSerializer) {
Word_GeneratedSerializer other = (Word_GeneratedSerializer)obj;
return other.canEqual(this) && this.clazz == other.clazz && this.numFields == other.numFields
&& Objects.equals(this.f0, other.f0) && Objects.equals(this.f1, other.f1) ;
} else {
return false;
}
}
public boolean canEqual(Object obj) { return obj instanceof Word_GeneratedSerializer; }
public int hashCode() {
return Objects.hash(clazz, numFields, f0, f1);
}
}