n3nash commented on a change in pull request #649: migrating kryo's dependency
from twitter chill to plain kryo library
URL: https://github.com/apache/incubator-hudi/pull/649#discussion_r278727822
##########
File path:
hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
##########
@@ -16,156 +16,147 @@
package com.uber.hoodie.common.util;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.reflectasm.ConstructorAccess;
import com.uber.hoodie.exception.HoodieSerializationException;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import org.objenesis.instantiator.ObjectInstantiator;
+
/**
- * (NOTE: Adapted from Apache commons-lang3)
- * This class defines API's to serde an object.
+ * {@link SerializationUtils} class internally uses {@link Kryo} serializer
for serializing /
+ * deserializing objects.
*/
public class SerializationUtils {
+
+ // Caching kryo serializer to avoid creating kryo instance for every serde
operation
+ private static final ThreadLocal<KryoSerializerInstance> serializerRef =
+ ThreadLocal.withInitial(() -> new KryoSerializerInstance());
+
// Serialize
//-----------------------------------------------------------------------
/**
- * <p>Serializes an {@code Object} to the specified stream.</p>
- *
- * <p>The stream will be closed once the object is written.
- * This avoids the need for a finally clause, and maybe also exception
- * handling, in the application code.</p>
- *
- * <p>The stream passed in is not buffered internally within this method.
- * This is the responsibility of your application if desired.</p>
- *
- * @param obj the object to serialize to bytes, may be null
- * @param outputStream the stream to write to, must not be null
- * @throws IllegalArgumentException if {@code outputStream} is {@code null}
- * @throws HoodieSerializationException (runtime) if the serialization fails
- */
- public static void serialize(final Serializable obj, final OutputStream
outputStream) {
- if (outputStream == null) {
- throw new IllegalArgumentException("The OutputStream must not be null");
- }
- ObjectOutputStream out = null;
- try {
- // stream closed in the finally
- out = new ObjectOutputStream(outputStream);
- out.writeObject(obj);
-
- } catch (final IOException ex) {
- throw new HoodieSerializationException("unable to serialize object", ex);
- } finally {
- try {
- if (out != null) {
- out.close();
- }
- } catch (final IOException ex) { // NOPMD
- // ignore close exception
- }
- }
- }
-
- /**
- * <p>Serializes an {@code Object} to a byte array for
- * storage/serialization.</p>
+ * <p>Serializes an {@code Object} to a byte array for
storage/serialization.</p>
*
* @param obj the object to serialize to bytes
* @return a byte[] with the converted Serializable
- * @throws HoodieSerializationException (runtime) if the serialization fails
+ * @throws IOException if the serialization fails
*/
- public static byte[] serialize(final Serializable obj) {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
- serialize(obj, baos);
- return baos.toByteArray();
+ public static byte[] serialize(final Object obj) throws IOException {
+ return serializerRef.get().serialize(obj);
}
// Deserialize
//-----------------------------------------------------------------------
/**
- * <p>
- * Deserializes an {@code Object} from the specified stream.
- * </p>
+ * <p> Deserializes a single {@code Object} from an array of bytes. </p>
*
- * <p>
- * The stream will be closed once the object is written. This avoids the
need for a finally clause, and maybe also
- * exception handling, in the application code.
- * </p>
- *
- * <p>
- * The stream passed in is not buffered internally within this method. This
is the responsibility of your
- * application if desired.
- * </p>
- *
- * <p>
- * If the call site incorrectly types the return value, a {@link
ClassCastException} is thrown from the call site.
- * Without Generics in this declaration, the call site must type cast and
can cause the same ClassCastException.
- * Note that in both cases, the ClassCastException is in the call site, not
in this method.
- * </p>
+ * <p> If the call site incorrectly types the return value, a {@link
ClassCastException} is thrown
+ * from the call site. Without Generics in this declaration, the call site
must type cast and can
+ * cause the same ClassCastException. Note that in both cases, the
ClassCastException is in the
+ * call site, not in this method. </p>
*
* @param <T> the object type to be deserialized
- * @param inputStream the serialized object input stream, must not be null
+ * @param objectData the serialized object, must not be null
* @return the deserialized object
- * @throws IllegalArgumentException if {@code inputStream} is {@code null}
+ * @throws IllegalArgumentException if {@code objectData} is {@code null}
* @throws HoodieSerializationException (runtime) if the serialization fails
*/
- public static <T> T deserialize(final InputStream inputStream) {
- if (inputStream == null) {
- throw new IllegalArgumentException("The InputStream must not be null");
+ public static <T> T deserialize(final byte[] objectData) {
+ if (objectData == null) {
+ throw new IllegalArgumentException("The byte[] must not be null");
}
- ObjectInputStream in = null;
- try {
- // stream closed in the finally
- in = new ObjectInputStream(inputStream);
- @SuppressWarnings("unchecked") // may fail with CCE if serialised form
is incorrect
- final T obj = (T) in.readObject();
- return obj;
-
- } catch (final ClassCastException ex) {
- throw new HoodieSerializationException("cannot cast class", ex);
- } catch (final ClassNotFoundException ex) {
- throw new HoodieSerializationException("class not found", ex);
- } catch (final IOException ex) {
- throw new HoodieSerializationException("unable to deserialize to
object", ex);
- } finally {
- try {
- if (in != null) {
- in.close();
- }
- } catch (final IOException ex) { // NOPMD
- // ignore close exception
- }
+ return (T) serializerRef.get().deserialize(objectData);
+ }
+
+ private static class KryoSerializerInstance implements Serializable {
+ public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
+ private final Kryo kryo;
+ // Caching ByteArrayOutputStream to avoid recreating it for every operation
+ private final ByteArrayOutputStream baos;
+
+ KryoSerializerInstance() {
+ KryoInstantiator kryoInstantiator = new KryoInstantiator();
+ kryo = kryoInstantiator.newKryo();
+ baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
+ kryo.setRegistrationRequired(false);
+ }
+
+ byte[] serialize(Object obj) throws IOException {
+ kryo.reset();
+ baos.reset();
+ Output output = new Output(baos);
+ this.kryo.writeClassAndObject(output, obj);
+ output.close();
+ return baos.toByteArray();
+ }
+
+ Object deserialize(byte[] objectData) {
+ return this.kryo.readClassAndObject(new Input(objectData));
}
}
/**
- * <p>
- * Deserializes a single {@code Object} from an array of bytes.
- * </p>
- *
- * <p>
- * If the call site incorrectly types the return value, a {@link
ClassCastException} is thrown from the call site.
- * Without Generics in this declaration, the call site must type cast and
can cause the same ClassCastException.
- * Note that in both cases, the ClassCastException is in the call site, not
in this method.
- * </p>
- *
- * @param <T> the object type to be deserialized
- * @param objectData the serialized object, must not be null
- * @return the deserialized object
- * @throws IllegalArgumentException if {@code objectData} is {@code null}
- * @throws HoodieSerializationException (runtime) if the serialization fails
+ * This class has a no-arg constructor, suitable for use with reflection
instantiation.
+ * For details, check out com.twitter.chill.KryoBase.
*/
- public static <T> T deserialize(final byte[] objectData) {
- if (objectData == null) {
- throw new IllegalArgumentException("The byte[] must not be null");
+ private static class KryoInstantiator implements Serializable {
+
+ public Kryo newKryo() {
+
+ Kryo kryo = new KryoBase();
+ // ensure that kryo doesn't fail if classes are not registered with kryo.
+ kryo.setRegistrationRequired(false);
+ kryo.setInstantiatorStrategy(new
org.objenesis.strategy.StdInstantiatorStrategy());
+ // Handle cases where we may have an odd classloader setup like with
libjars
+ // for hadoop
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
Review comment:
Can you explain how Kryo chooses its default class loader?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services