[ https://issues.apache.org/jira/browse/BEAM-5092?focusedWorklogId=138044&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138044 ]
ASF GitHub Bot logged work on BEAM-5092: ---------------------------------------- Author: ASF GitHub Bot Created on: 25/Aug/18 01:16 Start Date: 25/Aug/18 01:16 Worklog Time Spent: 10m Work Description: tweise closed pull request #6268: [BEAM-5092] Prevent hash-lookup of schema on every record URL: https://github.com/apache/beam/pull/6268 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java index 47ced3795ee..7e677a51531 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java @@ -43,8 +43,8 @@ .put(TypeName.BYTE, ByteCoder.of()) .put(TypeName.BYTES, ByteArrayCoder.of()) .put(TypeName.INT16, BigEndianShortCoder.of()) - .put(TypeName.INT32, BigEndianIntegerCoder.of()) - .put(TypeName.INT64, BigEndianLongCoder.of()) + .put(TypeName.INT32, VarIntCoder.of()) + .put(TypeName.INT64, VarLongCoder.of()) .put(TypeName.DECIMAL, BigDecimalCoder.of()) .put(TypeName.FLOAT, FloatCoder.of()) .put(TypeName.DOUBLE, DoubleCoder.of()) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java index b8fcb967da8..73430bedf83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java @@ -51,6 +51,7 @@ import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess; import net.bytebuddy.matcher.ElementMatchers; import org.apache.beam.sdk.schemas.Schema; +import 
org.apache.beam.sdk.schemas.Schema.Field; import org.apache.beam.sdk.schemas.Schema.TypeName; import org.apache.beam.sdk.values.Row; @@ -147,9 +148,12 @@ private static DynamicType.Builder<Coder> implementMethods( Schema schema, DynamicType.Builder<Coder> builder) { + boolean hasNullableFields = schema.getFields().stream().anyMatch(Field::getNullable); return builder .defineMethod("getSchema", Schema.class, Visibility.PRIVATE, Ownership.STATIC) .intercept(FixedValue.reference(schema)) + .defineMethod("hasNullableFields", boolean.class, Visibility.PRIVATE, Ownership.STATIC) + .intercept(FixedValue.reference(hasNullableFields)) .method(ElementMatchers.named("encode")) .intercept(new EncodeInstruction()) .method(ElementMatchers.named("decode")) @@ -176,6 +180,13 @@ public ByteCodeAppender appender(Target implementationTarget) { MethodVariableAccess.REFERENCE.loadFrom(1), // OutputStream. MethodVariableAccess.REFERENCE.loadFrom(2), + // hasNullableFields + MethodInvocation.invoke( + implementationContext + .getInstrumentedType() + .getDeclaredMethods() + .filter(ElementMatchers.named("hasNullableFields")) + .getOnly()), // Call EncodeInstruction.encodeDelegate MethodInvocation.invoke( LOADED_TYPE @@ -197,9 +208,10 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) { // The encode method of the generated Coder delegates to this method to evaluate all of the // per-field Coders. 
@SuppressWarnings("unchecked") - static void encodeDelegate(Coder[] coders, Row value, OutputStream outputStream) + static void encodeDelegate( + Coder[] coders, Row value, OutputStream outputStream, boolean hasNullableFields) throws IOException { - NULL_LIST_CODER.encode(scanNullFields(value), outputStream); + NULL_LIST_CODER.encode(scanNullFields(value, hasNullableFields), outputStream); for (int idx = 0; idx < value.getFieldCount(); ++idx) { Object fieldValue = value.getValue(idx); if (value.getValue(idx) != null) { @@ -210,11 +222,13 @@ static void encodeDelegate(Coder[] coders, Row value, OutputStream outputStream) // Figure out which fields of the Row are null, and returns a BitSet. This allows us to save // on encoding each null field separately. - private static BitSet scanNullFields(Row row) { + private static BitSet scanNullFields(Row row, boolean hasNullableFields) { BitSet nullFields = new BitSet(row.getFieldCount()); - for (int idx = 0; idx < row.getFieldCount(); ++idx) { - if (row.getValue(idx) == null) { - nullFields.set(idx); + if (hasNullableFields) { + for (int idx = 0; idx < row.getFieldCount(); ++idx) { + if (row.getValue(idx) == null) { + nullFields.set(idx); + } } } return nullFields; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java index 7171878789b..4879fa99c5c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java @@ -26,6 +26,7 @@ import java.lang.reflect.Type; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -54,27 +55,74 @@ // why is that Java reflection does not guarantee 
the order in which it returns fields and // methods, and these schemas are often based on reflective analysis of classes. Therefore it's // important to capture the schema once here, so all invocations of the toRowFunction see the - // same version of the schema. If schemaFor were to be called inside the function, different + // same version of the schema. If schemaFor were to be called inside the lambda below, different // workers would see different versions of the schema. Schema schema = schemaFor(typeDescriptor); - return o -> Row.withSchema(schema).withFieldValueGetters(fieldValueGetterFactory(), o).build(); + + // Since we know that this factory is always called from inside the lambda with the same schema, + // return a caching factory that caches the first value seen for each class. This prevents + // having to lookup the getter list each time createGetters is called. + FieldValueGetterFactory getterFactory = + new FieldValueGetterFactory() { + @Nullable + private transient ConcurrentHashMap<Class, List<FieldValueGetter>> gettersMap = null; + + private final FieldValueGetterFactory innerFactory = fieldValueGetterFactory(); + + @Override + public List<FieldValueGetter> createGetters(Class<?> targetClass, Schema schema) { + if (gettersMap == null) { + gettersMap = new ConcurrentHashMap<>(); + } + List<FieldValueGetter> getters = gettersMap.get(targetClass); + if (getters != null) { + return getters; + } + getters = innerFactory.createGetters(targetClass, schema); + gettersMap.put(targetClass, getters); + return getters; + } + }; + return o -> Row.withSchema(schema).withFieldValueGetters(getterFactory, o).build(); } @Override @SuppressWarnings("unchecked") public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> typeDescriptor) { + FieldValueSetterFactory setterFactory = + new FieldValueSetterFactory() { + @Nullable + private volatile ConcurrentHashMap<Class, List<FieldValueSetter>> settersMap = null; + + private final FieldValueSetterFactory 
innerFactory = fieldValueSetterFactory(); + + @Override + public List<FieldValueSetter> createSetters(Class<?> targetClass, Schema schema) { + if (settersMap == null) { + settersMap = new ConcurrentHashMap<>(); + } + List<FieldValueSetter> setters = settersMap.get(targetClass); + if (setters != null) { + return setters; + } + setters = innerFactory.createSetters(targetClass, schema); + settersMap.put(targetClass, setters); + return setters; + } + }; + return r -> { if (r instanceof RowWithGetters) { // Efficient path: simply extract the underlying POJO instead of creating a new one. return (T) ((RowWithGetters) r).getGetterTarget(); } else { // Use the setters to copy values from the Row to a new instance of the class. - return fromRow(r, (Class<T>) typeDescriptor.getType()); + return fromRow(r, (Class<T>) typeDescriptor.getType(), setterFactory); } }; } - private <T> T fromRow(Row row, Class<T> clazz) { + private <T> T fromRow(Row row, Class<T> clazz, FieldValueSetterFactory setterFactory) { T object; try { object = clazz.getDeclaredConstructor().newInstance(); @@ -86,7 +134,7 @@ } Schema schema = row.getSchema(); - List<FieldValueSetter> setters = fieldValueSetterFactory().createSetters(clazz, schema); + List<FieldValueSetter> setters = setterFactory.createSetters(clazz, schema); checkState( setters.size() == row.getFieldCount(), "Did not have a matching number of setters and fields."); @@ -96,15 +144,6 @@ for (int i = 0; i < row.getFieldCount(); ++i) { FieldType type = schema.getField(i).getType(); FieldValueSetter setter = setters.get(i); - if (setter == null) { - throw new RuntimeException( - "NULL SETTER FOR " - + clazz.getSimpleName() - + " field name " - + schema.getField(i).getName() - + " schema " - + schema); - } setter.set( object, fromValue( @@ -113,7 +152,8 @@ setter.type(), setter.elementType(), setter.mapKeyType(), - setter.mapValueType())); + setter.mapValueType(), + setterFactory)); } return object; } @@ -121,39 +161,62 @@ 
@SuppressWarnings("unchecked") @Nullable private <T> T fromValue( - FieldType type, T value, Type fieldType, Type elemenentType, Type keyType, Type valueType) { + FieldType type, + T value, + Type fieldType, + Type elemenentType, + Type keyType, + Type valueType, + FieldValueSetterFactory setterFactory) { if (value == null) { return null; } if (TypeName.ROW.equals(type.getTypeName())) { - return (T) fromRow((Row) value, (Class) fieldType); + return (T) fromRow((Row) value, (Class) fieldType, setterFactory); } else if (TypeName.ARRAY.equals(type.getTypeName())) { - return (T) fromListValue(type.getCollectionElementType(), (List) value, elemenentType); + return (T) + fromListValue( + type.getCollectionElementType(), (List) value, elemenentType, setterFactory); } else if (TypeName.MAP.equals(type.getTypeName())) { return (T) fromMapValue( - type.getMapKeyType(), type.getMapValueType(), (Map) value, keyType, valueType); + type.getMapKeyType(), + type.getMapValueType(), + (Map) value, + keyType, + valueType, + setterFactory); } else { return value; } } @SuppressWarnings("unchecked") - private <T> List fromListValue(FieldType elementType, List<T> rowList, Type elementClass) { + private <T> List fromListValue( + FieldType elementType, + List<T> rowList, + Type elementClass, + FieldValueSetterFactory setterFactory) { List list = Lists.newArrayList(); for (T element : rowList) { - list.add(fromValue(elementType, element, elementClass, null, null, null)); + list.add(fromValue(elementType, element, elementClass, null, null, null, setterFactory)); } return list; } @SuppressWarnings("unchecked") private Map<?, ?> fromMapValue( - FieldType keyType, FieldType valueType, Map<?, ?> map, Type keyClass, Type valueClass) { + FieldType keyType, + FieldType valueType, + Map<?, ?> map, + Type keyClass, + Type valueClass, + FieldValueSetterFactory setterFactory) { Map newMap = Maps.newHashMap(); for (Map.Entry<?, ?> entry : map.entrySet()) { - Object key = fromValue(keyType, 
entry.getKey(), keyClass, null, null, null); - Object value = fromValue(valueType, entry.getValue(), valueClass, null, null, null); + Object key = fromValue(keyType, entry.getKey(), keyClass, null, null, null, setterFactory); + Object value = + fromValue(valueType, entry.getValue(), valueClass, null, null, null, setterFactory); newMap.put(key, value); } return newMap; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java index d745df74f8d..2c12a7f1a60 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java @@ -378,7 +378,8 @@ public Builder addArray(Object... values) { public Builder attachValues(List<Object> values) { this.attached = true; - return addValues(values); + this.values = values; + return this; } public Builder withFieldValueGetters( @@ -564,7 +565,7 @@ public Row build() { if (!this.values.isEmpty()) { List<Object> storageValues = attached ? this.values : verify(schema, this.values); checkState(getterTarget == null, "withGetterTarget requires getters."); - return new RowWithStorage(schema, verify(schema, storageValues)); + return new RowWithStorage(schema, storageValues); } else if (fieldValueGetterFactory != null) { checkState(getterTarget != null, "getters require withGetterTarget."); return new RowWithGetters(schema, fieldValueGetterFactory, getterTarget); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 138044) Time Spent: 9h 10m (was: 9h) > Nexmark 10x performance regression > ---------------------------------- > > Key: BEAM-5092 > URL: https://issues.apache.org/jira/browse/BEAM-5092 > Project: Beam > Issue Type: New Feature > Components: sdk-java-core > Reporter: Andrew Pilloud > Assignee: Reuven Lax > Priority: Critical > Time Spent: 9h 10m > Remaining Estimate: 0h > > There appears to be a 10x performance hit on the DirectRunner and Flink Nexmark > jobs. It first showed up in this build: > [https://builds.apache.org/view/A-D/view/Beam/job/beam_PostCommit_Java_Nexmark_Direct/151/changes] > [https://apache-beam-testing.appspot.com/explore?dashboard=5084698770407424] > [https://apache-beam-testing.appspot.com/explore?dashboard=5699257587728384] -- This message was sent by Atlassian JIRA (v7.6.3#76005)