This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 211c315e57f [HUDI-5760] Use Avro as serde for delete log blocks (#9315)
211c315e57f is described below
commit 211c315e57f840d567dabfc2263012e201a6d868
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Aug 1 17:52:02 2023 -0700
[HUDI-5760] Use Avro as serde for delete log blocks (#9315)
This commit changes the format of delete log blocks and upgrades the log
block version from 2 to 3. After this PR, the delete records (record key,
partition path, and ordering value) are serialized in Avro and then written
into the delete log block, instead of using Kryo, which is a serialization
framework for Java. This makes delete blocks deserializable in programming
languages other than Java, as Avro is supported by common programming languages
such as Java, C/C++, Python, and Rust.
Based on the micro-benchmark results, Avro serde is moderately slower than
Kryo, except for serializing 100k+ entries. The difference is only at
100-millisecond level at most for 1 million entries. The bytes generated by
Avro are 10% to 20% larger than that by Kryo.
---
.../apache/hudi/index/bloom/HoodieBloomIndex.java | 6 +-
.../src/main/avro/HoodieDeleteRecordList.avsc | 76 +++++++++++
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 137 +++++++++++++++++++
.../common/table/log/block/HoodieDeleteBlock.java | 73 +++++++++-
.../common/table/log/block/HoodieLogBlock.java | 3 +-
.../hudi/metadata/HoodieMetadataPayload.java | 138 ++-----------------
.../hudi/metadata/HoodieTableMetadataUtil.java | 6 +-
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 151 ++++++++++++++++++++-
.../common/functional/TestHoodieLogFormat.java | 127 ++++++++++++++++-
.../table/log/block/TestHoodieDeleteBlock.java | 144 ++++++++++++++++++++
.../hudi/common/testutils/SchemaTestUtil.java | 55 +++++++-
.../format/delete-block-v2-content-10-records.data | Bin 0 -> 605 bytes
12 files changed, 768 insertions(+), 148 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index eca347a75bc..fcd7135833f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -54,9 +54,9 @@ import java.util.stream.Stream;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static
org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions;
-import static
org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
/**
@@ -235,8 +235,8 @@ public class HoodieBloomIndex extends HoodieIndex<Object,
Object> {
new BloomIndexFileInfo(
FSUtils.getFileId(entry.getKey().getRight()),
// NOTE: Here we assume that the type of the primary key field
is string
- (String)
unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
- (String)
unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
+ (String) unwrapAvroValueWrapper(entry.getValue().getMinValue()),
+ (String) unwrapAvroValueWrapper(entry.getValue().getMaxValue())
)));
}
diff --git a/hudi-common/src/main/avro/HoodieDeleteRecordList.avsc
b/hudi-common/src/main/avro/HoodieDeleteRecordList.avsc
new file mode 100644
index 00000000000..356fbe51dfe
--- /dev/null
+++ b/hudi-common/src/main/avro/HoodieDeleteRecordList.avsc
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+ "namespace": "org.apache.hudi.avro.model",
+ "type": "record",
+ "name": "HoodieDeleteRecordList",
+ "doc": "A list of delete records stored in the delete block in log block
version 3",
+ "fields": [
+ {
+ "name": "deleteRecordList",
+ "doc": "A list of Hudi records to delete",
+ "type": {
+ "type": "array",
+ "items": {
+ "name": "HoodieDeleteRecord",
+ "type": "record",
+ "doc": "Individual Hudi record to delete",
+ "fields": [
+ {
+ "name": "recordKey",
+ "doc": "Record key in String",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null
+ },
+ {
+ "name": "partitionPath",
+ "doc": "Partition path in String",
+ "type": [
+ "null",
+ "string"
+ ],
+ "default": null
+ },
+ {
+ "name": "orderingVal",
+ "doc": "Ordering value determining the order of merging on the
same key",
+ "type": [
+ "null",
+ "org.apache.hudi.avro.model.BooleanWrapper",
+ "org.apache.hudi.avro.model.IntWrapper",
+ "org.apache.hudi.avro.model.LongWrapper",
+ "org.apache.hudi.avro.model.FloatWrapper",
+ "org.apache.hudi.avro.model.DoubleWrapper",
+ "org.apache.hudi.avro.model.BytesWrapper",
+ "org.apache.hudi.avro.model.StringWrapper",
+ "org.apache.hudi.avro.model.DateWrapper",
+ "org.apache.hudi.avro.model.DecimalWrapper",
+ "org.apache.hudi.avro.model.TimeMicrosWrapper",
+ "org.apache.hudi.avro.model.TimestampMicrosWrapper"
+ ],
+ "default": null
+ }
+ ]
+ }
+ }
+ }
+ ]
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 52099f67306..b7651b1a18d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,16 @@
package org.apache.hudi.avro;
+import org.apache.hudi.avro.model.BooleanWrapper;
+import org.apache.hudi.avro.model.BytesWrapper;
+import org.apache.hudi.avro.model.DateWrapper;
+import org.apache.hudi.avro.model.DecimalWrapper;
+import org.apache.hudi.avro.model.DoubleWrapper;
+import org.apache.hudi.avro.model.FloatWrapper;
+import org.apache.hudi.avro.model.IntWrapper;
+import org.apache.hudi.avro.model.LongWrapper;
+import org.apache.hudi.avro.model.StringWrapper;
+import org.apache.hudi.avro.model.TimestampMicrosWrapper;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -31,6 +41,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.util.Lazy;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversions;
@@ -55,6 +66,7 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
import org.apache.hadoop.util.VersionUtil;
import java.io.ByteArrayInputStream;
@@ -67,6 +79,7 @@ import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
+import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
@@ -91,7 +104,10 @@ import static
org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
+import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
+import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
/**
* Helper class to do common stuff across Avro.
@@ -102,6 +118,33 @@ public class HoodieAvroUtils {
private static final ThreadLocal<BinaryEncoder> BINARY_ENCODER =
ThreadLocal.withInitial(() -> null);
private static final ThreadLocal<BinaryDecoder> BINARY_DECODER =
ThreadLocal.withInitial(() -> null);
+ private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION =
new Conversions.DecimalConversion();
+ /**
+ * NOTE: PLEASE READ CAREFULLY
+ * <p>
+ * In Avro 1.10 generated builders rely on {@code SpecificData.getForSchema}
invocation that in turn
+ * does use reflection to load the code-gen'd class corresponding to the
Avro record model. This has
+ * serious adverse effects in terms of performance when gets executed on the
hot-path (both, in terms
+ * of runtime and efficiency).
+ * <p>
+ * To work this around instead of using default code-gen'd builder invoking
{@code SpecificData.getForSchema},
+ * we instead rely on overloaded ctor accepting another instance of the
builder: {@code Builder(Builder)},
+ * which bypasses such invocation. Following corresponding builder's stubs
are statically initialized
+ * to be used exactly for that purpose.
+ * <p>
+ * You can find more details in HUDI-3834.
+ */
+ private static final Lazy<StringWrapper.Builder> STRING_WRAPPER_BUILDER_STUB
= Lazy.lazily(StringWrapper::newBuilder);
+ private static final Lazy<BytesWrapper.Builder> BYTES_WRAPPER_BUILDER_STUB =
Lazy.lazily(BytesWrapper::newBuilder);
+ private static final Lazy<DoubleWrapper.Builder> DOUBLE_WRAPPER_BUILDER_STUB
= Lazy.lazily(DoubleWrapper::newBuilder);
+ private static final Lazy<FloatWrapper.Builder> FLOAT_WRAPPER_BUILDER_STUB =
Lazy.lazily(FloatWrapper::newBuilder);
+ private static final Lazy<LongWrapper.Builder> LONG_WRAPPER_BUILDER_STUB =
Lazy.lazily(LongWrapper::newBuilder);
+ private static final Lazy<IntWrapper.Builder> INT_WRAPPER_BUILDER_STUB =
Lazy.lazily(IntWrapper::newBuilder);
+ private static final Lazy<BooleanWrapper.Builder>
BOOLEAN_WRAPPER_BUILDER_STUB = Lazy.lazily(BooleanWrapper::newBuilder);
+ private static final Lazy<TimestampMicrosWrapper.Builder>
TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB =
Lazy.lazily(TimestampMicrosWrapper::newBuilder);
+ private static final Lazy<DecimalWrapper.Builder>
DECIMAL_WRAPPER_BUILDER_STUB = Lazy.lazily(DecimalWrapper::newBuilder);
+ private static final Lazy<DateWrapper.Builder> DATE_WRAPPER_BUILDER_STUB =
Lazy.lazily(DateWrapper::newBuilder);
+
private static final long MILLIS_PER_DAY = 86400000L;
//Export for test
@@ -1149,4 +1192,98 @@ public class HoodieAvroUtils {
public static boolean gteqAvro1_10() {
return VersionUtil.compareVersions(AVRO_VERSION, "1.10") >= 0;
}
+
+ /**
+ * Wraps a value into Avro type wrapper.
+ *
+ * @param value Java value.
+ * @return A wrapped value with Avro type wrapper.
+ */
+ public static Object wrapValueIntoAvro(Comparable<?> value) {
+ if (value == null) {
+ return null;
+ } else if (value instanceof Date || value instanceof LocalDate) {
+ // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we
can't
+ // rely on logical types to do proper encoding of the native Java
types,
+ // and hereby have to encode value manually
+ LocalDate localDate = value instanceof LocalDate
+ ? (LocalDate) value
+ : ((Date) value).toLocalDate();
+ return DateWrapper.newBuilder(DATE_WRAPPER_BUILDER_STUB.get())
+ .setValue((int) localDate.toEpochDay())
+ .build();
+ } else if (value instanceof BigDecimal) {
+ Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
+ BigDecimal upcastDecimal = tryUpcastDecimal((BigDecimal) value,
(LogicalTypes.Decimal) valueSchema.getLogicalType());
+ return DecimalWrapper.newBuilder(DECIMAL_WRAPPER_BUILDER_STUB.get())
+ .setValue(AVRO_DECIMAL_CONVERSION.toBytes(upcastDecimal,
valueSchema, valueSchema.getLogicalType()))
+ .build();
+ } else if (value instanceof Timestamp) {
+ // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we
can't
+ // rely on logical types to do proper encoding of the native Java
types,
+ // and hereby have to encode value manually
+ Instant instant = ((Timestamp) value).toInstant();
+ return
TimestampMicrosWrapper.newBuilder(TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB.get())
+ .setValue(instantToMicros(instant))
+ .build();
+ } else if (value instanceof Boolean) {
+ return
BooleanWrapper.newBuilder(BOOLEAN_WRAPPER_BUILDER_STUB.get()).setValue((Boolean)
value).build();
+ } else if (value instanceof Integer) {
+ return
IntWrapper.newBuilder(INT_WRAPPER_BUILDER_STUB.get()).setValue((Integer)
value).build();
+ } else if (value instanceof Long) {
+ return
LongWrapper.newBuilder(LONG_WRAPPER_BUILDER_STUB.get()).setValue((Long)
value).build();
+ } else if (value instanceof Float) {
+ return
FloatWrapper.newBuilder(FLOAT_WRAPPER_BUILDER_STUB.get()).setValue((Float)
value).build();
+ } else if (value instanceof Double) {
+ return
DoubleWrapper.newBuilder(DOUBLE_WRAPPER_BUILDER_STUB.get()).setValue((Double)
value).build();
+ } else if (value instanceof ByteBuffer) {
+ return
BytesWrapper.newBuilder(BYTES_WRAPPER_BUILDER_STUB.get()).setValue((ByteBuffer)
value).build();
+ } else if (value instanceof String || value instanceof Utf8) {
+ return
StringWrapper.newBuilder(STRING_WRAPPER_BUILDER_STUB.get()).setValue(value.toString()).build();
+ } else {
+ throw new UnsupportedOperationException(String.format("Unsupported type
of the value (%s)", value.getClass()));
+ }
+ }
+
+ /**
+ * Unwraps Avro value wrapper into Java value.
+ *
+ * @param avroValueWrapper A wrapped value with Avro type wrapper.
+ * @return Java value.
+ */
+ public static Comparable<?> unwrapAvroValueWrapper(Object avroValueWrapper) {
+ if (avroValueWrapper == null) {
+ return null;
+ } else if (avroValueWrapper instanceof DateWrapper) {
+ return LocalDate.ofEpochDay(((DateWrapper) avroValueWrapper).getValue());
+ } else if (avroValueWrapper instanceof DecimalWrapper) {
+ Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
+ return AVRO_DECIMAL_CONVERSION.fromBytes(((DecimalWrapper)
avroValueWrapper).getValue(), valueSchema, valueSchema.getLogicalType());
+ } else if (avroValueWrapper instanceof TimestampMicrosWrapper) {
+ return microsToInstant(((TimestampMicrosWrapper)
avroValueWrapper).getValue());
+ } else if (avroValueWrapper instanceof BooleanWrapper) {
+ return ((BooleanWrapper) avroValueWrapper).getValue();
+ } else if (avroValueWrapper instanceof IntWrapper) {
+ return ((IntWrapper) avroValueWrapper).getValue();
+ } else if (avroValueWrapper instanceof LongWrapper) {
+ return ((LongWrapper) avroValueWrapper).getValue();
+ } else if (avroValueWrapper instanceof FloatWrapper) {
+ return ((FloatWrapper) avroValueWrapper).getValue();
+ } else if (avroValueWrapper instanceof DoubleWrapper) {
+ return ((DoubleWrapper) avroValueWrapper).getValue();
+ } else if (avroValueWrapper instanceof BytesWrapper) {
+ return ((BytesWrapper) avroValueWrapper).getValue();
+ } else if (avroValueWrapper instanceof StringWrapper) {
+ return ((StringWrapper) avroValueWrapper).getValue();
+ } else if (avroValueWrapper instanceof GenericRecord) {
+ // NOTE: This branch could be hit b/c Avro records could be reconstructed
+ // as {@code GenericRecord)
+ // TODO add logical type decoding
+ GenericRecord record = (GenericRecord) avroValueWrapper;
+ return (Comparable<?>) record.get("value");
+ } else {
+ throw new UnsupportedOperationException(String.format("Unsupported type
of the value (%s)", avroValueWrapper.getClass()));
+ }
+ }
+
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index d920495f971..23ce76c5ef4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -18,13 +18,24 @@
package org.apache.hudi.common.table.log.block;
+import org.apache.hudi.avro.model.HoodieDeleteRecord;
+import org.apache.hudi.avro.model.HoodieDeleteRecordList;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.Lazy;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.hadoop.fs.FSDataInputStream;
import java.io.ByteArrayInputStream;
@@ -34,12 +45,25 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
/**
* Delete block contains a list of keys to be deleted from scanning the blocks
so far.
*/
public class HoodieDeleteBlock extends HoodieLogBlock {
+ /**
+ * These static builders are added to avoid performance issue in Avro 1.10.
+ * You can find more details in HoodieAvroUtils, HUDI-3834, and AVRO-3048.
+ */
+ private static final Lazy<HoodieDeleteRecordList.Builder>
HOODIE_DELETE_RECORD_LIST_BUILDER_STUB =
+ Lazy.lazily(HoodieDeleteRecordList::newBuilder);
+ private static final Lazy<HoodieDeleteRecord.Builder>
HOODIE_DELETE_RECORD_BUILDER_STUB =
+ Lazy.lazily(HoodieDeleteRecord::newBuilder);
private DeleteRecord[] recordsToDelete;
@@ -49,8 +73,8 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
}
public HoodieDeleteBlock(Option<byte[]> content, FSDataInputStream
inputStream, boolean readBlockLazily,
- Option<HoodieLogBlockContentLocation> blockContentLocation,
Map<HeaderMetadataType, String> header,
- Map<HeaderMetadataType, String> footer) {
+ Option<HoodieLogBlockContentLocation>
blockContentLocation, Map<HeaderMetadataType, String> header,
+ Map<HeaderMetadataType, String> footer) {
super(header, footer, blockContentLocation, content, inputStream,
readBlockLazily);
}
@@ -68,9 +92,8 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
- // TODO(HUDI-5760) avoid using Kryo for serialization here
- byte[] bytesToWrite = SerializationUtils.serialize(getRecordsToDelete());
output.writeInt(version);
+ byte[] bytesToWrite = (version <= 2) ? serializeV2() : serializeV3();
output.writeInt(bytesToWrite.length);
output.write(bytesToWrite);
return baos.toByteArray();
@@ -98,14 +121,50 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
}
}
- // TODO(HUDI-5760) avoid using Kryo for serialization here
- private static DeleteRecord[] deserialize(int version, byte[] data) {
+ private byte[] serializeV2() throws IOException {
+ // Serialization for log block version 2
+ return SerializationUtils.serialize(getRecordsToDelete());
+ }
+
+ private byte[] serializeV3() throws IOException {
+ DatumWriter<HoodieDeleteRecordList> writer = new
SpecificDatumWriter<>(HoodieDeleteRecordList.class);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
+ // Serialization for log block version 3 and above
+ HoodieDeleteRecordList.Builder recordListBuilder =
HOODIE_DELETE_RECORD_LIST_BUILDER_STUB.get();
+ HoodieDeleteRecord.Builder recordBuilder =
HOODIE_DELETE_RECORD_BUILDER_STUB.get();
+ List<HoodieDeleteRecord> deleteRecordList =
Arrays.stream(getRecordsToDelete())
+ .map(record -> HoodieDeleteRecord.newBuilder(recordBuilder)
+ .setRecordKey(record.getRecordKey())
+ .setPartitionPath(record.getPartitionPath())
+ .setOrderingVal(wrapValueIntoAvro(record.getOrderingValue()))
+ .build())
+ .collect(Collectors.toList());
+ writer.write(HoodieDeleteRecordList.newBuilder(recordListBuilder)
+ .setDeleteRecordList(deleteRecordList)
+ .build(), encoder);
+ encoder.flush();
+ return baos.toByteArray();
+ }
+
+ private static DeleteRecord[] deserialize(int version, byte[] data) throws
IOException {
if (version == 1) {
// legacy version
HoodieKey[] keys = SerializationUtils.<HoodieKey[]>deserialize(data);
return
Arrays.stream(keys).map(DeleteRecord::create).toArray(DeleteRecord[]::new);
- } else {
+ } else if (version == 2) {
return SerializationUtils.<DeleteRecord[]>deserialize(data);
+ } else {
+ DatumReader<HoodieDeleteRecordList> reader = new
SpecificDatumReader<>(HoodieDeleteRecordList.class);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, 0,
data.length, null);
+ List<HoodieDeleteRecord> deleteRecordList = reader.read(null, decoder)
+ .getDeleteRecordList();
+ return deleteRecordList.stream()
+ .map(record -> DeleteRecord.create(
+ record.getRecordKey(),
+ record.getPartitionPath(),
+ unwrapAvroValueWrapper(record.getOrderingVal())))
+ .toArray(DeleteRecord[]::new);
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index b85febb9a0f..84142d76800 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -50,8 +50,9 @@ public abstract class HoodieLogBlock {
* The current version of the log block. Anytime the logBlock format changes
this version needs to be bumped and
* corresponding changes need to be made to {@link HoodieLogBlockVersion}
TODO : Change this to a class, something
* like HoodieLogBlockVersionV1/V2 and implement/override operations there
+ * Current log block version is V3.
*/
- public static int version = 2;
+ public static int version = 3;
// Header for each log block
private final Map<HeaderMetadataType, String> logBlockHeader;
// Footer for each log block
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 9b4698c0450..bffe8e32141 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -18,21 +18,11 @@
package org.apache.hudi.metadata;
-import org.apache.hudi.avro.model.BooleanWrapper;
-import org.apache.hudi.avro.model.BytesWrapper;
-import org.apache.hudi.avro.model.DateWrapper;
-import org.apache.hudi.avro.model.DecimalWrapper;
-import org.apache.hudi.avro.model.DoubleWrapper;
-import org.apache.hudi.avro.model.FloatWrapper;
import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataFileInfo;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
-import org.apache.hudi.avro.model.IntWrapper;
-import org.apache.hudi.avro.model.LongWrapper;
-import org.apache.hudi.avro.model.StringWrapper;
-import org.apache.hudi.avro.model.TimestampMicrosWrapper;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
@@ -51,25 +41,18 @@ import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.io.storage.HoodieAvroHFileReader;
import org.apache.hudi.util.Lazy;
-import org.apache.avro.Conversions;
-import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import javax.annotation.Nullable;
+
import java.io.IOException;
-import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.time.Instant;
-import java.time.LocalDate;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
@@ -83,15 +66,14 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
-import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
import static
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionIdentifier;
-import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
/**
* MetadataTable records are persisted with the schema defined in
HoodieMetadata.avsc.
@@ -161,8 +143,6 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
public static final String COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE =
"totalUncompressedSize";
public static final String COLUMN_STATS_FIELD_IS_DELETED = FIELD_IS_DELETED;
- private static final Conversions.DecimalConversion AVRO_DECIMAL_CONVERSION =
new Conversions.DecimalConversion();
-
/**
* HoodieMetadata record index payload field ids
*/
@@ -193,17 +173,6 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
* You can find more details in HUDI-3834.
*/
private static final Lazy<HoodieMetadataColumnStats.Builder>
METADATA_COLUMN_STATS_BUILDER_STUB =
Lazy.lazily(HoodieMetadataColumnStats::newBuilder);
- private static final Lazy<StringWrapper.Builder> STRING_WRAPPER_BUILDER_STUB
= Lazy.lazily(StringWrapper::newBuilder);
- private static final Lazy<BytesWrapper.Builder> BYTES_WRAPPER_BUILDER_STUB =
Lazy.lazily(BytesWrapper::newBuilder);
- private static final Lazy<DoubleWrapper.Builder> DOUBLE_WRAPPER_BUILDER_STUB
= Lazy.lazily(DoubleWrapper::newBuilder);
- private static final Lazy<FloatWrapper.Builder> FLOAT_WRAPPER_BUILDER_STUB =
Lazy.lazily(FloatWrapper::newBuilder);
- private static final Lazy<LongWrapper.Builder> LONG_WRAPPER_BUILDER_STUB =
Lazy.lazily(LongWrapper::newBuilder);
- private static final Lazy<IntWrapper.Builder> INT_WRAPPER_BUILDER_STUB =
Lazy.lazily(IntWrapper::newBuilder);
- private static final Lazy<BooleanWrapper.Builder>
BOOLEAN_WRAPPER_BUILDER_STUB = Lazy.lazily(BooleanWrapper::newBuilder);
- private static final Lazy<TimestampMicrosWrapper.Builder>
TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB =
Lazy.lazily(TimestampMicrosWrapper::newBuilder);
- private static final Lazy<DecimalWrapper.Builder>
DECIMAL_WRAPPER_BUILDER_STUB = Lazy.lazily(DecimalWrapper::newBuilder);
- private static final Lazy<DateWrapper.Builder> DATE_WRAPPER_BUILDER_STUB =
Lazy.lazily(DateWrapper::newBuilder);
-
private String key = null;
private int type = 0;
private Map<String, HoodieMetadataFileInfo> filesystemMetadata = null;
@@ -268,8 +237,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
// AVRO-2377 1.9.2 Modified the type of
org.apache.avro.Schema#FIELD_RESERVED to Collections.unmodifiableSet.
// This causes Kryo to fail when deserializing a GenericRecord,
See HUDI-5484.
// We should avoid using GenericRecord and convert GenericRecord
into a serializable type.
-
.setMinValue(wrapStatisticValue(unwrapStatisticValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
-
.setMaxValue(wrapStatisticValue(unwrapStatisticValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
+
.setMinValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MIN_VALUE))))
+
.setMaxValue(wrapValueIntoAvro(unwrapAvroValueWrapper(columnStatsRecord.get(COLUMN_STATS_FIELD_MAX_VALUE))))
.setValueCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_VALUE_COUNT))
.setNullCount((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_NULL_COUNT))
.setTotalSize((Long)
columnStatsRecord.get(COLUMN_STATS_FIELD_TOTAL_SIZE))
@@ -660,8 +629,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
HoodieMetadataColumnStats.newBuilder()
.setFileName(new
Path(columnRangeMetadata.getFilePath()).getName())
.setColumnName(columnRangeMetadata.getColumnName())
-
.setMinValue(wrapStatisticValue(columnRangeMetadata.getMinValue()))
-
.setMaxValue(wrapStatisticValue(columnRangeMetadata.getMaxValue()))
+
.setMinValue(wrapValueIntoAvro(columnRangeMetadata.getMinValue()))
+
.setMaxValue(wrapValueIntoAvro(columnRangeMetadata.getMaxValue()))
.setNullCount(columnRangeMetadata.getNullCount())
.setValueCount(columnRangeMetadata.getValueCount())
.setTotalSize(columnRangeMetadata.getTotalSize())
@@ -689,16 +658,16 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
Comparable minValue =
(Comparable) Stream.of(
- (Comparable)
unwrapStatisticValueWrapper(prevColumnStats.getMinValue()),
- (Comparable)
unwrapStatisticValueWrapper(newColumnStats.getMinValue()))
+ (Comparable)
unwrapAvroValueWrapper(prevColumnStats.getMinValue()),
+ (Comparable)
unwrapAvroValueWrapper(newColumnStats.getMinValue()))
.filter(Objects::nonNull)
.min(Comparator.naturalOrder())
.orElse(null);
Comparable maxValue =
(Comparable) Stream.of(
- (Comparable)
unwrapStatisticValueWrapper(prevColumnStats.getMaxValue()),
- (Comparable)
unwrapStatisticValueWrapper(newColumnStats.getMaxValue()))
+ (Comparable)
unwrapAvroValueWrapper(prevColumnStats.getMaxValue()),
+ (Comparable)
unwrapAvroValueWrapper(newColumnStats.getMaxValue()))
.filter(Objects::nonNull)
.max(Comparator.naturalOrder())
.orElse(null);
@@ -706,8 +675,8 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
return
HoodieMetadataColumnStats.newBuilder(METADATA_COLUMN_STATS_BUILDER_STUB.get())
.setFileName(newColumnStats.getFileName())
.setColumnName(newColumnStats.getColumnName())
- .setMinValue(wrapStatisticValue(minValue))
- .setMaxValue(wrapStatisticValue(maxValue))
+ .setMinValue(wrapValueIntoAvro(minValue))
+ .setMaxValue(wrapValueIntoAvro(maxValue))
.setValueCount(prevColumnStats.getValueCount() +
newColumnStats.getValueCount())
.setNullCount(prevColumnStats.getNullCount() +
newColumnStats.getNullCount())
.setTotalSize(prevColumnStats.getTotalSize() +
newColumnStats.getTotalSize())
@@ -853,87 +822,6 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
return sb.toString();
}
- private static Object wrapStatisticValue(Comparable<?> statValue) {
- if (statValue == null) {
- return null;
- } else if (statValue instanceof Date || statValue instanceof LocalDate) {
- // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we
can't
- // rely on logical types to do proper encoding of the native Java
types,
- // and hereby have to encode statistic manually
- LocalDate localDate = statValue instanceof LocalDate
- ? (LocalDate) statValue
- : ((Date) statValue).toLocalDate();
- return DateWrapper.newBuilder(DATE_WRAPPER_BUILDER_STUB.get())
- .setValue((int) localDate.toEpochDay())
- .build();
- } else if (statValue instanceof BigDecimal) {
- Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
- BigDecimal upcastDecimal = tryUpcastDecimal((BigDecimal) statValue,
(LogicalTypes.Decimal) valueSchema.getLogicalType());
- return DecimalWrapper.newBuilder(DECIMAL_WRAPPER_BUILDER_STUB.get())
- .setValue(AVRO_DECIMAL_CONVERSION.toBytes(upcastDecimal,
valueSchema, valueSchema.getLogicalType()))
- .build();
- } else if (statValue instanceof Timestamp) {
- // NOTE: Due to breaking changes in code-gen b/w Avro 1.8.2 and 1.10, we
can't
- // rely on logical types to do proper encoding of the native Java
types,
- // and hereby have to encode statistic manually
- Instant instant = ((Timestamp) statValue).toInstant();
- return
TimestampMicrosWrapper.newBuilder(TIMESTAMP_MICROS_WRAPPER_BUILDER_STUB.get())
- .setValue(instantToMicros(instant))
- .build();
- } else if (statValue instanceof Boolean) {
- return
BooleanWrapper.newBuilder(BOOLEAN_WRAPPER_BUILDER_STUB.get()).setValue((Boolean)
statValue).build();
- } else if (statValue instanceof Integer) {
- return
IntWrapper.newBuilder(INT_WRAPPER_BUILDER_STUB.get()).setValue((Integer)
statValue).build();
- } else if (statValue instanceof Long) {
- return
LongWrapper.newBuilder(LONG_WRAPPER_BUILDER_STUB.get()).setValue((Long)
statValue).build();
- } else if (statValue instanceof Float) {
- return
FloatWrapper.newBuilder(FLOAT_WRAPPER_BUILDER_STUB.get()).setValue((Float)
statValue).build();
- } else if (statValue instanceof Double) {
- return
DoubleWrapper.newBuilder(DOUBLE_WRAPPER_BUILDER_STUB.get()).setValue((Double)
statValue).build();
- } else if (statValue instanceof ByteBuffer) {
- return
BytesWrapper.newBuilder(BYTES_WRAPPER_BUILDER_STUB.get()).setValue((ByteBuffer)
statValue).build();
- } else if (statValue instanceof String || statValue instanceof Utf8) {
- return
StringWrapper.newBuilder(STRING_WRAPPER_BUILDER_STUB.get()).setValue(statValue.toString()).build();
- } else {
- throw new UnsupportedOperationException(String.format("Unsupported type
of the statistic (%s)", statValue.getClass()));
- }
- }
-
- public static Comparable<?> unwrapStatisticValueWrapper(Object
statValueWrapper) {
- if (statValueWrapper == null) {
- return null;
- } else if (statValueWrapper instanceof DateWrapper) {
- return LocalDate.ofEpochDay(((DateWrapper) statValueWrapper).getValue());
- } else if (statValueWrapper instanceof DecimalWrapper) {
- Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
- return AVRO_DECIMAL_CONVERSION.fromBytes(((DecimalWrapper)
statValueWrapper).getValue(), valueSchema, valueSchema.getLogicalType());
- } else if (statValueWrapper instanceof TimestampMicrosWrapper) {
- return microsToInstant(((TimestampMicrosWrapper)
statValueWrapper).getValue());
- } else if (statValueWrapper instanceof BooleanWrapper) {
- return ((BooleanWrapper) statValueWrapper).getValue();
- } else if (statValueWrapper instanceof IntWrapper) {
- return ((IntWrapper) statValueWrapper).getValue();
- } else if (statValueWrapper instanceof LongWrapper) {
- return ((LongWrapper) statValueWrapper).getValue();
- } else if (statValueWrapper instanceof FloatWrapper) {
- return ((FloatWrapper) statValueWrapper).getValue();
- } else if (statValueWrapper instanceof DoubleWrapper) {
- return ((DoubleWrapper) statValueWrapper).getValue();
- } else if (statValueWrapper instanceof BytesWrapper) {
- return ((BytesWrapper) statValueWrapper).getValue();
- } else if (statValueWrapper instanceof StringWrapper) {
- return ((StringWrapper) statValueWrapper).getValue();
- } else if (statValueWrapper instanceof GenericRecord) {
- // NOTE: This branch could be hit b/c Avro records could be reconstructed
- // as {@code GenericRecord)
- // TODO add logical type decoding
- GenericRecord record = (GenericRecord) statValueWrapper;
- return (Comparable<?>) record.get("value");
- } else {
- throw new UnsupportedOperationException(String.format("Unsupported type
of the statistic (%s)", statValueWrapper.getClass()));
- }
- }
-
private static void validatePayload(int type, Map<String,
HoodieMetadataFileInfo> filesystemMetadata) {
if (type == METADATA_TYPE_FILE_LIST) {
filesystemMetadata.forEach((fileName, fileInfo) -> {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index e37d7c1daff..201e12d312f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -99,10 +99,10 @@ import static
org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.avro.HoodieAvroUtils.convertValueForSpecificDataTypes;
import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ValidationUtils.checkState;
-import static
org.apache.hudi.metadata.HoodieMetadataPayload.unwrapStatisticValueWrapper;
import static
org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
import static
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
import static
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -247,8 +247,8 @@ public class HoodieTableMetadataUtil {
return HoodieColumnRangeMetadata.<Comparable>create(
columnStats.getFileName(),
columnStats.getColumnName(),
- unwrapStatisticValueWrapper(columnStats.getMinValue()),
- unwrapStatisticValueWrapper(columnStats.getMaxValue()),
+ unwrapAvroValueWrapper(columnStats.getMinValue()),
+ unwrapAvroValueWrapper(columnStats.getMaxValue()),
columnStats.getNullCount(),
columnStats.getValueCount(),
columnStats.getTotalSize(),
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 4f92660e377..1db3c7c289c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -18,6 +18,16 @@
package org.apache.hudi.avro;
+import org.apache.hudi.avro.model.BooleanWrapper;
+import org.apache.hudi.avro.model.BytesWrapper;
+import org.apache.hudi.avro.model.DateWrapper;
+import org.apache.hudi.avro.model.DecimalWrapper;
+import org.apache.hudi.avro.model.DoubleWrapper;
+import org.apache.hudi.avro.model.FloatWrapper;
+import org.apache.hudi.avro.model.IntWrapper;
+import org.apache.hudi.avro.model.LongWrapper;
+import org.apache.hudi.avro.model.StringWrapper;
+import org.apache.hudi.avro.model.TimestampMicrosWrapper;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.exception.HoodieException;
@@ -27,14 +37,29 @@ import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.util.Utf8;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -42,9 +67,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.sanitizeName;
+import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
+import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -112,6 +140,25 @@ public class TestHoodieAvroUtils {
+
"{\"name\":\"ss\",\"type\":{\"name\":\"ss\",\"type\":\"record\",\"fields\":["
+ "{\"name\":\"fn\",\"type\":[\"null\" ,\"string\"],\"default\":
null},{\"name\":\"ln\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";
+ private static String SCHEMA_WITH_AVRO_TYPES =
"{\"name\":\"TestRecordAvroTypes\",\"type\":\"record\",\"fields\":["
+ // Primitive types
+ + "{\"name\":\"booleanField\",\"type\":\"boolean\"},"
+ + "{\"name\":\"intField\",\"type\":\"int\"},"
+ + "{\"name\":\"longField\",\"type\":\"long\"},"
+ + "{\"name\":\"floatField\",\"type\":\"float\"},"
+ + "{\"name\":\"doubleField\",\"type\":\"double\"},"
+ + "{\"name\":\"bytesField\",\"type\":\"bytes\"},"
+ + "{\"name\":\"stringField\",\"type\":\"string\"},"
+ // Logical types
+ +
"{\"name\":\"decimalField\",\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":20,\"scale\":5},"
+ +
"{\"name\":\"timeMillisField\",\"type\":\"int\",\"logicalType\":\"time-millis\"},"
+ +
"{\"name\":\"timeMicrosField\",\"type\":\"long\",\"logicalType\":\"time-micros\"},"
+ +
"{\"name\":\"timestampMillisField\",\"type\":\"long\",\"logicalType\":\"timestamp-millis\"},"
+ +
"{\"name\":\"timestampMicrosField\",\"type\":\"long\",\"logicalType\":\"timestamp-micros\"},"
+ +
"{\"name\":\"localTimestampMillisField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"},"
+ +
"{\"name\":\"localTimestampMicrosField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}"
+ + "]}";
+
@Test
public void testPropsPresent() {
Schema schema = HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(EXAMPLE_SCHEMA));
@@ -451,7 +498,109 @@ public class TestHoodieAvroUtils {
assertTrue(fieldNames1.contains("timestamp"));
assertTrue(assertThrows(HoodieException.class, () ->
- HoodieAvroUtils.generateProjectionSchema(originalSchema,
Arrays.asList("_row_key", "timestamp", "fake_field")))
+ HoodieAvroUtils.generateProjectionSchema(originalSchema,
Arrays.asList("_row_key", "timestamp", "fake_field")))
.getMessage().contains("Field fake_field not found in log schema.
Query cannot proceed!"));
}
+
+ @Test
+ public void testWrapAndUnwrapAvroValues() throws IOException {
+ Schema schema = new Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES);
+ GenericRecord record = new GenericData.Record(schema);
+ Map<String, Class> expectedWrapperClass = new HashMap<>();
+
+ record.put("booleanField", true);
+ expectedWrapperClass.put("booleanField", BooleanWrapper.class);
+ record.put("intField", 698);
+ expectedWrapperClass.put("intField", IntWrapper.class);
+ record.put("longField", 192485030493L);
+ expectedWrapperClass.put("longField", LongWrapper.class);
+ record.put("floatField", 18.125f);
+ expectedWrapperClass.put("floatField", FloatWrapper.class);
+ record.put("doubleField", 94385932.342104);
+ expectedWrapperClass.put("doubleField", DoubleWrapper.class);
+ record.put("bytesField", ByteBuffer.wrap(new byte[] {1, 20, 0, 60, 2,
108}));
+ expectedWrapperClass.put("bytesField", BytesWrapper.class);
+ record.put("stringField", "abcdefghijk");
+ expectedWrapperClass.put("stringField", StringWrapper.class);
+ record.put("decimalField", ByteBuffer.wrap("9213032.4966".getBytes()));
+ expectedWrapperClass.put("decimalField", BytesWrapper.class);
+ record.put("timeMillisField", 57996136);
+ expectedWrapperClass.put("timeMillisField", IntWrapper.class);
+ record.put("timeMicrosField", 57996136930L);
+ expectedWrapperClass.put("timeMicrosField", LongWrapper.class);
+ record.put("timestampMillisField", 1690828731156L);
+ expectedWrapperClass.put("timestampMillisField", LongWrapper.class);
+ record.put("timestampMicrosField", 1690828731156982L);
+ expectedWrapperClass.put("timestampMicrosField", LongWrapper.class);
+ record.put("localTimestampMillisField", 1690828731156L);
+ expectedWrapperClass.put("localTimestampMillisField", LongWrapper.class);
+ record.put("localTimestampMicrosField", 1690828731156982L);
+ expectedWrapperClass.put("localTimestampMicrosField", LongWrapper.class);
+
+ GenericDatumWriter<GenericRecord> writer = new
GenericDatumWriter<>(schema);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
+ writer.write(record, encoder);
+ encoder.flush();
+ byte[] data = baos.toByteArray();
+
+ GenericDatumReader<GenericRecord> reader = new
GenericDatumReader<>(schema);
+ BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, 0,
data.length, null);
+ GenericRecord deserializedRecord = reader.read(null, decoder);
+ Map<String, Object> fieldValueMapping =
deserializedRecord.getSchema().getFields().stream()
+ .collect(Collectors.toMap(
+ Schema.Field::name,
+ field -> deserializedRecord.get(field.name())
+ ));
+
+ for (String fieldName : fieldValueMapping.keySet()) {
+ Object value = fieldValueMapping.get(fieldName);
+ Object wrapperValue = wrapValueIntoAvro((Comparable) value);
+ assertTrue(expectedWrapperClass.get(fieldName).isInstance(wrapperValue));
+ if (value instanceof Utf8) {
+ assertEquals(value.toString(), ((GenericRecord) wrapperValue).get(0));
+ assertEquals(value.toString(), unwrapAvroValueWrapper(wrapperValue));
+ } else {
+ assertEquals(value, ((GenericRecord) wrapperValue).get(0));
+ assertEquals(value, unwrapAvroValueWrapper(wrapperValue));
+ }
+ }
+ }
+
+ public static Stream<Arguments> javaValueParams() {
+ Object[][] data =
+ new Object[][] {
+ {new Timestamp(1690766971000L), TimestampMicrosWrapper.class},
+ {new Date(1672560000000L), DateWrapper.class},
+ {LocalDate.of(2023, 1, 1), DateWrapper.class},
+ {new BigDecimal("12345678901234.2948"), DecimalWrapper.class}
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("javaValueParams")
+ public void testWrapAndUnwrapJavaValues(Comparable value, Class
expectedWrapper) {
+ Object wrapperValue = wrapValueIntoAvro(value);
+ assertTrue(expectedWrapper.isInstance(wrapperValue));
+ if (value instanceof Timestamp) {
+ assertEquals(((Timestamp) value).getTime() * 1000L,
+ ((GenericRecord) wrapperValue).get(0));
+ assertEquals(((Timestamp) value).getTime(),
+ ((Instant) unwrapAvroValueWrapper(wrapperValue)).toEpochMilli());
+ } else if (value instanceof Date) {
+ assertEquals((int) ChronoUnit.DAYS.between(
+ LocalDate.ofEpochDay(0), ((Date) value).toLocalDate()),
+ ((GenericRecord) wrapperValue).get(0));
+ assertEquals(((Date) value).toLocalDate(),
unwrapAvroValueWrapper(wrapperValue));
+ } else if (value instanceof LocalDate) {
+ assertEquals((int) ChronoUnit.DAYS.between(LocalDate.ofEpochDay(0),
(LocalDate) value),
+ ((GenericRecord) wrapperValue).get(0));
+ assertEquals(value, unwrapAvroValueWrapper(wrapperValue));
+ } else {
+ assertEquals("0.000000000000000",
+ ((BigDecimal) value)
+ .subtract((BigDecimal)
unwrapAvroValueWrapper(wrapperValue)).toPlainString());
+ }
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index c613b52dd63..2931fc0eea6 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -73,7 +73,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
-
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -89,6 +88,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import java.io.IOException;
+import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.net.URISyntaxException;
@@ -101,6 +101,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -1809,6 +1810,130 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
0, 0, Option.empty());
}
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void
testLogReaderWithDifferentVersionsOfDeleteBlocks(ExternalSpillableMap.DiskMapType
diskMapType,
+ boolean
isCompressionEnabled,
+ boolean
readBlocksLazily,
+ boolean
enableOptimizedLogBlocksScan)
+ throws IOException, URISyntaxException, InterruptedException {
+ Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+ // Set a small threshold so that every block is a new version
+ Writer writer =
+
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+
.withFileId("test-fileid1").overBaseCommit("100").withFs(fs).build();
+ List<String> deleteKeyListInV2Block = Arrays.asList(
+ "d448e1b8-a0d4-45c0-bf2d-a9e16ff3c8ce",
+ "df3f71cd-5b68-406c-bb70-861179444adb",
+ "cf64885c-af32-463b-8f1b-2f31a39b1afa",
+ "9884e134-0d60-46e8-8a1e-36db0e455c4a",
+ "698544b8-defa-4fa7-ac15-8963f7d0784d",
+ "081c279e-fc6a-4e05-89b7-3136e4cad488",
+ "1041fac7-8a54-47e6-8a2d-d1a650301699",
+ "69c003f8-386d-40a0-9c61-5a903d1d6ac2",
+ "e574d164-f8c4-47cf-b150-264c2364f10e",
+ "d76007d2-9dc8-46ff-bf6f-0789c6ffffc0");
+
+ // Write 1: add 100 records
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ // Generate 100 records with 10 records to be deleted in V2 delete block
in commit 102
+ List<String> recordKeyList = testUtil.genRandomUUID(100,
deleteKeyListInV2Block);
+ List<IndexedRecord> records1 =
+ testUtil.generateHoodieTestRecords(0, recordKeyList, "0000/00/00",
"100");
+ List<IndexedRecord> copyOfRecords1 = records1.stream()
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+ header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+ HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE,
records1, header);
+ writer.appendBlock(dataBlock);
+
+ // Write 2: add another 100 records
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
+ List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
+ List<IndexedRecord> copyOfRecords2 = records2.stream()
+ .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record,
schema)).collect(Collectors.toList());
+ dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
+ writer.appendBlock(dataBlock);
+
+ // Delete 10 keys in V2 delete block
+ byte[] contentBytes = new byte[605];
+ InputStream inputStream = TestHoodieLogFormat.class
+
.getResourceAsStream("/format/delete-block-v2-content-10-records.data");
+ inputStream.read(contentBytes);
+
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
+ writer.appendBlock(new HoodieDeleteBlock(
+ Option.of(contentBytes), null, true, Option.empty(), header,
Collections.EMPTY_MAP));
+
+ // Delete 60 keys in V3 delete block
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
+ List<DeleteRecord> deletedRecords = copyOfRecords2.stream()
+ .map(s -> (DeleteRecord.create(((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+ ((GenericRecord)
s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+ .collect(Collectors.toList()).subList(0, 60);
+ writer.appendBlock(new HoodieDeleteBlock(deletedRecords.toArray(new
DeleteRecord[0]), header));
+
+ copyOfRecords2.addAll(copyOfRecords1);
+ List<String> originalKeys =
+ copyOfRecords2.stream().map(s -> ((GenericRecord)
s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString())
+ .collect(Collectors.toList());
+
+ List<String> allLogFiles =
+ FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
+ .map(s -> s.getPath().toString()).collect(Collectors.toList());
+
+ FileCreateUtils.createDeltaCommit(basePath, "100", fs);
+ FileCreateUtils.createDeltaCommit(basePath, "101", fs);
+ FileCreateUtils.createDeltaCommit(basePath, "102", fs);
+ FileCreateUtils.createDeltaCommit(basePath, "103", fs);
+
+ try (HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("103")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(BUFFER_SIZE)
+ .withSpillableMapBasePath(spillableBasePath)
+ .withDiskMapType(diskMapType)
+ .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
+ .withOptimizedLogBlocksScan(enableOptimizedLogBlocksScan)
+ .build()) {
+ assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200
records");
+ final List<String> readKeys = new ArrayList<>(200);
+ final List<String> recordKeys = new ArrayList<>(200);
+ final List<Boolean> emptyPayloads = new ArrayList<>();
+ scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
+ scanner.forEach(s -> {
+ try {
+ if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema, new
Properties()).isPresent()) {
+ emptyPayloads.add(true);
+ } else {
+ recordKeys.add(s.getKey().getRecordKey());
+ }
+ } catch (IOException io) {
+ throw new UncheckedIOException(io);
+ }
+ });
+ assertEquals(200, readKeys.size(), "Stream collect should return all 200
records");
+ assertEquals(70, emptyPayloads.size(), "Stream collect should return all
70 records with empty payloads");
+ Collections.sort(originalKeys);
+ Collections.sort(readKeys);
+ assertEquals(originalKeys, readKeys, "200 records should be scanned
regardless of deletes or not");
+
+ originalKeys.removeAll(deleteKeyListInV2Block);
+ originalKeys.removeAll(
+
deletedRecords.stream().map(DeleteRecord::getRecordKey).collect(Collectors.toList()));
+ Collections.sort(originalKeys);
+ Collections.sort(recordKeys);
+ assertEquals(originalKeys, recordKeys, "Only 130 records should exist
after deletion");
+ }
+ }
+
@Test
public void testAvroLogRecordReaderWithRollbackOlderBlocks()
throws IOException, URISyntaxException, InterruptedException {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
new file mode 100644
index 00000000000..ccba018e64f
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieDeleteBlock.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/**
+ * Tests serialization and deserialization of Hudi delete log block.
+ */
+public class TestHoodieDeleteBlock {
+ private static String KEY_PREFIX = "key";
+ private static String PARTITION_PATH = "2023-01-01";
+
+ private static Random random = new Random();
+
+ @Test
+ public void testSerializeAndDeserializeV3DeleteBlock() throws IOException {
+ DeleteRecord[] deleteRecords = IntStream.range(0, 100)
+ .mapToObj(i -> DeleteRecord.create(KEY_PREFIX + i, PARTITION_PATH,
random.nextLong()))
+ .toArray(DeleteRecord[]::new);
+ testDeleteBlockWithValidation(deleteRecords);
+ }
+
+ @Test
+ public void testDeserializeV2DeleteBlock() {
+ // The content is Kryo serialized with V2 delete block format
+ byte[] contentBytes = new byte[] {
+ 0, 0, 0, 2, 0, 0, 0, -88, 1, 0, 91, 76, 111, 114, 103, 46, 97, 112,
97, 99, 104, 101, 46, 104,
+ 117, 100, 105, 46, 99, 111, 109, 109, 111, 110, 46, 109, 111, 100,
101, 108, 46, 68, 101, 108,
+ 101, 116, 101, 82, 101, 99, 111, 114, 100, -69, 1, 3, 1, 1, 111, 114,
103, 46, 97, 112, 97, 99,
+ 104, 101, 46, 104, 117, 100, 105, 46, 99, 111, 109, 109, 111, 110, 46,
109, 111, 100, 101, 108,
+ 46, 68, 101, 108, 101, 116, 101, 82, 101, 99, 111, 114, -28, 1, 1, 2,
111, 114, 103, 46, 97, 112,
+ 97, 99, 104, 101, 46, 104, 117, 100, 105, 46, 99, 111, 109, 109, 111,
110, 46, 109, 111, 100,
+ 101, 108, 46, 72, 111, 111, 100, 105, 101, 75, 101, -7, 1, 1, 50, 48,
50, 51, 45, 48, 49, 45,
+ 48, -79, 1, 107, 101, 121, -79, 2, -30, 91, 1, 1, 1, 1, 2, 1, 5, 1,
107, 101, 121, -78, 2, -60,
+ -73, 1
+ };
+
+ DeleteRecord[] deleteRecords = IntStream.range(1, 3)
+ .mapToObj(i -> DeleteRecord.create(KEY_PREFIX + i, PARTITION_PATH, i *
5873))
+ .toArray(DeleteRecord[]::new);
+ HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock(
+ Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(),
new HashMap<>());
+ DeleteRecord[] deserializedDeleteRecords =
deserializeDeleteBlock.getRecordsToDelete();
+
assertEquals(Arrays.stream(deleteRecords).sorted(Comparator.comparing(DeleteRecord::getRecordKey))
+ .collect(Collectors.toList()),
+
Arrays.stream(deserializedDeleteRecords).sorted(Comparator.comparing(DeleteRecord::getRecordKey))
+ .collect(Collectors.toList()));
+ }
+
+ public static Stream<Arguments> orderingValueParams() {
+ Object[][] data =
+ new Object[][] {
+ {new Boolean[] {false, true}},
+ {new Integer[] {Integer.MIN_VALUE, 14235, 2147465340,
Integer.MAX_VALUE}},
+ {new Long[] {Long.MIN_VALUE, -233498L, 2930275823L,
Long.MAX_VALUE}},
+ {new Float[] {Float.MIN_VALUE, 0.125f, Float.MAX_VALUE}},
+ {new Double[] {Double.MIN_VALUE, 0.125, 809.25, Double.MAX_VALUE}},
+ {new String[] {"val1", "val2", "val3", null}},
+ {new Timestamp[] {new Timestamp(1690766971000L), new
Timestamp(1672536571000L)}},
+ {new LocalDate[] {LocalDate.of(2023, 1, 1), LocalDate.of(1980, 7,
1)}},
+ {new BigDecimal[] {new BigDecimal("12345678901234.2948"),
+ new BigDecimal("23456789012345.4856")}}
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("orderingValueParams")
+ public void testOrderingValueInDeleteRecords(Comparable[] orderingValues)
throws IOException {
+ DeleteRecord[] deleteRecords = new DeleteRecord[orderingValues.length];
+ for (int i = 0; i < orderingValues.length; i++) {
+ deleteRecords[i] = DeleteRecord.create(
+ KEY_PREFIX + i, PARTITION_PATH, orderingValues[i]);
+ }
+ testDeleteBlockWithValidation(deleteRecords);
+ }
+
+ public void testDeleteBlockWithValidation(DeleteRecord[] deleteRecords)
throws IOException {
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(deleteRecords, new
HashMap<>());
+ byte[] contentBytes = deleteBlock.getContentBytes();
+ HoodieDeleteBlock deserializeDeleteBlock = new HoodieDeleteBlock(
+ Option.of(contentBytes), null, true, Option.empty(), new HashMap<>(),
new HashMap<>());
+ DeleteRecord[] deserializedDeleteRecords =
deserializeDeleteBlock.getRecordsToDelete();
+ assertEquals(deleteRecords.length, deserializedDeleteRecords.length);
+ for (int i = 0; i < deleteRecords.length; i++) {
+ assertEquals(deleteRecords[i].getHoodieKey(),
deserializedDeleteRecords[i].getHoodieKey());
+ if (deleteRecords[i].getOrderingValue() != null) {
+ if (deleteRecords[i].getOrderingValue() instanceof Timestamp) {
+ assertEquals(((Timestamp)
deleteRecords[i].getOrderingValue()).getTime(),
+ ((Instant)
deserializedDeleteRecords[i].getOrderingValue()).toEpochMilli());
+ } else if (deleteRecords[i].getOrderingValue() instanceof BigDecimal) {
+ assertEquals("0.000000000000000",
+ ((BigDecimal) deleteRecords[i].getOrderingValue())
+ .subtract((BigDecimal)
deserializedDeleteRecords[i].getOrderingValue()).toPlainString());
+ } else {
+ assertEquals(deleteRecords[i].getOrderingValue(),
+ deserializedDeleteRecords[i].getOrderingValue());
+ }
+ } else {
+ assertNull(deserializedDeleteRecords[i].getOrderingValue());
+ }
+ }
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
index 69a2881f400..8f3cbe5b19f 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/SchemaTestUtil.java
@@ -48,11 +48,14 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -136,17 +139,55 @@ public final class SchemaTestUtil {
return fs.getPath(array[1]);
}
+ /**
+ * Generates a list of random UUIDs
+ *
+ * @param size Number of UUIDs to return.
+ * @param existingUUIDs Existing UUIDs to include.
+ * @return A list of UUIDs.
+ */
+ public List<String> genRandomUUID(int size, List<String> existingUUIDs) {
+ Set<String> uuidSet = new HashSet<>(existingUUIDs);
+ while (uuidSet.size() < size) {
+ uuidSet.add(genRandomUUID());
+ }
+ return new ArrayList<>(uuidSet);
+ }
+
public List<IndexedRecord> generateHoodieTestRecords(int from, int limit)
throws IOException, URISyntaxException {
- List<IndexedRecord> records = generateTestRecords(from, limit);
String instantTime = HoodieActiveTimeline.createNewInstantTime();
+ List<String> recordKeyList = genRandomUUID(limit, Collections.emptyList());
+ return generateHoodieTestRecords(from, recordKeyList, "0000/00/00",
instantTime);
+ }
+
+ /**
+ * Generates test records.
+ *
+ * @param from Offset to start picking records.
+ * @param recordKeyList Record keys to use.
+ * @param partitionPath Partition path to use.
+ * @param instantTime Hudi instant time.
+ * @return A list of {@link IndexedRecord}.
+ * @throws IOException if reading the test schema or sample data fails.
+ * @throws URISyntaxException if the test resource URI is malformed.
+ */
+ public List<IndexedRecord> generateHoodieTestRecords(int from,
+ List<String>
recordKeyList,
+ String partitionPath,
+ String instantTime)
+ throws IOException, URISyntaxException {
+ List<IndexedRecord> records = generateTestRecords(from,
recordKeyList.size());
Schema hoodieFieldsSchema =
HoodieAvroUtils.addMetadataFields(getSimpleSchema());
- return records.stream().map(s ->
HoodieAvroUtils.rewriteRecord((GenericRecord) s, hoodieFieldsSchema)).map(p -> {
- p.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, genRandomUUID());
- p.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, "0000/00/00");
- p.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime);
- return p;
- }).collect(Collectors.toList());
+ List<IndexedRecord> recordsWithMetaFields = new ArrayList<>();
+ for (int i = 0; i < recordKeyList.size(); i++) {
+ GenericRecord newRecord = HoodieAvroUtils.rewriteRecord((GenericRecord)
records.get(i), hoodieFieldsSchema);
+ newRecord.put(HoodieRecord.RECORD_KEY_METADATA_FIELD,
recordKeyList.get(i));
+ newRecord.put(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionPath);
+ newRecord.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, instantTime);
+ recordsWithMetaFields.add(newRecord);
+ }
+ return recordsWithMetaFields;
}
public List<HoodieRecord> generateHoodieTestRecords(int from, int limit,
Schema schema)
diff --git
a/hudi-common/src/test/resources/format/delete-block-v2-content-10-records.data
b/hudi-common/src/test/resources/format/delete-block-v2-content-10-records.data
new file mode 100644
index 00000000000..58d5d9a53bd
Binary files /dev/null and
b/hudi-common/src/test/resources/format/delete-block-v2-content-10-records.data
differ