n3nash commented on a change in pull request #649: migrating kryo's dependency 
from twitter chill to plain kryo library
URL: https://github.com/apache/incubator-hudi/pull/649#discussion_r278729153
 
 

 ##########
 File path: 
hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
 ##########
 @@ -16,156 +16,147 @@
 
 package com.uber.hoodie.common.util;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.reflectasm.ConstructorAccess;
 import com.uber.hoodie.exception.HoodieSerializationException;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import org.objenesis.instantiator.ObjectInstantiator;
+
 
 /**
- * (NOTE: Adapted from Apache commons-lang3)
- * This class defines API's to serde an object.
+ * {@link SerializationUtils} class internally uses {@link Kryo} serializer 
for serializing /
+ * deserializing objects.
  */
 public class SerializationUtils {
+
+  // Caching kryo serializer to avoid creating kryo instance for every serde 
operation
+  private static final ThreadLocal<KryoSerializerInstance> serializerRef =
+      ThreadLocal.withInitial(() -> new KryoSerializerInstance());
+
   // Serialize
   //-----------------------------------------------------------------------
 
   /**
-   * <p>Serializes an {@code Object} to the specified stream.</p>
-   *
-   * <p>The stream will be closed once the object is written.
-   * This avoids the need for a finally clause, and maybe also exception
-   * handling, in the application code.</p>
-   *
-   * <p>The stream passed in is not buffered internally within this method.
-   * This is the responsibility of your application if desired.</p>
-   *
-   * @param obj the object to serialize to bytes, may be null
-   * @param outputStream the stream to write to, must not be null
-   * @throws IllegalArgumentException if {@code outputStream} is {@code null}
-   * @throws HoodieSerializationException (runtime) if the serialization fails
-   */
-  public static void serialize(final Serializable obj, final OutputStream 
outputStream) {
-    if (outputStream == null) {
-      throw new IllegalArgumentException("The OutputStream must not be null");
-    }
-    ObjectOutputStream out = null;
-    try {
-      // stream closed in the finally
-      out = new ObjectOutputStream(outputStream);
-      out.writeObject(obj);
-
-    } catch (final IOException ex) {
-      throw new HoodieSerializationException("unable to serialize object", ex);
-    } finally {
-      try {
-        if (out != null) {
-          out.close();
-        }
-      } catch (final IOException ex) { // NOPMD
-        // ignore close exception
-      }
-    }
-  }
-
-  /**
-   * <p>Serializes an {@code Object} to a byte array for
-   * storage/serialization.</p>
+   * <p>Serializes an {@code Object} to a byte array for 
storage/serialization.</p>
    *
    * @param obj the object to serialize to bytes
    * @return a byte[] with the converted Serializable
-   * @throws HoodieSerializationException (runtime) if the serialization fails
+   * @throws IOException if the serialization fails
    */
-  public static byte[] serialize(final Serializable obj) {
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
-    serialize(obj, baos);
-    return baos.toByteArray();
+  public static byte[] serialize(final Object obj) throws IOException {
+    return serializerRef.get().serialize(obj);
   }
 
   // Deserialize
   //-----------------------------------------------------------------------
 
   /**
-   * <p>
-   * Deserializes an {@code Object} from the specified stream.
-   * </p>
+   * <p> Deserializes a single {@code Object} from an array of bytes. </p>
    *
-   * <p>
-   * The stream will be closed once the object is written. This avoids the 
need for a finally clause, and maybe also
-   * exception handling, in the application code.
-   * </p>
-   *
-   * <p>
-   * The stream passed in is not buffered internally within this method. This 
is the responsibility of your
-   * application if desired.
-   * </p>
-   *
-   * <p>
-   * If the call site incorrectly types the return value, a {@link 
ClassCastException} is thrown from the call site.
-   * Without Generics in this declaration, the call site must type cast and 
can cause the same ClassCastException.
-   * Note that in both cases, the ClassCastException is in the call site, not 
in this method.
-   * </p>
+   * <p> If the call site incorrectly types the return value, a {@link 
ClassCastException} is thrown
+   * from the call site. Without Generics in this declaration, the call site 
must type cast and can
+   * cause the same ClassCastException. Note that in both cases, the 
ClassCastException is in the
+   * call site, not in this method. </p>
    *
    * @param <T> the object type to be deserialized
-   * @param inputStream the serialized object input stream, must not be null
+   * @param objectData the serialized object, must not be null
    * @return the deserialized object
-   * @throws IllegalArgumentException if {@code inputStream} is {@code null}
+   * @throws IllegalArgumentException if {@code objectData} is {@code null}
    * @throws HoodieSerializationException (runtime) if the serialization fails
    */
-  public static <T> T deserialize(final InputStream inputStream) {
-    if (inputStream == null) {
-      throw new IllegalArgumentException("The InputStream must not be null");
+  public static <T> T deserialize(final byte[] objectData) {
+    if (objectData == null) {
+      throw new IllegalArgumentException("The byte[] must not be null");
     }
-    ObjectInputStream in = null;
-    try {
-      // stream closed in the finally
-      in = new ObjectInputStream(inputStream);
-      @SuppressWarnings("unchecked") // may fail with CCE if serialised form 
is incorrect
-      final T obj = (T) in.readObject();
-      return obj;
-
-    } catch (final ClassCastException ex) {
-      throw new HoodieSerializationException("cannot cast class", ex);
-    } catch (final ClassNotFoundException ex) {
-      throw new HoodieSerializationException("class not found", ex);
-    } catch (final IOException ex) {
-      throw new HoodieSerializationException("unable to deserialize to 
object", ex);
-    } finally {
-      try {
-        if (in != null) {
-          in.close();
-        }
-      } catch (final IOException ex) { // NOPMD
-        // ignore close exception
-      }
+    return (T) serializerRef.get().deserialize(objectData);
+  }
+
+  private static class KryoSerializerInstance implements Serializable {
+    public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
+    private final Kryo kryo;
+    // Caching ByteArrayOutputStream to avoid recreating it for every operation
+    private final ByteArrayOutputStream baos;
+
+    KryoSerializerInstance() {
+      KryoInstantiator kryoInstantiator = new KryoInstantiator();
+      kryo = kryoInstantiator.newKryo();
+      baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
+      kryo.setRegistrationRequired(false);
+    }
+
+    byte[] serialize(Object obj) throws IOException {
+      kryo.reset();
+      baos.reset();
+      Output output = new Output(baos);
+      this.kryo.writeClassAndObject(output, obj);
+      output.close();
+      return baos.toByteArray();
+    }
+
+    Object deserialize(byte[] objectData) {
+      return this.kryo.readClassAndObject(new Input(objectData));
     }
   }
 
   /**
-   * <p>
-   * Deserializes a single {@code Object} from an array of bytes.
-   * </p>
-   *
-   * <p>
-   * If the call site incorrectly types the return value, a {@link 
ClassCastException} is thrown from the call site.
-   * Without Generics in this declaration, the call site must type cast and 
can cause the same ClassCastException.
-   * Note that in both cases, the ClassCastException is in the call site, not 
in this method.
-   * </p>
-   *
-   * @param <T> the object type to be deserialized
-   * @param objectData the serialized object, must not be null
-   * @return the deserialized object
-   * @throws IllegalArgumentException if {@code objectData} is {@code null}
-   * @throws HoodieSerializationException (runtime) if the serialization fails
+   * This class has a no-arg constructor, suitable for use with reflection 
instantiation.
+   * For Details checkout com.twitter.chill.KryoBase.
    */
-  public static <T> T deserialize(final byte[] objectData) {
-    if (objectData == null) {
-      throw new IllegalArgumentException("The byte[] must not be null");
+  private static class KryoInstantiator implements Serializable {
+
+    public Kryo newKryo() {
+
+      Kryo kryo = new KryoBase();
+      // ensure that kryo doesn't fail if classes are not registered with kryo.
+      kryo.setRegistrationRequired(false);
+      kryo.setInstantiatorStrategy(new 
org.objenesis.strategy.StdInstantiatorStrategy());
+      // Handle cases where we may have an odd classloader setup like with 
libjars
+      // for hadoop
+      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+      return kryo;
+    }
+
+    private static class KryoBase extends Kryo {
+      @Override
+      protected Serializer newDefaultSerializer(Class type) {
+        final Serializer serializer = super.newDefaultSerializer(type);
+        if (serializer instanceof FieldSerializer) {
+          final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
+          fieldSerializer.setIgnoreSyntheticFields(true);
+        }
+        return serializer;
+      }
+
+      @Override
+      protected ObjectInstantiator newInstantiator(Class type) {
+        return () -> {
+          // First try reflectasm - it is fastest way to instantiate an object.
+          try {
+            final ConstructorAccess access = ConstructorAccess.get(type);
+            return access.newInstance();
+          } catch (Throwable t) {
+            // ignore this exception. We may want to try other way.
 
 Review comment:
   Can you add WARN-level logs here and in the catch block below?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to