This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a2db1a9363a [HUDI-6617] Make HoodieRecordDelegate implement KryoSerializable (#9327)
a2db1a9363a is described below
commit a2db1a9363a68d0dacc7f2970f37f016aefc2951
Author: Dongsj <[email protected]>
AuthorDate: Sat Aug 5 20:15:30 2023 +0800
[HUDI-6617] Make HoodieRecordDelegate implement KryoSerializable (#9327)
---
.../hudi/common/model/HoodieRecordDelegate.java | 30 +++++++-
.../common/util/HoodieCommonKryoRegistrar.java | 4 +-
.../common/model/TestHoodieRecordDelegate.java | 90 ++++++++++++++++++++++
3 files changed, 119 insertions(+), 5 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
index d265c15f114..a9323c15988 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordDelegate.java
@@ -20,6 +20,12 @@
package org.apache.hudi.common.model;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import javax.annotation.Nullable;
@@ -32,19 +38,19 @@ import java.io.Serializable;
* instead of passing back the full {@link HoodieRecord}, this lean delegate
* of it will be passed instead.
*/
-public class HoodieRecordDelegate implements Serializable {
+public class HoodieRecordDelegate implements Serializable, KryoSerializable {
- private final HoodieKey hoodieKey;
+ private HoodieKey hoodieKey;
/**
* Current location of record on storage. Filled in by looking up index
*/
- private final Option<HoodieRecordLocation> currentLocation;
+ private Option<HoodieRecordLocation> currentLocation;
/**
* New location of record on storage, after written.
*/
- private final Option<HoodieRecordLocation> newLocation;
+ private Option<HoodieRecordLocation> newLocation;
private HoodieRecordDelegate(HoodieKey hoodieKey,
@Nullable HoodieRecordLocation currentLocation,
@@ -122,4 +128,20 @@ public class HoodieRecordDelegate implements Serializable {
+ ", newLocation=" + newLocation
+ '}';
}
+
+ @VisibleForTesting
+ @Override
+ public final void write(Kryo kryo, Output output) {
+ kryo.writeObjectOrNull(output, hoodieKey, HoodieKey.class);
+ kryo.writeClassAndObject(output, currentLocation.isPresent() ? currentLocation.get() : null);
+ kryo.writeClassAndObject(output, newLocation.isPresent() ? newLocation.get() : null);
+ }
+
+ @VisibleForTesting
+ @Override
+ public final void read(Kryo kryo, Input input) {
+ this.hoodieKey = kryo.readObjectOrNull(input, HoodieKey.class);
+ this.currentLocation = Option.ofNullable((HoodieRecordLocation) kryo.readClassAndObject(input));
+ this.newLocation = Option.ofNullable((HoodieRecordLocation) kryo.readClassAndObject(input));
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
index 5db7c641ac2..42a16b7723a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordDelegate;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
@@ -82,7 +83,8 @@ public class HoodieCommonKryoRegistrar {
HoodieMetadataPayload.class,
HoodieRecordLocation.class,
- HoodieRecordGlobalLocation.class
+ HoodieRecordGlobalLocation.class,
+ HoodieRecordDelegate.class
})
.forEachOrdered(kryo::register);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecordDelegate.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecordDelegate.java
new file mode 100644
index 00000000000..5269ab16948
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecordDelegate.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.testutils.AvroBinaryTestPayload;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.HoodieCommonKryoRegistrar;
+import org.apache.hudi.common.util.Option;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieRecordDelegate {
+ private HoodieRecordDelegate hoodieRecordDelegate;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ final List<IndexedRecord> indexedRecords = testUtil.generateHoodieTestRecords(0, 1);
+ final List<HoodieRecord> hoodieRecords =
+ indexedRecords.stream().map(r -> new HoodieAvroRecord(new HoodieKey("001", "0000/00/00"),
+ new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
+ HoodieRecord record = hoodieRecords.get(0);
+ record.setCurrentLocation(new HoodieRecordLocation("001", "file01"));
+ record.setNewLocation(new HoodieRecordLocation("001", "file-01"));
+ hoodieRecordDelegate = HoodieRecordDelegate.fromHoodieRecord(record);
+ }
+
+ @Test
+ public void testKryoSerializeDeserialize() {
+ Kryo kryo = getKryoInstance();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+ kryo.reset();
+ baos.reset();
+ Output output = new Output(baos);
+ hoodieRecordDelegate.write(kryo, output);
+ output.close();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+ Input input = new Input(bais);
+ hoodieRecordDelegate.read(kryo, input);
+ input.close();
+
+ assertEquals(new HoodieKey("001", "0000/00/00"), hoodieRecordDelegate.getHoodieKey());
+ assertEquals(new HoodieRecordLocation("001", "file01"), hoodieRecordDelegate.getCurrentLocation().get());
+ assertEquals(new HoodieRecordLocation("001", "file-01"), hoodieRecordDelegate.getNewLocation().get());
+ }
+
+ public Kryo getKryoInstance() {
+ final Kryo kryo = new Kryo();
+ // This instance of Kryo should not require prior registration of classes
+ kryo.setRegistrationRequired(false);
+ kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
+ // Handle cases where we may have an odd classloader setup like with libjars
+ // for hadoop
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+ // Register Hudi's classes
+ new HoodieCommonKryoRegistrar().registerClasses(kryo);
+ return kryo;
+ }
+}