[ 
https://issues.apache.org/jira/browse/BEAM-5092?focusedWorklogId=138044&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-138044
 ]

ASF GitHub Bot logged work on BEAM-5092:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Aug/18 01:16
            Start Date: 25/Aug/18 01:16
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6268: [BEAM-5092] Prevent 
hash-lookup of schema on every record
URL: https://github.com/apache/beam/pull/6268
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a fork-based pull request once
it is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
index 47ced3795ee..7e677a51531 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoder.java
@@ -43,8 +43,8 @@
           .put(TypeName.BYTE, ByteCoder.of())
           .put(TypeName.BYTES, ByteArrayCoder.of())
           .put(TypeName.INT16, BigEndianShortCoder.of())
-          .put(TypeName.INT32, BigEndianIntegerCoder.of())
-          .put(TypeName.INT64, BigEndianLongCoder.of())
+          .put(TypeName.INT32, VarIntCoder.of())
+          .put(TypeName.INT64, VarLongCoder.of())
           .put(TypeName.DECIMAL, BigDecimalCoder.of())
           .put(TypeName.FLOAT, FloatCoder.of())
           .put(TypeName.DOUBLE, DoubleCoder.of())
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
index b8fcb967da8..73430bedf83 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java
@@ -51,6 +51,7 @@
 import net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
 import net.bytebuddy.matcher.ElementMatchers;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.values.Row;
 
