[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15474979#comment-15474979
 ] 

ASF GitHub Bot commented on FLINK-3599:
---------------------------------------

Github user Xazax-hun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78087704
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java
 ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *  http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator<T> {
    +   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +   private final Class<T> clazz;
    +   private final Field[] refFields;
    +   private final TypeSerializer<?>[] fieldSerializers;
    +   private final ExecutionConfig config;
    +   private String code;
    +
    /**
     * Creates a generator for a code-generated serializer of the given POJO class.
     *
     * <p>Every element of {@code reflectiveFields} is made accessible via
     * {@link Field#setAccessible(boolean)}. The arrays are stored as given, not copied.
     *
     * @param clazz the POJO class the generated serializer handles
     * @param fields the per-field serializers; index {@code i} must correspond to
     *     {@code reflectiveFields[i]}
     * @param reflectiveFields the POJO fields; index {@code i} must correspond to {@code fields[i]}
     * @param config the execution config handed to the generated serializer
     * @throws NullPointerException if any argument (or any element of {@code reflectiveFields}) is null
     * @throws IllegalArgumentException if {@code fields} and {@code reflectiveFields} differ in length
     */
    public PojoSerializerGenerator(
            Class<T> clazz,
            TypeSerializer<?>[] fields,
            Field[] reflectiveFields,
            ExecutionConfig config) {
        this.clazz = checkNotNull(clazz);
        this.refFields = checkNotNull(reflectiveFields);
        this.fieldSerializers = checkNotNull(fields);
        this.config = checkNotNull(config);
        // The generated code pairs refFields[i] with serializer f<i>. A length mismatch would
        // either drop fields silently from the generated code or fail later with an
        // out-of-bounds index, so reject it up front.
        if (this.refFields.length != this.fieldSerializers.length) {
            throw new IllegalArgumentException(
                    "Number of field serializers (" + this.fieldSerializers.length
                            + ") does not match number of fields (" + this.refFields.length
                            + ") for " + clazz.getName());
        }
        for (Field field : this.refFields) {
            field.setAccessible(true);
        }
    }
    +
    +   public TypeSerializer<T> createSerializer()  {
    +           final String className = clazz.getCanonicalName().replace('.', 
'_') + "_GeneratedSerializer";
    +           final String fullClassName = packageName + "." + className;
    +           Class<?> serializerClazz;
    +           code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +           if (code == null) {
    +                   generateCode(className);
    +           }
    +           if(config.isWrapGeneratedClassesEnabled()) {
    +                   return new GenTypeSerializerProxy<>(clazz, 
fullClassName, code, fieldSerializers, config);
    +           }
    +           try {
    +                   serializerClazz = 
InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
    +           }
    +           catch (Exception e) {
    +                   throw new RuntimeException("Unable to generate 
serializer: " + className, e);
    +           }
    +           Constructor<?>[] ctors = serializerClazz.getConstructors();
    +           assert ctors.length == 1;
    +           try {
    +                   return (TypeSerializer<T>) ctors[0].newInstance(new 
Object[]{clazz, fieldSerializers, config});
    +           } catch (Exception e) {
    +                   throw new RuntimeException("Unable to instantiate 
serializer: " + className, e);
    +           }
    +
    +   }
    +
    +   private void generateCode(String className) {
    +           assert fieldSerializers.length > 0;
    +           String typeName = clazz.getCanonicalName();
    +           StringBuilder members = new StringBuilder();
    +           for (int i = 0; i < fieldSerializers.length; ++i) {
    +                   members.append(String.format("final TypeSerializer 
f%d;\n", i));
    +           }
    +           StringBuilder initMembers = new StringBuilder();
    +           for (int i = 0; i < fieldSerializers.length; ++i) {
    +                   initMembers.append(String.format("f%d = 
serializerFields[%d];\n", i, i));
    +           }
    +           StringBuilder createFields = new StringBuilder();
    +           for (int i = 0; i < fieldSerializers.length; ++i) {
    +                   createFields.append(String.format("((" + typeName + 
")t)." + modifyStringForField(refFields[i],
    +                           "f%d.createInstance()") + ";\n", i));
    +           }
    +           StringBuilder copyFields = new StringBuilder();
    +           copyFields.append("Object value;\n");
    +           for (int i = 0; i < fieldSerializers.length; ++i) {
    +                   if (refFields[i].getType().isPrimitive()) {
    +                           copyFields.append(String.format("((" + typeName 
+ ")target)." + modifyStringForField(refFields[i],
    +                                   "f%d.copy(((" + typeName + ")from)." + 
accessStringForField(refFields[i])) + ");\n", i));
    +                   } else {
    +                           copyFields.append(String.format(
    +                                   "value = ((" + typeName + ")from)." + 
accessStringForField(refFields[i]) + ";\n" +
    +                                   "if (value != null) {\n" +
    +                                   "       ((" + typeName + ")target)." + 
modifyStringForField(refFields[i], "f%d.copy(value)") + ";\n" +
    +                                   "} else {\n" +
    +                                   "       ((" + typeName + ")target)." + 
modifyStringForField(refFields[i], "null") + ";\n" +
    +                                   "}\n", i));
    +                   }
    +           }
    +           StringBuilder reuseCopyFields = new StringBuilder();
    +           reuseCopyFields.append("Object value;\n");
    +           reuseCopyFields.append("Object reuseValue;\n");
    +           reuseCopyFields.append("Object copy;\n");
    +           for (int i = 0; i < fieldSerializers.length; ++i) {
    +                   if (refFields[i].getType().isPrimitive()) {
    +                           reuseCopyFields.append(String.format("((" + 
typeName + ")reuse)." + modifyStringForField(refFields[i],
    +                                   "f%d.copy(((" + typeName + ")from)." + 
accessStringForField(refFields[i]) + ")") + ";\n", i));
    +                   } else {
    +                           reuseCopyFields.append(String.format(
    +                                   "value = ((" + typeName + ")from)." + 
accessStringForField(refFields[i]) + ";\n" +
    +                                   "if (value != null) {\n" +
    +                                   "       reuseValue = ((" + typeName + 
")reuse)." + accessStringForField(refFields[i]) + ";\n" +
    +                                   "       if (reuseValue != null) {\n" +
    --- End diff --
    
    I think this is already specialized, but I might be wrong about what you
    mean.


> GSoC: Code Generation in Serializers
> ------------------------------------
>
>                 Key: FLINK-3599
>                 URL: https://issues.apache.org/jira/browse/FLINK-3599
>             Project: Flink
>          Issue Type: Improvement
>          Components: Type Serialization System
>            Reporter: Márton Balassi
>            Assignee: Gabor Horvath
>              Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> For example, the PojoSerializer uses reflection to access fields, which is
> slow [2].
> For the complete proposal see [3].
> [1] 
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] 
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] 
> https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to