This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a2db1a9363a [HUDI-6617] Make HoodieRecordDelegate implement 
KryoSerializable (#9327)
a2db1a9363a is described below

commit a2db1a9363a68d0dacc7f2970f37f016aefc2951
Author: Dongsj <[email protected]>
AuthorDate: Sat Aug 5 20:15:30 2023 +0800

    [HUDI-6617] Make HoodieRecordDelegate implement KryoSerializable (#9327)
---
 .../hudi/common/model/HoodieRecordDelegate.java    | 30 +++++++-
 .../common/util/HoodieCommonKryoRegistrar.java     |  4 +-
 .../common/model/TestHoodieRecordDelegate.java     | 90 ++++++++++++++++++++++
 3 files changed, 119 insertions(+), 5 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
index d265c15f114..a9323c15988 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
@@ -20,6 +20,12 @@
 package org.apache.hudi.common.model;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 
 import javax.annotation.Nullable;
 
@@ -32,19 +38,19 @@ import java.io.Serializable;
  * instead of passing back the full {@link HoodieRecord}, this lean delegate
  * of it will be passed instead.
  */
-public class HoodieRecordDelegate implements Serializable {
+public class HoodieRecordDelegate implements Serializable, KryoSerializable {
 
-  private final HoodieKey hoodieKey;
+  private HoodieKey hoodieKey;
 
   /**
    * Current location of record on storage. Filled in by looking up index
    */
-  private final Option<HoodieRecordLocation> currentLocation;
+  private Option<HoodieRecordLocation> currentLocation;
 
   /**
    * New location of record on storage, after written.
    */
-  private final Option<HoodieRecordLocation> newLocation;
+  private Option<HoodieRecordLocation> newLocation;
 
   private HoodieRecordDelegate(HoodieKey hoodieKey,
                                @Nullable HoodieRecordLocation currentLocation,
@@ -122,4 +128,20 @@ public class HoodieRecordDelegate implements Serializable {
         + ", newLocation=" + newLocation
         + '}';
   }
+
+  @VisibleForTesting
+  @Override
+  public final void write(Kryo kryo, Output output) {
+    kryo.writeObjectOrNull(output, hoodieKey, HoodieKey.class);
+    kryo.writeClassAndObject(output, currentLocation.isPresent() ? 
currentLocation.get() : null);
+    kryo.writeClassAndObject(output, newLocation.isPresent() ? 
newLocation.get() : null);
+  }
+
+  @VisibleForTesting
+  @Override
+  public final void read(Kryo kryo, Input input) {
+    this.hoodieKey = kryo.readObjectOrNull(input, HoodieKey.class);
+    this.currentLocation = Option.ofNullable((HoodieRecordLocation) 
kryo.readClassAndObject(input));
+    this.newLocation = Option.ofNullable((HoodieRecordLocation) 
kryo.readClassAndObject(input));
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
index 5db7c641ac2..42a16b7723a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
@@ -82,7 +83,8 @@ public class HoodieCommonKryoRegistrar {
         HoodieMetadataPayload.class,
 
         HoodieRecordLocation.class,
-        HoodieRecordGlobalLocation.class
+        HoodieRecordGlobalLocation.class,
+        HoodieRecordDelegate.class
     })
         .forEachOrdered(kryo::register);
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecordDelegate.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecordDelegate.java
new file mode 100644
index 00000000000..5269ab16948
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecordDelegate.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.testutils.AvroBinaryTestPayload;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.HoodieCommonKryoRegistrar;
+import org.apache.hudi.common.util.Option;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieRecordDelegate {
+  private HoodieRecordDelegate hoodieRecordDelegate;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    SchemaTestUtil testUtil = new SchemaTestUtil();
+    final List<IndexedRecord> indexedRecords = 
testUtil.generateHoodieTestRecords(0, 1);
+    final List<HoodieRecord> hoodieRecords =
+        indexedRecords.stream().map(r -> new HoodieAvroRecord(new 
HoodieKey("001", "0000/00/00"),
+            new AvroBinaryTestPayload(Option.of((GenericRecord) 
r)))).collect(Collectors.toList());
+    HoodieRecord record = hoodieRecords.get(0);
+    record.setCurrentLocation(new HoodieRecordLocation("001", "file01"));
+    record.setNewLocation(new HoodieRecordLocation("001", "file-01"));
+    hoodieRecordDelegate = HoodieRecordDelegate.fromHoodieRecord(record);
+  }
+
+  @Test
+  public void testKryoSerializeDeserialize() {
+    Kryo kryo = getKryoInstance();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+    kryo.reset();
+    baos.reset();
+    Output output = new Output(baos);
+    hoodieRecordDelegate.write(kryo, output);
+    output.close();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    Input input = new Input(bais);
+    hoodieRecordDelegate.read(kryo, input);
+    input.close();
+
+    assertEquals(new HoodieKey("001", "0000/00/00"), 
hoodieRecordDelegate.getHoodieKey());
+    assertEquals(new HoodieRecordLocation("001", "file01"), 
hoodieRecordDelegate.getCurrentLocation().get());
+    assertEquals(new HoodieRecordLocation("001", "file-01"), 
hoodieRecordDelegate.getNewLocation().get());
+  }
+
+  public Kryo getKryoInstance() {
+    final Kryo kryo = new Kryo();
+    // This instance of Kryo should not require prior registration of classes
+    kryo.setRegistrationRequired(false);
+    kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new 
StdInstantiatorStrategy()));
+    // Handle cases where we may have an odd classloader setup like with 
libjars
+    // for hadoop
+    kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+    // Register Hudi's classes
+    new HoodieCommonKryoRegistrar().registerClasses(kryo);
+    return kryo;
+  }
+}

Reply via email to