This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this
push:
new 9a1aa0a683 [MINOR] Make sure all `HoodieRecord`s are appropriately
serializable by Kryo (#6977)
9a1aa0a683 is described below
commit 9a1aa0a6830b79761a9afa1c469772875d713aa1
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Wed Oct 19 21:16:13 2022 -0700
[MINOR] Make sure all `HoodieRecord`s are appropriately serializable by
Kryo (#6977)
* Make sure `HoodieRecord`, `HoodieKey`, `HoodieRecordLocation` are all
`KryoSerializable`
* Revisited `HoodieRecord` serialization hooks to make sure they (a) cannot
be overridden, and (b) provide hooks to properly serialize the record's
payload;
Implemented serialization hooks for `HoodieAvroIndexedRecord`;
Implemented serialization hooks for `HoodieEmptyRecord`;
Implemented serialization hooks for `HoodieAvroRecord`;
* Revisited `HoodieSparkRecord` to transiently hold on to the schema so
that it can project rows
* Implemented serialization hooks for `HoodieSparkRecord`
* Added `TestHoodieSparkRecord`
* Added tests for Avro-based records
* Added test for `HoodieEmptyRecord`
* Fixed sealing/unsealing for `HoodieRecord` in
`HoodieBackedTableMetadataWriter`
* Properly handle deflated records
* Fixed `Row` encoding
* Fixed `HoodieRecord` to be properly sealed/unsealed
* Fixed serialization of the `HoodieRecordGlobalLocation`
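
For reference, the serialization flow these hooks establish can be condensed
into the following sketch (paraphrased from the `HoodieRecord` changes below,
not verbatim source): the base class pins the Kryo lifecycle down as final,
and only payload handling is deferred to subclasses.

    public abstract class HoodieRecord<T> implements KryoSerializable, Serializable {
      // Subclass-specific payload hooks
      protected abstract void writeRecordPayload(T payload, Kryo kryo, Output output);
      protected abstract T readRecordPayload(Kryo kryo, Input input);

      @Override
      public final void write(Kryo kryo, Output output) {
        kryo.writeObjectOrNull(output, key, HoodieKey.class);
        // ... operation and record locations are written here ...
        writeRecordPayload(data, kryo, output);
      }

      @Override
      public final void read(Kryo kryo, Input input) {
        this.key = kryo.readObjectOrNull(input, HoodieKey.class);
        // ... operation and record locations are read here ...
        this.data = readRecordPayload(kryo, input);
        this.sealed = true; // records always come back sealed
      }
    }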
---
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +
.../hudi/commmon/model/HoodieSparkRecord.java | 105 ++++++++---
.../bulkinsert/RDDSpatialCurveSortPartitioner.java | 2 +-
.../hudi/common/model/HoodieAvroIndexedRecord.java | 30 ++++
.../apache/hudi/common/model/HoodieAvroRecord.java | 16 ++
.../hudi/common/model/HoodieEmptyRecord.java | 34 +++-
.../org/apache/hudi/common/model/HoodieKey.java | 23 ++-
.../org/apache/hudi/common/model/HoodieRecord.java | 53 +++++-
.../common/model/HoodieRecordGlobalLocation.java | 23 ++-
.../hudi/common/model/HoodieRecordLocation.java | 19 +-
.../sink/partitioner/BucketAssignFunction.java | 7 +
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
.../scala/org/apache/hudi/LogFileIterator.scala | 2 +-
.../SparkFullBootstrapDataProviderBase.java | 2 +-
.../model/TestHoodieRecordSerialization.scala | 195 +++++++++++++++++++++
15 files changed, 473 insertions(+), 42 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 962875fb92..eda28a5286 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -983,7 +983,9 @@ public abstract class HoodieBackedTableMetadataWriter
implements HoodieTableMeta
HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
FileSlice slice =
finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
fileGroupCount));
+ r.unseal();
r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+ r.seal();
return r;
});
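
For context: `setCurrentLocation` mutates the record, and `HoodieRecord`
permits mutation only while unsealed, hence the unseal/seal bracketing above.
A minimal usage sketch, with hypothetical `record` and `location` values:

    record.unseal();                     // open the record for mutation
    record.setCurrentLocation(location); // mutating a sealed record would fail
    record.seal();                       // re-seal before handing the record off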
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index 9cdccbe407..43000d1964 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -18,6 +18,10 @@
package org.apache.hudi.commmon.model;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.hudi.HoodieInternalRowUtils;
import org.apache.hudi.SparkAdapterSupport$;
@@ -70,9 +74,8 @@ import static org.apache.spark.sql.types.DataTypes.StringType;
* need to be updated (ie serving as an overlay layer on top of
[[UnsafeRow]])</li>
* </ul>
*
-
*/
-public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
+public class HoodieSparkRecord extends HoodieRecord<InternalRow> implements
KryoSerializable {
/**
* Record copy operation to avoid double copying. InternalRow do not need to
copy twice.
@@ -80,41 +83,58 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
private boolean copy;
/**
- * We should use this construction method when we read internalRow from file.
- * The record constructed by this method must be used in iter.
+ * NOTE: {@code HoodieSparkRecord} is holding the schema only in cases when it has to
+ *       execute {@link UnsafeProjection}, so that the {@link InternalRow} it is holding
+ *       on to can be projected into {@link UnsafeRow} and subsequently be serialized
+ *       efficiently (by Kryo)
*/
- public HoodieSparkRecord(InternalRow data) {
+ private final transient StructType schema;
+
+ public HoodieSparkRecord(UnsafeRow data) {
+ this(data, null);
+ }
+
+ public HoodieSparkRecord(InternalRow data, StructType schema) {
super(null, data);
- validateRow(data);
+
+ validateRow(data, schema);
this.copy = false;
+ this.schema = schema;
+ }
+
+ public HoodieSparkRecord(HoodieKey key, UnsafeRow data, boolean copy) {
+ this(key, data, null, copy);
}
- public HoodieSparkRecord(HoodieKey key, InternalRow data, boolean copy) {
+ public HoodieSparkRecord(HoodieKey key, InternalRow data, StructType schema,
boolean copy) {
super(key, data);
- validateRow(data);
+
+ validateRow(data, schema);
this.copy = copy;
+ this.schema = schema;
}
- private HoodieSparkRecord(HoodieKey key, InternalRow data, HoodieOperation
operation, boolean copy) {
+ private HoodieSparkRecord(HoodieKey key, InternalRow data, StructType
schema, HoodieOperation operation, boolean copy) {
super(key, data, operation);
- validateRow(data);
+ validateRow(data, schema);
this.copy = copy;
+ this.schema = schema;
}
@Override
public HoodieSparkRecord newInstance() {
- return new HoodieSparkRecord(this.key, this.data, this.operation,
this.copy);
+ return new HoodieSparkRecord(this.key, this.data, this.schema,
this.operation, this.copy);
}
@Override
public HoodieSparkRecord newInstance(HoodieKey key, HoodieOperation op) {
- return new HoodieSparkRecord(key, this.data, op, this.copy);
+ return new HoodieSparkRecord(key, this.data, this.schema, op, this.copy);
}
@Override
public HoodieSparkRecord newInstance(HoodieKey key) {
- return new HoodieSparkRecord(key, this.data, this.operation, this.copy);
+ return new HoodieSparkRecord(key, this.data, this.schema, this.operation,
this.copy);
}
@Override
@@ -155,7 +175,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
UnsafeProjection projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType,
targetStructType);
- return new HoodieSparkRecord(getKey(), projection.apply(mergeRow),
getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), projection.apply(mergeRow),
targetStructType, getOperation(), copy);
}
@Override
@@ -169,7 +189,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
// TODO add actual rewriting
InternalRow finalRow = new HoodieInternalRow(metaFields, data,
containMetaFields);
- return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), finalRow, targetStructType,
getOperation(), copy);
}
@Override
@@ -184,7 +204,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType,
newStructType, renameCols);
HoodieInternalRow finalRow = new HoodieInternalRow(metaFields,
rewrittenRow, containMetaFields);
- return new HoodieSparkRecord(getKey(), finalRow, getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), finalRow, newStructType,
getOperation(), copy);
}
@Override
@@ -199,7 +219,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
}
});
- return new HoodieSparkRecord(getKey(), updatableRow, getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), updatableRow, structType,
getOperation(), copy);
}
@Override
@@ -264,7 +284,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
partition =
data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(),
StringType).toString();
}
HoodieKey hoodieKey = new HoodieKey(key, partition);
- return new HoodieSparkRecord(hoodieKey, data, getOperation(), copy);
+ return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(),
copy);
}
@Override
@@ -299,6 +319,42 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
}
}
+ /**
+ * NOTE: This method is declared final to make sure there's no polymorphism, and
+ *       therefore the JIT compiler can perform more aggressive optimizations
+ */
+ @Override
+ protected final void writeRecordPayload(InternalRow payload, Kryo kryo,
Output output) {
+ // NOTE: [[payload]] could be null if the record has already been deflated
+ UnsafeRow unsafeRow = convertToUnsafeRow(payload, schema);
+
+ kryo.writeObjectOrNull(output, unsafeRow, UnsafeRow.class);
+ }
+
+ /**
+ * NOTE: This method is declared final to make sure there's no polymorphism, and
+ *       therefore the JIT compiler can perform more aggressive optimizations
+ */
+ @Override
+ protected final InternalRow readRecordPayload(Kryo kryo, Input input) {
+ // NOTE: After deserialization every object is allocated on the heap,
therefore
+ // we annotate this object as being copied
+ this.copy = true;
+
+ return kryo.readObjectOrNull(input, UnsafeRow.class);
+ }
+
+ private static UnsafeRow convertToUnsafeRow(InternalRow payload, StructType
schema) {
+ if (payload == null) {
+ return null;
+ } else if (payload instanceof UnsafeRow) {
+ return (UnsafeRow) payload;
+ }
+
+ UnsafeProjection unsafeProjection =
HoodieInternalRowUtils.getCachedUnsafeProjection(schema, schema);
+ return unsafeProjection.apply(payload);
+ }
+
private static HoodieInternalRow wrapIntoUpdatableOverlay(InternalRow data,
StructType structType) {
if (data instanceof HoodieInternalRow) {
return (HoodieInternalRow) data;
@@ -351,14 +407,21 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
HoodieOperation operation = withOperationField
? HoodieOperation.fromName(getNullableValAsString(structType,
record.data, HoodieRecord.OPERATION_METADATA_FIELD)) : null;
- return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath),
record.data, operation, record.copy);
+ return new HoodieSparkRecord(new HoodieKey(recKey, partitionPath),
record.data, structType, operation, record.copy);
}
- private static void validateRow(InternalRow data) {
+ private static void validateRow(InternalRow data, StructType schema) {
// NOTE: [[HoodieSparkRecord]] is expected to hold either
// - Instance of [[UnsafeRow]] or
// - Instance of [[HoodieInternalRow]] or
// - Instance of [[ColumnarBatchRow]]
- ValidationUtils.checkState(data instanceof UnsafeRow || data instanceof
HoodieInternalRow ||
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+ //
+ // In case the provided row is anything but [[UnsafeRow]], the corresponding
+ // schema has to be provided as well so that the row could be properly
+ // serialized (in case it needs to be)
+ boolean isValid = data instanceof UnsafeRow
+ || schema != null && (data instanceof HoodieInternalRow ||
SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+
+ ValidationUtils.checkState(isValid);
}
}
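
To illustrate why the record now holds on to the schema: before Kryo writes a
non-`UnsafeRow` payload out, it is projected into an `UnsafeRow`, along the
lines of the `convertToUnsafeRow` helper above. A hedged sketch with made-up
sample data (imports from Spark catalyst/types assumed):

    StructType schema = new StructType()
        .add("a", DataTypes.IntegerType)
        .add("b", DataTypes.StringType);
    InternalRow payload = new GenericInternalRow(
        new Object[] {1, UTF8String.fromString("x")}); // e.g. a generic (non-unsafe) row
    UnsafeProjection projection =
        HoodieInternalRowUtils.getCachedUnsafeProjection(schema, schema);
    UnsafeRow serializable = projection.apply(payload); // now safe to hand to Kryo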
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 8673d2f5ba..92fa653061 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -101,7 +101,7 @@ public class RDDSpatialCurveSortPartitioner<T>
String key =
internalRow.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
String partition =
internalRow.getString(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal());
HoodieKey hoodieKey = new HoodieKey(key, partition);
- return (HoodieRecord) new HoodieSparkRecord(hoodieKey,
internalRow, false);
+ return (HoodieRecord) new HoodieSparkRecord(hoodieKey,
internalRow, structType, false);
});
} else {
throw new UnsupportedOperationException(recordType.name());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 479c8eb9d6..a9e5091429 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -18,6 +18,10 @@
package org.apache.hudi.common.model;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -200,4 +204,30 @@ public class HoodieAvroIndexedRecord extends
HoodieRecord<IndexedRecord> {
public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema,
Properties props) {
return Option.of(this);
}
+
+ /**
+ * NOTE: This method is declared final to make sure there's no polymorphism, and
+ *       therefore the JIT compiler can perform more aggressive optimizations
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ protected final void writeRecordPayload(IndexedRecord payload, Kryo kryo,
Output output) {
+ // NOTE: We're leveraging Spark's default [[GenericAvroSerializer]] to
serialize Avro
+ Serializer<GenericRecord> avroSerializer =
kryo.getSerializer(GenericRecord.class);
+
+ kryo.writeObjectOrNull(output, payload, avroSerializer);
+ }
+
+ /**
+ * NOTE: This method is declared final to make sure there's no polymorphism, and
+ *       therefore the JIT compiler can perform more aggressive optimizations
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ protected final IndexedRecord readRecordPayload(Kryo kryo, Input input) {
+ // NOTE: We're leveraging Spark's default [[GenericAvroSerializer]] to deserialize Avro
+ Serializer<GenericRecord> avroSerializer =
kryo.getSerializer(GenericRecord.class);
+
+ return kryo.readObjectOrNull(input, GenericRecord.class, avroSerializer);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 5cbadece6b..de653054cd 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -19,6 +19,9 @@
package org.apache.hudi.common.model;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -193,4 +196,17 @@ public class HoodieAvroRecord<T extends
HoodieRecordPayload> extends HoodieRecor
return Option.empty();
}
}
+
+ @Override
+ protected final void writeRecordPayload(T payload, Kryo kryo, Output output)
{
+ // NOTE: Since the [[payload]] is polymorphic we have to write out its class
+ //       to be able to properly deserialize it
+ kryo.writeClassAndObject(output, payload);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected final T readRecordPayload(Kryo kryo, Input input) {
+ return (T) kryo.readClassAndObject(input);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index eca1ad9a3b..8e80ff7c84 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -18,20 +18,22 @@
package org.apache.hudi.common.model;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.avro.Schema;
-
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
- private final HoodieRecordType type;
- private final Comparable<?> orderingVal;
+ private HoodieRecordType type;
+ private Comparable<?> orderingVal;
public HoodieEmptyRecord(HoodieKey key, HoodieRecordType type) {
super(key, null);
@@ -152,4 +154,28 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}
+
+ /**
+ * NOTE: This method is declared final to make sure there's no polymorphism, and
+ *       therefore the JIT compiler can perform more aggressive optimizations
+ */
+ @Override
+ protected final void writeRecordPayload(T payload, Kryo kryo, Output output)
{
+ kryo.writeObject(output, type);
+ // NOTE: Since [[orderingVal]] is polymorphic we have to write out its
class
+ // to be able to properly deserialize it
+ kryo.writeClassAndObject(output, orderingVal);
+ }
+
+ /**
+ * NOTE: This method is declared final to make sure there's no polymorphism, and
+ *       therefore the JIT compiler can perform more aggressive optimizations
+ */
+ @Override
+ protected final T readRecordPayload(Kryo kryo, Input input) {
+ this.type = kryo.readObject(input, HoodieRecordType.class);
+ this.orderingVal = (Comparable<?>) kryo.readClassAndObject(input);
+ // NOTE: [[EmptyRecord]]'s payload is always null
+ return null;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
index 9030204099..5208cece1c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
@@ -18,6 +18,11 @@
package org.apache.hudi.common.model;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
import java.io.Serializable;
import java.util.Objects;
@@ -27,13 +32,13 @@ import java.util.Objects;
* - recordKey : a recordKey that acts as primary key for a record.
* - partitionPath : the partition path of a record.
*/
-public class HoodieKey implements Serializable {
+public final class HoodieKey implements Serializable, KryoSerializable {
private String recordKey;
private String partitionPath;
- public HoodieKey() {
- }
+ // Required for serializer
+ public HoodieKey() {}
public HoodieKey(String recordKey, String partitionPath) {
this.recordKey = recordKey;
@@ -81,4 +86,16 @@ public class HoodieKey implements Serializable {
sb.append('}');
return sb.toString();
}
+
+ @Override
+ public void write(Kryo kryo, Output output) {
+ output.writeString(recordKey);
+ output.writeString(partitionPath);
+ }
+
+ @Override
+ public void read(Kryo kryo, Input input) {
+ this.recordKey = input.readString();
+ this.partitionPath = input.readString();
+ }
}
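
A quick way to sanity-check the new `HoodieKey` hooks is a direct Kryo
round-trip; an illustrative sketch only (the test suite added below drives
this through Spark's serializer infrastructure instead):

    Kryo kryo = new Kryo();
    Output output = new Output(1024, -1);
    new HoodieKey("rec-key", "part-path").write(kryo, output);

    HoodieKey copy = new HoodieKey(); // no-arg constructor required by the serializer
    copy.read(kryo, new Input(output.toBytes()));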
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 363092409d..778186d4bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -18,10 +18,12 @@
package org.apache.hudi.common.model;
-import java.util.Collections;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -29,18 +31,19 @@ import org.apache.hudi.keygen.BaseKeyGenerator;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* A Single Record managed by Hoodie.
*/
-public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterface, Serializable {
+public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterface, KryoSerializable, Serializable {
public static final String COMMIT_TIME_METADATA_FIELD =
HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName();
public static final String COMMIT_SEQNO_METADATA_FIELD =
HoodieMetadataField.COMMIT_SEQNO_METADATA_FIELD.getFieldName();
@@ -158,8 +161,7 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
this.sealed = record.sealed;
}
- public HoodieRecord() {
- }
+ public HoodieRecord() {}
public abstract HoodieRecord<T> newInstance();
@@ -282,6 +284,45 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
}
}
+ protected abstract void writeRecordPayload(T payload, Kryo kryo, Output
output);
+
+ protected abstract T readRecordPayload(Kryo kryo, Input input);
+
+ /**
+ * NOTE: This method is declared final to make sure there's no polymorphism, and
+ *       therefore the JIT compiler can perform more aggressive optimizations
+ */
+ @Override
+ public final void write(Kryo kryo, Output output) {
+ kryo.writeObjectOrNull(output, key, HoodieKey.class);
+ kryo.writeObjectOrNull(output, operation, HoodieOperation.class);
+ // NOTE: We have to write the actual class along with the object here,
+ //       since [[HoodieRecordLocation]] has subclasses
+ kryo.writeClassAndObject(output, currentLocation);
+ kryo.writeClassAndObject(output, newLocation);
+ // NOTE: Writing out the actual record payload is delegated to the concrete
+ //       implementation
+ writeRecordPayload(data, kryo, output);
+ }
+
+ /**
+ * NOTE: This method is declared final to make sure there's no polymorphism, and
+ *       therefore the JIT compiler can perform more aggressive optimizations
+ */
+ @Override
+ public final void read(Kryo kryo, Input input) {
+ this.key = kryo.readObjectOrNull(input, HoodieKey.class);
+ this.operation = kryo.readObjectOrNull(input, HoodieOperation.class);
+ this.currentLocation = (HoodieRecordLocation)
kryo.readClassAndObject(input);
+ this.newLocation = (HoodieRecordLocation) kryo.readClassAndObject(input);
+ // NOTE: Reading the actual record payload is delegated to the concrete
+ //       implementation
+ this.data = readRecordPayload(kryo, input);
+
+ // NOTE: We always seal the object after deserialization
+ this.sealed = true;
+ }
+
/**
* Get column in record to support RDDCustomColumnsSortPartitioner
* @return
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
index f469a1ab45..8c021d902a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordGlobalLocation.java
@@ -18,18 +18,21 @@
package org.apache.hudi.common.model;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
import java.util.Objects;
/**
* Similar with {@link org.apache.hudi.common.model.HoodieRecordLocation} but
with partition path.
*/
-public class HoodieRecordGlobalLocation extends HoodieRecordLocation {
+public final class HoodieRecordGlobalLocation extends HoodieRecordLocation {
private static final long serialVersionUID = 1L;
private String partitionPath;
- public HoodieRecordGlobalLocation() {
- }
+ public HoodieRecordGlobalLocation() {}
public HoodieRecordGlobalLocation(String partitionPath, String instantTime,
String fileId) {
super(instantTime, fileId);
@@ -93,5 +96,19 @@ public class HoodieRecordGlobalLocation extends
HoodieRecordLocation {
public HoodieRecordGlobalLocation copy(String partitionPath) {
return new HoodieRecordGlobalLocation(partitionPath, instantTime, fileId);
}
+
+ @Override
+ public final void write(Kryo kryo, Output output) {
+ super.write(kryo, output);
+
+ kryo.writeObjectOrNull(output, partitionPath, String.class);
+ }
+
+ @Override
+ public void read(Kryo kryo, Input input) {
+ super.read(kryo, input);
+
+ this.partitionPath = kryo.readObject(input, String.class);
+ }
}
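
Note that `HoodieRecord` intentionally writes locations via
`writeClassAndObject` (see `HoodieRecord.java` above), so a global location
round-trips back as `HoodieRecordGlobalLocation` rather than its base class.
A sketch, assuming a `kryo`/`output`/`input` setup like the one shown for
`HoodieKey`:

    HoodieRecordLocation loc =
        new HoodieRecordGlobalLocation("part-path", "001", "file-id");
    kryo.writeClassAndObject(output, loc); // records the concrete class
    HoodieRecordLocation cloned =
        (HoodieRecordLocation) kryo.readClassAndObject(input); // subtype restored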
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
index 2b1feab39b..8b1dd2b378 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordLocation.java
@@ -18,13 +18,18 @@
package org.apache.hudi.common.model;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
import java.io.Serializable;
import java.util.Objects;
/**
* Location of a HoodieRecord within the partition it belongs to. Ultimately,
this points to an actual file on disk
*/
-public class HoodieRecordLocation implements Serializable {
+public class HoodieRecordLocation implements Serializable, KryoSerializable {
protected String instantTime;
protected String fileId;
@@ -78,4 +83,16 @@ public class HoodieRecordLocation implements Serializable {
public void setFileId(String fileId) {
this.fileId = fileId;
}
+
+ @Override
+ public void write(Kryo kryo, Output output) {
+ output.writeString(instantTime);
+ output.writeString(fileId);
+ }
+
+ @Override
+ public void read(Kryo kryo, Input input) {
+ this.instantTime = input.readString();
+ this.fileId = input.readString();
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 89f89cf5c0..550016a236 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -184,8 +184,11 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
// then update the index state using location with new partition
path.
HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new
HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload)
record.getData()));
+
+ deleteRecord.unseal();
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
+
out.collect((O) deleteRecord);
}
location = getNewRecordLocation(partitionPath);
@@ -200,7 +203,11 @@ public class BucketAssignFunction<K, I, O extends
HoodieRecord<?>>
if (isChangingRecords) {
updateIndexState(partitionPath, location);
}
+
+ record.unseal();
record.setCurrentLocation(location);
+ record.seal();
+
out.collect((O) record);
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 99313884c3..2ae18bd860 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -877,7 +877,7 @@ object HoodieSparkSqlWriter {
val partitionPath =
sparkKeyGenerator.getPartitionPath(internalRow, structType)
val key = new HoodieKey(recordKey.toString, partitionPath.toString)
- new HoodieSparkRecord(key, processedRow, false)
+ new HoodieSparkRecord(key, processedRow, structType, false)
}
}.toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
index 256bc14e82..6ee5856d6a 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/LogFileIterator.scala
@@ -238,7 +238,7 @@ class RecordMergingFileIterator(split:
HoodieMergeOnReadFileSplit,
// on the record from the Delta Log
recordMerger.getRecordType match {
case HoodieRecordType.SPARK =>
- val curRecord = new HoodieSparkRecord(curRow)
+ val curRecord = new HoodieSparkRecord(curRow, baseFileReader.schema)
val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema,
newRecord, logFileReaderAvroSchema, payloadProps)
toScalaOption(result)
.map(r => {
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
index dc408ee919..e666c89306 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/bootstrap/SparkFullBootstrapDataProviderBase.java
@@ -96,7 +96,7 @@ public abstract class SparkFullBootstrapDataProviderBase
extends FullRecordBoots
String recordKey = sparkKeyGenerator.getRecordKey(internalRow,
structType).toString();
String partitionPath =
sparkKeyGenerator.getPartitionPath(internalRow, structType).toString();
HoodieKey key = new HoodieKey(recordKey, partitionPath);
- return new HoodieSparkRecord(key, internalRow, false);
+ return new HoodieSparkRecord(key, internalRow, structType, false);
});
} else {
throw new UnsupportedOperationException(recordType.name());
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
new file mode 100644
index 0000000000..8329fda093
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model
+
+import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema,
createInternalRowToAvroConverter}
+import org.apache.hudi.{HoodieInternalRowUtils, SparkAdapterSupport}
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import
org.apache.hudi.common.model.TestHoodieRecordSerialization.{OverwriteWithLatestAvroPayloadWithEquality,
cloneUsingKryo, convertToAvroRecord, toUnsafeRow}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.objects.SerializerSupport
+import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema,
UnsafeRow}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.addMetaFields
+import org.apache.spark.sql.types.{Decimal, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+import java.nio.ByteBuffer
+import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate}
+import java.util.Objects
+
+class TestHoodieRecordSerialization extends SparkClientFunctionalTestHarness {
+
+ private val rowSchema = StructType.fromDDL("a INT, b STRING, c DATE, d
TIMESTAMP, e STRUCT<a: DECIMAL(3, 2)>")
+
+ @Test
+ def testSparkRecord(): Unit = {
+ def routine(row: InternalRow, schema: StructType, serializedSize: Int):
Unit = {
+ val record = row match {
+ case ur: UnsafeRow => new HoodieSparkRecord(ur)
+ case _ => new HoodieSparkRecord(row, schema)
+ }
+
+ // Step 1: Serialize/deserialize the original [[HoodieSparkRecord]]
+ val (cloned, originalBytes) = cloneUsingKryo(record)
+
+ assertEquals(serializedSize, originalBytes.length)
+ // NOTE: In case the original row isn't an instance of [[UnsafeRow]], it will be
+ //         - Projected into [[UnsafeRow]] (prior to serialization by Kryo)
+ //         - Re-constructed as [[UnsafeRow]]
+ row match {
+ case _: UnsafeRow => assertEquals(record, cloned)
+ case _ =>
+ val convertedRecord = new HoodieSparkRecord(toUnsafeRow(row, schema))
+ assertEquals(convertedRecord, cloned)
+ }
+
+ // Step 2: Serialize the already cloned record, and assert that ser/de
loop is lossless
+ val (_, clonedBytes) = cloneUsingKryo(cloned)
+ assertEquals(ByteBuffer.wrap(originalBytes),
ByteBuffer.wrap(clonedBytes))
+ }
+
+ val row = Row(1, "test", Date.valueOf(LocalDate.of(2022, 10, 1)),
+ Timestamp.from(Instant.parse("2022-10-01T23:59:59.00Z")),
Row(Decimal.apply(123, 3, 2)))
+
+ val unsafeRow: UnsafeRow = toUnsafeRow(row, rowSchema)
+ val hoodieInternalRow = new HoodieInternalRow(new Array[UTF8String](5),
unsafeRow, false)
+
+ Seq(
+ (unsafeRow, rowSchema, 135),
+ (hoodieInternalRow, addMetaFields(rowSchema), 175)
+ ) foreach { case (row, schema, expectedSize) => routine(row, schema,
expectedSize) }
+ }
+
+ @Test
+ def testAvroRecords(): Unit = {
+ def routine(record: HoodieRecord[_], expectedSize: Int): Unit = {
+ // Step 1: Serialize/deserialize the original [[HoodieRecord]]
+ val (cloned, originalBytes) = cloneUsingKryo(record)
+
+ assertEquals(expectedSize, originalBytes.length)
+ assertEquals(record, cloned)
+
+ // Step 2: Serialize the already cloned record, and assert that ser/de
loop is lossless
+ val (_, clonedBytes) = cloneUsingKryo(cloned)
+ assertEquals(ByteBuffer.wrap(originalBytes),
ByteBuffer.wrap(clonedBytes))
+ }
+
+ val row = new GenericRowWithSchema(Array(1, "test",
Date.valueOf(LocalDate.of(2022, 10, 1)),
+ Timestamp.from(Instant.parse("2022-10-01T23:59:59.00Z")),
Row(Decimal.apply(123, 3, 2))), rowSchema)
+ val avroRecord = convertToAvroRecord(row)
+
+ val key = new HoodieKey("rec-key", "part-path")
+
+ val legacyRecord = toLegacyAvroRecord(avroRecord, key)
+ val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord)
+
+ Seq(
+ (legacyRecord, 573),
+ (avroIndexedRecord, 442)
+ ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
+ }
+
+ @Test
+ def testEmptyRecord(): Unit = {
+ def routine(record: HoodieRecord[_], expectedSize: Int): Unit = {
+ // Step 1: Serialize/deserialize the original [[HoodieRecord]]
+ val (cloned, originalBytes) = cloneUsingKryo(record)
+
+ assertEquals(expectedSize, originalBytes.length)
+ assertEquals(record, cloned)
+
+ // Step 2: Serialize the already cloned record, and assert that ser/de
loop is lossless
+ val (_, clonedBytes) = cloneUsingKryo(cloned)
+ assertEquals(ByteBuffer.wrap(originalBytes),
ByteBuffer.wrap(clonedBytes))
+ }
+
+ val key = new HoodieKey("rec-key", "part-path")
+
+ Seq(
+ (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 1,
HoodieRecordType.AVRO), 74),
+ (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 2,
HoodieRecordType.SPARK), 74)
+ ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
+ }
+
+
+ private def toLegacyAvroRecord(avroRecord: GenericRecord, key: HoodieKey):
HoodieAvroRecord[OverwriteWithLatestAvroPayload] = {
+ val avroRecordPayload = new
OverwriteWithLatestAvroPayloadWithEquality(avroRecord, 0)
+ val legacyRecord = new
HoodieAvroRecord[OverwriteWithLatestAvroPayload](key, avroRecordPayload)
+
+ legacyRecord
+ }
+}
+
+object TestHoodieRecordSerialization {
+
+ private def cloneUsingKryo[T](r: HoodieRecord[T]): (HoodieRecord[T],
Array[Byte]) = {
+ val serializer = SerializerSupport.newSerializer(true)
+
+ val buf = serializer.serialize(r)
+ val cloned: HoodieRecord[T] = serializer.deserialize(buf)
+
+ val bytes = new Array[Byte](buf.remaining())
+ buf.get(bytes)
+
+ (cloned, bytes)
+ }
+
+ private def toUnsafeRow(row: InternalRow, schema: StructType): UnsafeRow = {
+ val project = HoodieInternalRowUtils.getCachedUnsafeProjection(schema,
schema)
+ project(row)
+ }
+
+ private def toUnsafeRow(row: Row, schema: StructType): UnsafeRow = {
+ val encoder = SparkAdapterSupport.sparkAdapter.createSparkRowSerDe(schema)
+ val internalRow = encoder.serializeRow(row)
+ internalRow.asInstanceOf[UnsafeRow]
+ }
+
+ private def convertToAvroRecord(row: Row): GenericRecord = {
+ val schema = convertStructTypeToAvroSchema(row.schema, "testRecord",
"testNamespace")
+
+ createInternalRowToAvroConverter(row.schema, schema, nullable = false)
+ .apply(toUnsafeRow(row, row.schema))
+ }
+
+ class OverwriteWithLatestAvroPayloadWithEquality(avroRecord: GenericRecord,
_orderingVal: Comparable[_])
+ extends OverwriteWithLatestAvroPayload(avroRecord, _orderingVal) {
+ override def equals(obj: Any): Boolean =
+ obj match {
+ case p: OverwriteWithLatestAvroPayloadWithEquality =>
+ Objects.equals(ByteBuffer.wrap(this.recordBytes),
ByteBuffer.wrap(p.recordBytes)) &&
+ Objects.equals(this.orderingVal, p.orderingVal)
+ case _ =>
+ false
+ }
+
+ override def hashCode(): Int = Objects.hash(avroRecord,
_orderingVal.asInstanceOf[AnyRef])
+ }
+
+}