@@ -147,9 +148,12 @@
 
   private static DynamicType.Builder<Coder> implementMethods(
       Schema schema, DynamicType.Builder<Coder> builder) {
+    boolean hasNullableFields = 
schema.getFields().stream().anyMatch(Field::getNullable);
     return builder
         .defineMethod("getSchema", Schema.class, Visibility.PRIVATE, 
Ownership.STATIC)
         .intercept(FixedValue.reference(schema))
+        .defineMethod("hasNullableFields", boolean.class, Visibility.PRIVATE, 
Ownership.STATIC)
+        .intercept(FixedValue.reference(hasNullableFields))
         .method(ElementMatchers.named("encode"))
         .intercept(new EncodeInstruction())
         .method(ElementMatchers.named("decode"))
@@ -176,6 +180,13 @@ public ByteCodeAppender appender(Target 
implementationTarget) {
                 MethodVariableAccess.REFERENCE.loadFrom(1),
                 // OutputStream.
                 MethodVariableAccess.REFERENCE.loadFrom(2),
+                // hasNullableFields
+                MethodInvocation.invoke(
+                    implementationContext
+                        .getInstrumentedType()
+                        .getDeclaredMethods()
+                        .filter(ElementMatchers.named("hasNullableFields"))
+                        .getOnly()),
                 // Call EncodeInstruction.encodeDelegate
                 MethodInvocation.invoke(
                     LOADED_TYPE
@@ -197,9 +208,10 @@ public InstrumentedType prepare(InstrumentedType 
instrumentedType) {
     // The encode method of the generated Coder delegates to this method to 
evaluate all of the
     // per-field Coders.
     @SuppressWarnings("unchecked")
-    static void encodeDelegate(Coder[] coders, Row value, OutputStream 
outputStream)
+    static void encodeDelegate(
+        Coder[] coders, Row value, OutputStream outputStream, boolean 
hasNullableFields)
         throws IOException {
-      NULL_LIST_CODER.encode(scanNullFields(value), outputStream);
+      NULL_LIST_CODER.encode(scanNullFields(value, hasNullableFields), 
outputStream);
       for (int idx = 0; idx < value.getFieldCount(); ++idx) {
         Object fieldValue = value.getValue(idx);
         if (value.getValue(idx) != null) {
@@ -210,11 +222,13 @@ static void encodeDelegate(Coder[] coders, Row value, 
OutputStream outputStream)
 
     // Figure out which fields of the Row are null, and returns a BitSet. This 
allows us to save
     // on encoding each null field separately.
-    private static BitSet scanNullFields(Row row) {
+    private static BitSet scanNullFields(Row row, boolean hasNullableFields) {
       BitSet nullFields = new BitSet(row.getFieldCount());
-      for (int idx = 0; idx < row.getFieldCount(); ++idx) {
-        if (row.getValue(idx) == null) {
-          nullFields.set(idx);
+      if (hasNullableFields) {
+        for (int idx = 0; idx < row.getFieldCount(); ++idx) {
+          if (row.getValue(idx) == null) {
+            nullFields.set(idx);
+          }
         }
       }
       return nullFields;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
index 7171878789b..4879fa99c5c 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/GetterBasedSchemaProvider.java
@@ -26,6 +26,7 @@
 import java.lang.reflect.Type;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -54,27 +55,74 @@
     // why is that Java reflection does not guarantee the order in which it 
returns fields and
     // methods, and these schemas are often based on reflective analysis of 
classes. Therefore it's
     // important to capture the schema once here, so all invocations of the 
toRowFunction see the
-    // same version of the schema. If schemaFor were to be called inside the 
function, different
+    // same version of the schema. If schemaFor were to be called inside the 
lambda below, different
     // workers would see different versions of the schema.
     Schema schema = schemaFor(typeDescriptor);
-    return o -> 
Row.withSchema(schema).withFieldValueGetters(fieldValueGetterFactory(), 
o).build();
+
+    // Since we know that this factory is always called from inside the lambda 
with the same schema,
+    // return a caching factory that caches the first value seen for each 
class. This prevents
+    // having to lookup the getter list each time createGetters is called.
+    FieldValueGetterFactory getterFactory =
+        new FieldValueGetterFactory() {
+          @Nullable
+          private transient ConcurrentHashMap<Class, List<FieldValueGetter>> 
gettersMap = null;
+
+          private final FieldValueGetterFactory innerFactory = 
fieldValueGetterFactory();
+
+          @Override
+          public List<FieldValueGetter> createGetters(Class<?> targetClass, 
Schema schema) {
+            if (gettersMap == null) {
+              gettersMap = new ConcurrentHashMap<>();
+            }
+            List<FieldValueGetter> getters = gettersMap.get(targetClass);
+            if (getters != null) {
+              return getters;
+            }
+            getters = innerFactory.createGetters(targetClass, schema);
+            gettersMap.put(targetClass, getters);
+            return getters;
+          }
+        };
+    return o -> Row.withSchema(schema).withFieldValueGetters(getterFactory, 
o).build();
   }
 
   @Override
   @SuppressWarnings("unchecked")
   public <T> SerializableFunction<Row, T> fromRowFunction(TypeDescriptor<T> 
typeDescriptor) {
+    FieldValueSetterFactory setterFactory =
+        new FieldValueSetterFactory() {
+          @Nullable
+          private volatile ConcurrentHashMap<Class, List<FieldValueSetter>> 
settersMap = null;
+
+          private final FieldValueSetterFactory innerFactory = 
fieldValueSetterFactory();
+
+          @Override
+          public List<FieldValueSetter> createSetters(Class<?> targetClass, 
Schema schema) {
+            if (settersMap == null) {
+              settersMap = new ConcurrentHashMap<>();
+            }
+            List<FieldValueSetter> setters = settersMap.get(targetClass);
+            if (setters != null) {
+              return setters;
+            }
+            setters = innerFactory.createSetters(targetClass, schema);
+            settersMap.put(targetClass, setters);
+            return setters;
+          }
+        };
+
     return r -> {
       if (r instanceof RowWithGetters) {
         // Efficient path: simply extract the underlying POJO instead of 
creating a new one.
         return (T) ((RowWithGetters) r).getGetterTarget();
       } else {
         // Use the setters to copy values from the Row to a new instance of 
the class.
-        return fromRow(r, (Class<T>) typeDescriptor.getType());
+        return fromRow(r, (Class<T>) typeDescriptor.getType(), setterFactory);
       }
     };
   }
 
-  private <T> T fromRow(Row row, Class<T> clazz) {
+  private <T> T fromRow(Row row, Class<T> clazz, FieldValueSetterFactory 
setterFactory) {
     T object;
     try {
       object = clazz.getDeclaredConstructor().newInstance();
@@ -86,7 +134,7 @@
     }
 
     Schema schema = row.getSchema();
-    List<FieldValueSetter> setters = 
fieldValueSetterFactory().createSetters(clazz, schema);
+    List<FieldValueSetter> setters = setterFactory.createSetters(clazz, 
schema);
     checkState(
         setters.size() == row.getFieldCount(),
         "Did not have a matching number of setters and fields.");
@@ -96,15 +144,6 @@
     for (int i = 0; i < row.getFieldCount(); ++i) {
       FieldType type = schema.getField(i).getType();
       FieldValueSetter setter = setters.get(i);
-      if (setter == null) {
-        throw new RuntimeException(
-            "NULL SETTER FOR "
-                + clazz.getSimpleName()
-                + " field name "
-                + schema.getField(i).getName()
-                + " schema "
-                + schema);
-      }
       setter.set(
           object,
           fromValue(
@@ -113,7 +152,8 @@
               setter.type(),
               setter.elementType(),
               setter.mapKeyType(),
-              setter.mapValueType()));
+              setter.mapValueType(),
+              setterFactory));
     }
     return object;
   }
@@ -121,39 +161,62 @@
   @SuppressWarnings("unchecked")
   @Nullable
   private <T> T fromValue(
-      FieldType type, T value, Type fieldType, Type elemenentType, Type 
keyType, Type valueType) {
+      FieldType type,
+      T value,
+      Type fieldType,
+      Type elemenentType,
+      Type keyType,
+      Type valueType,
+      FieldValueSetterFactory setterFactory) {
     if (value == null) {
       return null;
     }
     if (TypeName.ROW.equals(type.getTypeName())) {
-      return (T) fromRow((Row) value, (Class) fieldType);
+      return (T) fromRow((Row) value, (Class) fieldType, setterFactory);
     } else if (TypeName.ARRAY.equals(type.getTypeName())) {
-      return (T) fromListValue(type.getCollectionElementType(), (List) value, 
elemenentType);
+      return (T)
+          fromListValue(
+              type.getCollectionElementType(), (List) value, elemenentType, 
setterFactory);
     } else if (TypeName.MAP.equals(type.getTypeName())) {
       return (T)
           fromMapValue(
-              type.getMapKeyType(), type.getMapValueType(), (Map) value, 
keyType, valueType);
+              type.getMapKeyType(),
+              type.getMapValueType(),
+              (Map) value,
+              keyType,
+              valueType,
+              setterFactory);
     } else {
       return value;
     }
   }
 
   @SuppressWarnings("unchecked")
-  private <T> List fromListValue(FieldType elementType, List<T> rowList, Type 
elementClass) {
+  private <T> List fromListValue(
+      FieldType elementType,
+      List<T> rowList,
+      Type elementClass,
+      FieldValueSetterFactory setterFactory) {
     List list = Lists.newArrayList();
     for (T element : rowList) {
-      list.add(fromValue(elementType, element, elementClass, null, null, 
null));
+      list.add(fromValue(elementType, element, elementClass, null, null, null, 
setterFactory));
     }
     return list;
   }
 
   @SuppressWarnings("unchecked")
   private Map<?, ?> fromMapValue(
-      FieldType keyType, FieldType valueType, Map<?, ?> map, Type keyClass, 
Type valueClass) {
+      FieldType keyType,
+      FieldType valueType,
+      Map<?, ?> map,
+      Type keyClass,
+      Type valueClass,
+      FieldValueSetterFactory setterFactory) {
     Map newMap = Maps.newHashMap();
     for (Map.Entry<?, ?> entry : map.entrySet()) {
-      Object key = fromValue(keyType, entry.getKey(), keyClass, null, null, 
null);
-      Object value = fromValue(valueType, entry.getValue(), valueClass, null, 
null, null);
+      Object key = fromValue(keyType, entry.getKey(), keyClass, null, null, 
null, setterFactory);
+      Object value =
+          fromValue(valueType, entry.getValue(), valueClass, null, null, null, 
setterFactory);
       newMap.put(key, value);
     }
     return newMap;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index d745df74f8d..2c12a7f1a60 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -378,7 +378,8 @@ public Builder addArray(Object... values) {
 
     public Builder attachValues(List<Object> values) {
       this.attached = true;
-      return addValues(values);
+      this.values = values;
+      return this;
     }
 
     public Builder withFieldValueGetters(
@@ -564,7 +565,7 @@ public Row build() {
       if (!this.values.isEmpty()) {
         List<Object> storageValues = attached ? this.values : verify(schema, 
this.values);
         checkState(getterTarget == null, "withGetterTarget requires getters.");
-        return new RowWithStorage(schema, verify(schema, storageValues));
+        return new RowWithStorage(schema, storageValues);
       } else if (fieldValueGetterFactory != null) {
         checkState(getterTarget != null, "getters require withGetterTarget.");
         return new RowWithGetters(schema, fieldValueGetterFactory, 
getterTarget);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 138044)
    Time Spent: 9h 10m  (was: 9h)

> Nexmark 10x performance regression
> ----------------------------------
>
>                 Key: BEAM-5092
>                 URL: https://issues.apache.org/jira/browse/BEAM-5092
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Andrew Pilloud
>            Assignee: Reuven Lax
>            Priority: Critical
>          Time Spent: 9h 10m
>  Remaining Estimate: 0h
>
> There looks to be a 10x performance hit on the DirectRunner and Flink nexmark 
> jobs. It first showed up in this build:
> [https://builds.apache.org/view/A-D/view/Beam/job/beam_PostCommit_Java_Nexmark_Direct/151/changes]
> [https://apache-beam-testing.appspot.com/explore?dashboard=5084698770407424]
> [https://apache-beam-testing.appspot.com/explore?dashboard=5699257587728384]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to