[ 
https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473799#comment-15473799
 ] 

ASF GitHub Bot commented on FLINK-3599:
---------------------------------------

Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77999725
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *  http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static 
org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +
    +public final class PojoComparatorGenerator<T> {
    +   private static final String packageName = 
"org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +   private transient Field[] keyFields;
    +   private transient Integer[] keyFieldIds;
    +   private final TypeComparator<Object>[] comparators;
    +   private final TypeSerializer<T> serializer;
    +   private final Class<T> type;
    +   private final ExecutionConfig config;
    +   private String code;
    +
    +   public PojoComparatorGenerator(Field[] keyFields, TypeComparator<?>[] 
comparators, TypeSerializer<T> serializer,
    +                                                                   
Class<T> type, Integer[] keyFieldIds, ExecutionConfig config) {
    +           this.keyFields = keyFields;
    +           this.comparators = (TypeComparator<Object>[]) comparators;
    +
    +           this.type = type;
    +           this.serializer = serializer;
    +           this.keyFieldIds = keyFieldIds;
    +           this.config = config;
    +   }
    +
    +   public TypeComparator<T> createComparator() {
    +           // Multiple comparators can be generated for each type based on 
a list of keys. The list of keys and the type
    +           // name should determine the generated comparator. This 
information is used for caching (avoiding
    +           // recompilation). Note that, the name of the field is not 
sufficient because nested POJOs might have a field
    +           // with the name.
    +           StringBuilder keyBuilder = new StringBuilder();
    +           for(Integer i : keyFieldIds) {
    +                   keyBuilder.append(i);
    +                   keyBuilder.append("_");
    +           }
    +           final String className = type.getCanonicalName().replace('.', 
'_') + "_GeneratedComparator" +
    +                   keyBuilder.toString();
    +           final String fullClassName = packageName + "." + className;
    +           Class<?> comparatorClazz;
    +           code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +           if (code == null) {
    +                   generateCode(className);
    +           }
    +           if (config.isWrapGeneratedClassesEnabled()) {
    +                   return new GenTypeComparatorProxy<>(type, 
fullClassName, code, comparators, serializer);
    +           }
    +           try {
    +                   comparatorClazz = 
InstantiationUtil.compile(type.getClassLoader(), fullClassName, code);
    +           } catch (Exception e) {
    +                   throw new RuntimeException("Unable to generate 
comparator: " + className, e);
    +           }
    +           Constructor<?>[] ctors = comparatorClazz.getConstructors();
    +           assert ctors.length == 1;
    +           try {
    +                   return (TypeComparator<T>) ctors[0].newInstance(new 
Object[]{comparators, serializer, type});
    +           } catch (Exception e) {
    +                   throw new RuntimeException("Unable to instantiate 
comparator using: " + ctors[0].getName(), e);
    +           }
    +   }
    +
    +
    +   private void generateCode(String className) {
    +           String typeName = type.getCanonicalName();
    +           StringBuilder members = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   members.append(String.format("final TypeComparator 
f%d;\n", i));
    +           }
    +           StringBuilder initMembers = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   initMembers.append(String.format("f%d = 
comparators[%d];\n", i, i));
    +           }
    +           StringBuilder normalizableKeys = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   normalizableKeys.append(String.format("if 
(f%d.supportsNormalizedKey()) {\n" +
    +                           "       if (f%d.invertNormalizedKey() != 
inverted) break;\n" +
    +                           "       nKeys++;\n" +
    +                           "       final int len = 
f%d.getNormalizeKeyLen();\n" +
    +                           "       this.normalizedKeyLengths[%d] = len;\n" 
+
    +                           "       nKeyLen += len;\n" +
    +                           "       if (nKeyLen < 0) {\n" +
    +                           "               nKeyLen = Integer.MAX_VALUE;\n" 
+
    +                           "               break;\n" +
    +                           "       }\n" +
    +                           "} else {\n" +
    +                           "       break;\n" +
    +                           "}\n", i, i, i, i));
    +           }
    +           StringBuilder cloneMembers = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   cloneMembers.append(String.format("f%d = 
toClone.f%d.duplicate();\n", i, i));
    +           }
    +           StringBuilder flatComparators = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   flatComparators.append(String.format(
    +                           "if(f%d instanceof CompositeTypeComparator) 
{\n" +
    +                           "       
((CompositeTypeComparator)f%d).getFlatComparator(flatComparators);\n" +
    +                           "} else {\n" +
    +                           "       flatComparators.add(f%d);\n" +
    +                           "}\n", i, i, i));
    +           }
    +           StringBuilder hashMembers = new StringBuilder();
    +           for (int i = 0; i < keyFields.length; ++i) {
    +                   hashMembers.append(String.format(
    +                           "code *= TupleComparatorBase.HASH_SALT[%d & 
0x1F];\n" +
    +                           "code += this.f%d.hash(((" + typeName + 
")value)." + accessStringForField(keyFields[i]) +
    +                                   ");\n",
    +                           i, i));
    +           }
    +           StringBuilder setReference = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   setReference.append(String.format(
    +                           "this.f%d.setReference(((" + typeName + 
")toCompare)." + accessStringForField(keyFields[i]) + ");\n",
    +                           i));
    +           }
    +           StringBuilder equalToReference = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   equalToReference.append(String.format(
    +                           "if (!this.f%d.equalToReference(((" + typeName 
+ ")candidate)." +
    +                           accessStringForField(keyFields[i]) + ")) {\n" +
    +                           "       return false;\n" +
    +                           "}\n", i));
    +           }
    +           StringBuilder compareToReference = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   compareToReference.append(String.format(
    +                           "cmp = 
this.f%d.compareToReference(other.f%d);\n" +
    +                           "if (cmp != 0) {\n" +
    +                           "       return cmp;\n" +
    +                           "}\n", i, i));
    +           }
    +           StringBuilder compareFields = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   compareFields.append(String.format(
    +                           "cmp = f%d.compare(((" + typeName + ")first)." 
+ accessStringForField(keyFields[i]) + "," +
    +                           "((" + typeName + ")second)." + 
accessStringForField(keyFields[i]) + ");\n" +
    +                           "if (cmp != 0) {\n" +
    +                                   "return cmp;\n" +
    +                           "}\n", i));
    +           }
    +           StringBuilder putNormalizedKeys = new StringBuilder();
    +           for (int i = 0; i < comparators.length; ++i) {
    +                   putNormalizedKeys.append(String.format("if (%d >= numLeadingNormalizableKeys || numBytes <= 0) break;\n" +
    --- End diff --
    
    Instead of the `%d >= numLeadingNormalizableKeys` check, you could have the for loop go up to `numLeadingNormalizableKeys`.
    
    Also, why is this a `break` (inside a `while (false)` loop in the ftl) instead of just returning here?
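
A minimal sketch of the control-flow options raised in this comment, assuming the generated method only has to stop writing normalized keys once either guard fires. The class, the method names, the `keyLens` array and the three-field unrolling are all hypothetical stand-ins; the actual ftl template is not shown in this thread, so this is not the PR's code.

```java
public class NormalizedKeySketch {

    // Style 1 (what the diff appears to emit): one guarded statement per key
    // field, unrolled at generation time inside do { ... } while (false).
    // Hard-coded for three fields, so keyLens must have at least three entries.
    public static int unrolledWithBreak(int[] keyLens, int numLeading, int numBytes) {
        int written = 0;
        do {
            if (0 >= numLeading || numBytes <= 0) break;
            int len0 = Math.min(keyLens[0], numBytes);
            written += len0;
            numBytes -= len0;
            if (1 >= numLeading || numBytes <= 0) break;
            int len1 = Math.min(keyLens[1], numBytes);
            written += len1;
            numBytes -= len1;
            if (2 >= numLeading || numBytes <= 0) break;
            int len2 = Math.min(keyLens[2], numBytes);
            written += len2;
            numBytes -= len2;
        } while (false);
        return written;
    }

    // Style 2: the same unrolled guards, returning directly instead of breaking.
    // Equivalent only when nothing else has to run after the guarded block.
    public static int unrolledWithReturn(int[] keyLens, int numLeading, int numBytes) {
        int written = 0;
        if (0 >= numLeading || numBytes <= 0) return written;
        int len0 = Math.min(keyLens[0], numBytes);
        written += len0;
        numBytes -= len0;
        if (1 >= numLeading || numBytes <= 0) return written;
        int len1 = Math.min(keyLens[1], numBytes);
        written += len1;
        numBytes -= len1;
        if (2 >= numLeading || numBytes <= 0) return written;
        written += Math.min(keyLens[2], numBytes);
        return written;
    }

    // Style 3: a runtime loop bounded by numLeading, so the per-field
    // "%d >= numLeadingNormalizableKeys" comparison disappears.
    public static int boundedLoop(int[] keyLens, int numLeading, int numBytes) {
        int written = 0;
        for (int i = 0; i < numLeading && numBytes > 0; i++) {
            int len = Math.min(keyLens[i], numBytes);
            written += len;
            numBytes -= len;
        }
        return written;
    }

    public static void main(String[] args) {
        int[] keyLens = {4, 2, 8};
        System.out.println(unrolledWithBreak(keyLens, 2, 5));
        System.out.println(unrolledWithReturn(keyLens, 2, 5));
        System.out.println(boundedLoop(keyLens, 2, 5));
    }
}
```

All three variants write the same number of bytes for the same inputs. The trade-off is that the unrolled forms can call each field's comparator directly (`f0`, `f1`, ...), while the bounded loop needs the comparators in an array.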


> GSoC: Code Generation in Serializers
> ------------------------------------
>
>                 Key: FLINK-3599
>                 URL: https://issues.apache.org/jira/browse/FLINK-3599
>             Project: Flink
>          Issue Type: Improvement
>          Components: Type Serialization System
>            Reporter: Márton Balassi
>            Assignee: Gabor Horvath
>              Labels: gsoc2016, mentor
>
> The current implementation of the serializers can be a
> performance bottleneck in some scenarios. These performance problems were
> also reported on the mailing list recently [1].
> E.g. the PojoSerializer uses reflection for accessing the fields, which is 
> slow [2].
> For the complete proposal see [3].
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
> [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
> [3] https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
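
To illustrate the reflection point in the issue description, here is a rough sketch contrasting reflective field access with the cast-and-read access that generated code can use. The `Point` class and both helper methods are hypothetical and are not taken from Flink; the sketch shows only the difference in access style and makes no performance measurement.

```java
import java.lang.reflect.Field;

public class FieldAccessSketch {

    // Hypothetical POJO used only for illustration.
    public static class Point {
        public int x;

        public Point(int x) {
            this.x = x;
        }
    }

    // Reflective access: the field is looked up by name at runtime and the
    // int value comes back boxed as an Integer.
    public static int readReflectively(Object pojo, String fieldName) {
        try {
            Field field = pojo.getClass().getField(fieldName);
            return (Integer) field.get(pojo);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot read field " + fieldName, e);
        }
    }

    // The kind of access generated code can emit instead: a cast followed by
    // a plain field read, with no lookup and no boxing.
    public static int readDirectly(Object pojo) {
        return ((Point) pojo).x;
    }

    public static void main(String[] args) {
        Point p = new Point(42);
        System.out.println(readReflectively(p, "x") == readDirectly(p));
    }
}
```

Both helpers return the same value. The generated comparator in the diff follows the second style, building expressions of the form `((TypeName) value).field` through `accessStringForField`.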



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
