[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15476185#comment-15476185 ]
ASF GitHub Bot commented on FLINK-3599: --------------------------------------- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r78138160 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField; +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField; + +public final class PojoSerializerGenerator<T> { + private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated"; + + private final Class<T> clazz; + private final Field[] refFields; + private final TypeSerializer<?>[] fieldSerializers; + private final ExecutionConfig config; + private String code; + + public PojoSerializerGenerator( + Class<T> clazz, + TypeSerializer<?>[] fields, + Field[] reflectiveFields, + ExecutionConfig config) { + this.clazz = checkNotNull(clazz); + this.refFields = checkNotNull(reflectiveFields); + this.fieldSerializers = checkNotNull(fields); + this.config = checkNotNull(config); + for (int i = 0; i < this.refFields.length; i++) { + this.refFields[i].setAccessible(true); + } + } + + public TypeSerializer<T> createSerializer() { + final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer"; + final String fullClassName = packageName + "." 
+ className; + Class<?> serializerClazz; + code = InstantiationUtil.getCodeForCachedClass(fullClassName); + if (code == null) { + generateCode(className); + } + if(config.isWrapGeneratedClassesEnabled()) { + return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config); + } + try { + serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code); + } + catch (Exception e) { + throw new RuntimeException("Unable to generate serializer: " + className, e); + } + Constructor<?>[] ctors = serializerClazz.getConstructors(); + assert ctors.length == 1; + try { + return (TypeSerializer<T>) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config}); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate serializer: " + className, e); + } + + } + + private void generateCode(String className) { + assert fieldSerializers.length > 0; + String typeName = clazz.getCanonicalName(); + StringBuilder members = new StringBuilder(); + for (int i = 0; i < fieldSerializers.length; ++i) { + members.append(String.format("final TypeSerializer f%d;\n", i)); + } + StringBuilder initMembers = new StringBuilder(); + for (int i = 0; i < fieldSerializers.length; ++i) { + initMembers.append(String.format("f%d = serializerFields[%d];\n", i, i)); + } + StringBuilder createFields = new StringBuilder(); + for (int i = 0; i < fieldSerializers.length; ++i) { + createFields.append(String.format("((" + typeName + ")t)." + modifyStringForField(refFields[i], + "f%d.createInstance()") + ";\n", i)); + } + StringBuilder copyFields = new StringBuilder(); + copyFields.append("Object value;\n"); + for (int i = 0; i < fieldSerializers.length; ++i) { + if (refFields[i].getType().isPrimitive()) { + copyFields.append(String.format("((" + typeName + ")target)." + modifyStringForField(refFields[i], + "f%d.copy(((" + typeName + ")from)." 
+ accessStringForField(refFields[i])) + ");\n", i)); + } else { + copyFields.append(String.format( + "value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" + + "if (value != null) {\n" + + " ((" + typeName + ")target)." + modifyStringForField(refFields[i], "f%d.copy(value)") + ";\n" + + "} else {\n" + + " ((" + typeName + ")target)." + modifyStringForField(refFields[i], "null") + ";\n" + + "}\n", i)); + } + } + StringBuilder reuseCopyFields = new StringBuilder(); + reuseCopyFields.append("Object value;\n"); + reuseCopyFields.append("Object reuseValue;\n"); + reuseCopyFields.append("Object copy;\n"); + for (int i = 0; i < fieldSerializers.length; ++i) { + if (refFields[i].getType().isPrimitive()) { + reuseCopyFields.append(String.format("((" + typeName + ")reuse)." + modifyStringForField(refFields[i], + "f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i]) + ")") + ";\n", i)); + } else { + reuseCopyFields.append(String.format( + "value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" + + "if (value != null) {\n" + + " reuseValue = ((" + typeName + ")reuse)." + accessStringForField(refFields[i]) + ";\n" + + " if (reuseValue != null) {\n" +
--- End diff ---

However, in the primitive case you don't need to call the `copy` method of `f%d`; a plain assignment is enough, because a primitive value has no nested state that would require a deep copy. (In general, the main purpose of `copy` is to produce deep copies.)

> GSoC: Code Generation in Serializers
> ------------------------------------
>
> Key: FLINK-3599
> URL: https://issues.apache.org/jira/browse/FLINK-3599
> Project: Flink
> Issue Type: Improvement
> Components: Type Serialization System
> Reporter: Márton Balassi
> Assignee: Gabor Horvath
> Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g.
the PojoSerializer uses reflection to access the fields, which is slow [2].
> For the complete proposal, see [3].
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)