[ https://issues.apache.org/jira/browse/FLINK-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15473799#comment-15473799 ]
ASF GitHub Bot commented on FLINK-3599: --------------------------------------- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/2211#discussion_r77999725 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField; + +public final class PojoComparatorGenerator<T> { + private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated"; + + private transient Field[] keyFields; + private transient Integer[] keyFieldIds; + private final TypeComparator<Object>[] comparators; + private final TypeSerializer<T> serializer; + private final Class<T> type; + private final ExecutionConfig config; + private String code; + + public PojoComparatorGenerator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, + Class<T> type, Integer[] keyFieldIds, ExecutionConfig config) { + this.keyFields = keyFields; + this.comparators = (TypeComparator<Object>[]) comparators; + + this.type = type; + this.serializer = serializer; + this.keyFieldIds = keyFieldIds; + this.config = config; + } + + public TypeComparator<T> createComparator() { + // Multiple comparators can be generated for each type based on a list of keys. The list of keys and the type + // name should determine the generated comparator. This information is used for caching (avoiding + // recompilation). Note that, the name of the field is not sufficient because nested POJOs might have a field + // with the name. 
+ StringBuilder keyBuilder = new StringBuilder(); + for(Integer i : keyFieldIds) { + keyBuilder.append(i); + keyBuilder.append("_"); + } + final String className = type.getCanonicalName().replace('.', '_') + "_GeneratedComparator" + + keyBuilder.toString(); + final String fullClassName = packageName + "." + className; + Class<?> comparatorClazz; + code = InstantiationUtil.getCodeForCachedClass(fullClassName); + if (code == null) { + generateCode(className); + } + if (config.isWrapGeneratedClassesEnabled()) { + return new GenTypeComparatorProxy<>(type, fullClassName, code, comparators, serializer); + } + try { + comparatorClazz = InstantiationUtil.compile(type.getClassLoader(), fullClassName, code); + } catch (Exception e) { + throw new RuntimeException("Unable to generate comparator: " + className, e); + } + Constructor<?>[] ctors = comparatorClazz.getConstructors(); + assert ctors.length == 1; + try { + return (TypeComparator<T>) ctors[0].newInstance(new Object[]{comparators, serializer, type}); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate comparator using: " + ctors[0].getName(), e); + } + } + + + private void generateCode(String className) { + String typeName = type.getCanonicalName(); + StringBuilder members = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + members.append(String.format("final TypeComparator f%d;\n", i)); + } + StringBuilder initMembers = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + initMembers.append(String.format("f%d = comparators[%d];\n", i, i)); + } + StringBuilder normalizableKeys = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + normalizableKeys.append(String.format("if (f%d.supportsNormalizedKey()) {\n" + + " if (f%d.invertNormalizedKey() != inverted) break;\n" + + " nKeys++;\n" + + " final int len = f%d.getNormalizeKeyLen();\n" + + " this.normalizedKeyLengths[%d] = len;\n" + + " nKeyLen += len;\n" + + " if (nKeyLen < 0) {\n" + + " 
nKeyLen = Integer.MAX_VALUE;\n" + + " break;\n" + + " }\n" + + "} else {\n" + + " break;\n" + + "}\n", i, i, i, i)); + } + StringBuilder cloneMembers = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + cloneMembers.append(String.format("f%d = toClone.f%d.duplicate();\n", i, i)); + } + StringBuilder flatComparators = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + flatComparators.append(String.format( + "if(f%d instanceof CompositeTypeComparator) {\n" + + " ((CompositeTypeComparator)f%d).getFlatComparator(flatComparators);\n" + + "} else {\n" + + " flatComparators.add(f%d);\n" + + "}\n", i, i, i)); + } + StringBuilder hashMembers = new StringBuilder(); + for (int i = 0; i < keyFields.length; ++i) { + hashMembers.append(String.format( + "code *= TupleComparatorBase.HASH_SALT[%d & 0x1F];\n" + + "code += this.f%d.hash(((" + typeName + ")value)." + accessStringForField(keyFields[i]) + + ");\n", + i, i)); + } + StringBuilder setReference = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + setReference.append(String.format( + "this.f%d.setReference(((" + typeName + ")toCompare)." + accessStringForField(keyFields[i]) + ");\n", + i)); + } + StringBuilder equalToReference = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + equalToReference.append(String.format( + "if (!this.f%d.equalToReference(((" + typeName + ")candidate)." + + accessStringForField(keyFields[i]) + ")) {\n" + + " return false;\n" + + "}\n", i)); + } + StringBuilder compareToReference = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + compareToReference.append(String.format( + "cmp = this.f%d.compareToReference(other.f%d);\n" + + "if (cmp != 0) {\n" + + " return cmp;\n" + + "}\n", i, i)); + } + StringBuilder compareFields = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + compareFields.append(String.format( + "cmp = f%d.compare(((" + typeName + ")first)." 
+ accessStringForField(keyFields[i]) + "," + + "((" + typeName + ")second)." + accessStringForField(keyFields[i]) + ");\n" + + "if (cmp != 0) {\n" + + "return cmp;\n" + + "}\n", i)); + } + StringBuilder putNormalizedKeys = new StringBuilder(); + for (int i = 0; i < comparators.length; ++i) { + putNormalizedKeys.append(String.format("if (%d >= numLeadingNormalizableKeys || numBytes <= 0) break;\n" + --- End diff -- Instead of the `%d >= numLeadingNormalizableKeys` check, you could have the for loop go up to `numLeadingNormalizableKeys`. Also, why is this a `break` (a while-false loop in the ftl) instead of just returning here? > GSoC: Code Generation in Serializers > ------------------------------------ > > Key: FLINK-3599 > URL: https://issues.apache.org/jira/browse/FLINK-3599 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System > Reporter: Márton Balassi > Assignee: Gabor Horvath > Labels: gsoc2016, mentor > > The current implementation of the serializers can be a > performance bottleneck in some scenarios. These performance problems were > also reported on the mailing list recently [1]. > E.g. the PojoSerializer uses reflection for accessing the fields, which is > slow [2]. > For the complete proposal see [3]. > [1] > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html > [2] > https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369 > [3] > https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk -- This message was sent by Atlassian JIRA (v6.3.4#6332)