This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9e124834aae57cc8773f9254d34eb439df91694d Author: Alexey Kudinkin <[email protected]> AuthorDate: Tue Oct 25 22:10:15 2022 -0700 [HUDI-4959] Fixing Avro's `Utf8` serialization in Kryo (#7024) (cherry picked from commit b6c35394c603baa6cb718aa7d7d41b959de115a8) --- .../hudi/common/util/SerializationUtils.java | 39 +++++++++++++++++----- .../hudi/common/util/TestSerializationUtils.java | 29 +++++++++++++++- 2 files changed, 58 insertions(+), 10 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java index 9041db51444..872848a5d49 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java @@ -19,8 +19,10 @@ package org.apache.hudi.common.util; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; +import org.apache.avro.util.Utf8; import org.objenesis.strategy.StdInstantiatorStrategy; import java.io.ByteArrayOutputStream; @@ -36,9 +38,6 @@ public class SerializationUtils { private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF = ThreadLocal.withInitial(KryoSerializerInstance::new); - // Serialize - // ----------------------------------------------------------------------- - /** * <p> * Serializes an {@code Object} to a byte array for storage/serialization. @@ -52,9 +51,6 @@ public class SerializationUtils { return SERIALIZER_REF.get().serialize(obj); } - // Deserialize - // ----------------------------------------------------------------------- - /** * <p> * Deserializes a single {@code Object} from an array of bytes. @@ -112,17 +108,42 @@ public class SerializationUtils { private static class KryoInstantiator implements Serializable { public Kryo newKryo() { - Kryo kryo = new Kryo(); - // ensure that kryo doesn't fail if classes are not registered with kryo. + + // This instance of Kryo should not require prior registration of classes kryo.setRegistrationRequired(false); - // This would be used for object initialization if nothing else works out. kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); // Handle cases where we may have an odd classloader setup like with libjars // for hadoop kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + + // Register serializers + kryo.register(Utf8.class, new AvroUtf8Serializer()); + return kryo; } } + + /** + * NOTE: This {@link Serializer} could deserialize instance of {@link Utf8} serialized + * by implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer} + */ + private static class AvroUtf8Serializer extends Serializer<Utf8> { + + @SuppressWarnings("unchecked") + @Override + public void write(Kryo kryo, Output output, Utf8 utf8String) { + Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class); + bytesSerializer.write(kryo, output, utf8String.getBytes()); + } + + @SuppressWarnings("unchecked") + @Override + public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) { + Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class); + byte[] bytes = bytesSerializer.read(kryo, input, byte[].class); + return new Utf8(bytes); + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java index 9d6c1b81b04..f2714aaf9a2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java @@ -19,15 +19,21 @@ package org.apache.hudi.common.util; import org.apache.avro.util.Utf8; +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.Objects; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -52,12 +58,33 @@ public class TestSerializationUtils { verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5))); } + @Test + public void testAvroUtf8SerDe() throws IOException { + byte[] firstBytes = SerializationUtils.serialize(new Utf8("test")); + // 4 byte string + 3 bytes length (Kryo uses variable-length encoding) + assertEquals(7, firstBytes.length); + } + + @Test + public void testClassFullyQualifiedNameSerialization() throws IOException { + DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", "partition")); + HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new DeleteRecord[]{deleteRecord}, Collections.emptyMap()); + + byte[] firstBytes = SerializationUtils.serialize(deleteBlock); + byte[] secondBytes = SerializationUtils.serialize(deleteBlock); + + assertNotSame(firstBytes, secondBytes); + // NOTE: Here we assert that Kryo doesn't optimize out the fully-qualified class-name + // and always writes it out + assertEquals(ByteBuffer.wrap(firstBytes), ByteBuffer.wrap(secondBytes)); + } + private <T> void verifyObject(T expectedValue) throws IOException { byte[] serializedObject = SerializationUtils.serialize(expectedValue); assertNotNull(serializedObject); assertTrue(serializedObject.length > 0); - final T deserializedValue = SerializationUtils.<T>deserialize(serializedObject); + final T deserializedValue = SerializationUtils.deserialize(serializedObject); if (expectedValue == null) { assertNull(deserializedValue); } else {
