This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 26ecb287684 [HUDI-6801] Implement merging partial updates from log files for MOR tables (#9883)
26ecb287684 is described below
commit 26ecb2876842fa2d6c79aa1a0e6f55277f25cba8
Author: Y Ethan Guo <[email protected]>
AuthorDate: Mon Oct 30 23:26:30 2023 -0700
[HUDI-6801] Implement merging partial updates from log files for MOR tables (#9883)
This commit adds the logic for merging partial updates in the new file group reader with the Spark record merger.
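For illustration only (not part of the patch), a minimal sketch of how the new partial-merge API introduced here is meant to be invoked; the record and schema variables below are hypothetical placeholders:

    // Assumes an existing older record (full row) and a newer record carrying
    // only the updated columns, e.g. produced by a partial-update MERGE INTO.
    HoodieRecordMerger merger = new HoodieSparkRecordMerger();
    Option<Pair<HoodieRecord, Schema>> merged = merger.partialMerge(
        olderRecord, olderSchema,        // full record and its schema
        newerRecord, newerPartialSchema, // partial update and its (partial) schema
        readerSchema,                    // full table schema; fixes field ordering of the result
        props);
    // An empty result means the newer record is a delete; otherwise the pair holds
    // the combined record plus the schema listing exactly the fields it contains.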
---
.../org/apache/hudi/HoodieSparkRecordMerger.java | 40 ++++
.../hudi/common/model/HoodieSparkRecord.java | 19 +-
.../io/storage/HoodieSparkFileReaderFactory.java | 2 +-
.../hudi/io/storage/HoodieSparkParquetReader.java | 2 +-
.../apache/hudi/merge/SparkRecordMergingUtils.java | 234 +++++++++++++++++++++
.../hudi/BaseSparkInternalRowReaderContext.java | 4 +-
.../SparkFileFormatInternalRowReaderContext.scala | 52 ++++-
.../hudi/util/CloseableInternalRowIterator.scala | 1 +
.../hudi/common/engine/HoodieReaderContext.java | 15 +-
.../java/org/apache/hudi/common/fs/FSUtils.java | 5 +-
.../hudi/common/model/HoodieRecordMerger.java | 62 +++++-
.../table/log/block/HoodieAvroDataBlock.java | 2 +-
.../common/table/log/block/HoodieDataBlock.java | 14 +-
.../read/HoodieBaseFileGroupRecordBuffer.java | 43 +++-
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 20 +-
.../HoodiePositionBasedFileGroupRecordBuffer.java | 18 +-
.../org/apache/hudi/common/fs/TestFSUtils.java | 3 +
.../table/read/TestHoodieFileGroupReaderBase.java | 35 +--
.../read/TestHoodieFileGroupReaderOnSpark.scala | 19 +-
.../sql/hudi/TestPartialUpdateForMergeInto.scala | 225 +++++++++++---------
20 files changed, 643 insertions(+), 172 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
index 4b6bdbae46d..a488e7174e5 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.merge.SparkRecordMergingUtils;
import org.apache.avro.Schema;
@@ -76,6 +77,45 @@ public class HoodieSparkRecordMerger implements HoodieRecordMerger {
     }
   }
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException {
+    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
+    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
+
+    if (newer instanceof HoodieSparkRecord) {
+      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
+      if (newSparkRecord.isDeleted()) {
+        // Delete record
+        return Option.empty();
+      }
+    } else {
+      if (newer.getData() == null) {
+        // Delete record
+        return Option.empty();
+      }
+    }
+
+    if (older instanceof HoodieSparkRecord) {
+      HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
+      if (oldSparkRecord.isDeleted()) {
+        // use natural order for delete record
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    } else {
+      if (older.getData() == null) {
+        // use natural order for delete record
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    }
+    if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+      return Option.of(SparkRecordMergingUtils.mergePartialRecords(
+          (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, oldSchema, readerSchema, props));
+    } else {
+      return Option.of(SparkRecordMergingUtils.mergePartialRecords(
+          (HoodieSparkRecord) older, oldSchema, (HoodieSparkRecord) newer, newSchema, readerSchema, props));
+    }
+  }
+
@Override
public HoodieRecordType getRecordType() {
return HoodieRecordType.SPARK;
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 127759c8a22..3d59ad27257 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -18,10 +18,6 @@
package org.apache.hudi.common.model;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.avro.Schema;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.util.ConfigUtils;
@@ -32,23 +28,30 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.SparkKeyGeneratorInterface;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
import org.apache.spark.sql.HoodieInternalRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils;
import org.apache.spark.sql.HoodieUnsafeRowUtils.NestedFieldPath;
import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
-import scala.Function1;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
+import scala.Function1;
+
import static org.apache.hudi.common.table.HoodieTableConfig.POPULATE_META_FIELDS;
import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedUnsafeProjection;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
@@ -434,13 +437,17 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
     // NOTE: [[HoodieSparkRecord]] is expected to hold either
     //        - Instance of [[UnsafeRow]] or
     //        - Instance of [[HoodieInternalRow]] or
+    //        - Instance of [[GenericInternalRow]]
     //        - Instance of [[ColumnarBatchRow]]
     //
     // In case provided row is anything but [[UnsafeRow]], it's expected that the
     // corresponding schema has to be provided as well so that it could be properly
     // serialized (in case it would need to be)
     boolean isValid = data == null || data instanceof UnsafeRow
-        || schema != null && (data instanceof HoodieInternalRow || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
+        || schema != null && (
+        data instanceof HoodieInternalRow
+            || data instanceof GenericInternalRow
+            || SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
ValidationUtils.checkState(isValid);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
index de7810be8ae..c34e2037dac 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileReaderFactory.java
@@ -29,7 +29,7 @@ import java.io.IOException;
public class HoodieSparkFileReaderFactory extends HoodieFileReaderFactory {
-  protected HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
+  public HoodieFileReader newParquetFileReader(Configuration conf, Path path) {
     conf.setIfUnset(SQLConf.PARQUET_BINARY_AS_STRING().key(), SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
     conf.setIfUnset(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
     conf.setIfUnset(SQLConf.CASE_SENSITIVE().key(), SQLConf.CASE_SENSITIVE().defaultValueString());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 046fa98fbea..79444d179ce 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -115,7 +115,7 @@ public class HoodieSparkParquetReader implements HoodieSparkFileReader {
     });
   }
-  private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+  public ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
if (requestedSchema == null) {
requestedSchema = readerSchema;
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
new file mode 100644
index 00000000000..f02ebfc8293
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.merge;
+
+import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Util class to merge records that may contain partial updates.
+ * This can be used by any Spark {@link HoodieRecordMerger} implementation.
+ */
+public class SparkRecordMergingUtils {
+  private static final Map<Schema, Map<Integer, StructField>> FIELD_ID_TO_FIELD_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Schema, Map<String, Integer>> FIELD_NAME_TO_ID_MAPPING_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Pair<Pair<Schema, Schema>, Schema>,
+      Pair<Map<Integer, StructField>, Pair<StructType, Schema>>> MERGED_SCHEMA_CACHE = new ConcurrentHashMap<>();
+
+ /**
+ * Merges records which can contain partial updates.
+ * <p>
+ * For example, the reader schema is
+ * {[
+ * {"name":"id", "type":"string"},
+ * {"name":"ts", "type":"long"},
+ * {"name":"name", "type":"string"},
+ * {"name":"price", "type":"double"},
+ * {"name":"tags", "type":"string"}
+ * ]}
+ * The older and newer records can be (omitting Hudi meta fields):
+ * <p>
+ * (1) older (complete record update):
+ * id | ts | name | price | tags
+ * 1 | 10 | apple | 2.3 | fruit
+ * <p>
+ * newer (partial record update):
+ * ts | price
+ * 16 | 2.8
+ * <p>
+   * The merging result is (updated values from the newer record replace the ones in the older record):
+ * <p>
+ * id | ts | name | price | tags
+ * 1 | 16 | apple | 2.8 | fruit
+ * <p>
+ * (2) older (partial record update):
+ * ts | price
+ * 10 | 2.8
+ * <p>
+ * newer (partial record update):
+ * ts | tag
+ * 16 | fruit,juicy
+ * <p>
+   * The merging result is (two partial updates are merged together, and values of
+   * overlapping fields come from the newer record):
+ * <p>
+ * ts | price | tags
+ * 16 | 2.8 | fruit,juicy
+ *
+ * @param older Older {@link HoodieSparkRecord}.
+ * @param oldSchema Schema of the older record.
+ * @param newer Newer {@link HoodieSparkRecord}.
+ * @param newSchema Schema of the newer record.
+   * @param readerSchema Reader schema containing all the fields to read. This is used to maintain
+   *                     the ordering of the fields of the merged record.
+ * @param props Configuration in {@link TypedProperties}.
+ * @return The merged record and schema.
+ */
+  public static Pair<HoodieRecord, Schema> mergePartialRecords(HoodieSparkRecord older,
+                                                               Schema oldSchema,
+                                                               HoodieSparkRecord newer,
+                                                               Schema newSchema,
+                                                               Schema readerSchema,
+                                                               TypedProperties props) {
+    // The merged schema contains fields that only appear in either older and/or newer record
+    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mergedSchemaPair =
+        getCachedMergedSchema(oldSchema, newSchema, readerSchema);
+    boolean isNewerPartial = isPartial(newSchema, mergedSchemaPair.getRight().getRight());
+ if (isNewerPartial) {
+ InternalRow oldRow = older.getData();
+ InternalRow newPartialRow = newer.getData();
+
+      Map<Integer, StructField> mergedIdToFieldMapping = mergedSchemaPair.getLeft();
+      Map<String, Integer> newPartialNameToIdMapping = getCachedFieldNameToIdMapping(newSchema);
+ List<Object> values = new ArrayList<>(mergedIdToFieldMapping.size());
+      for (int fieldId = 0; fieldId < mergedIdToFieldMapping.size(); fieldId++) {
+        StructField structField = mergedIdToFieldMapping.get(fieldId);
+        Integer ordInPartialUpdate = newPartialNameToIdMapping.get(structField.name());
+        if (ordInPartialUpdate != null) {
+          // The field exists in the newer record; picks the value from newer record
+          values.add(newPartialRow.get(ordInPartialUpdate, structField.dataType()));
+        } else {
+          // The field does not exist in the newer record; picks the value from older record
+          values.add(oldRow.get(fieldId, structField.dataType()));
+        }
+      }
+ InternalRow mergedRow = new GenericInternalRow(values.toArray());
+
+ HoodieSparkRecord mergedSparkRecord = new HoodieSparkRecord(
+ mergedRow, mergedSchemaPair.getRight().getLeft());
+      return Pair.of(mergedSparkRecord, mergedSchemaPair.getRight().getRight());
+ } else {
+ return Pair.of(newer, newSchema);
+ }
+ }
+
+ /**
+ * @param avroSchema Avro schema.
+ * @return The field ID to {@link StructField} instance mapping.
+ */
+  public static Map<Integer, StructField> getCachedFieldIdToFieldMapping(Schema avroSchema) {
+    return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+ Map<Integer, StructField> schemaFieldIdMapping = new HashMap<>();
+ int fieldId = 0;
+
+ for (StructField field : structType.fields()) {
+ schemaFieldIdMapping.put(fieldId, field);
+ fieldId++;
+ }
+
+ return schemaFieldIdMapping;
+ });
+ }
+
+ /**
+ * @param avroSchema Avro schema.
+ * @return The field name to ID mapping.
+ */
+  public static Map<String, Integer> getCachedFieldNameToIdMapping(Schema avroSchema) {
+    return FIELD_NAME_TO_ID_MAPPING_CACHE.computeIfAbsent(avroSchema, schema -> {
+ StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+ Map<String, Integer> schemaFieldIdMapping = new HashMap<>();
+ int fieldId = 0;
+
+ for (StructField field : structType.fields()) {
+ schemaFieldIdMapping.put(field.name(), fieldId);
+ fieldId++;
+ }
+
+ return schemaFieldIdMapping;
+ });
+ }
+
+ /**
+   * Merges the two schemas so the merged schema contains all the fields from the two schemas,
+ * with the same ordering of fields based on the provided reader schema.
+ *
+ * @param oldSchema Old schema.
+ * @param newSchema New schema.
+ * @param readerSchema Reader schema containing all the fields to read.
+   * @return The ID to {@link StructField} instance mapping of the merged schema, and the
+   *         {@link StructType} and Avro schema of the merged schema.
+ */
+  public static Pair<Map<Integer, StructField>, Pair<StructType, Schema>> getCachedMergedSchema(Schema oldSchema,
+                                                                                                Schema newSchema,
+                                                                                                Schema readerSchema) {
+ return MERGED_SCHEMA_CACHE.computeIfAbsent(
+ Pair.of(Pair.of(oldSchema, newSchema), readerSchema), schemaPair -> {
+ Schema schema1 = schemaPair.getLeft().getLeft();
+ Schema schema2 = schemaPair.getLeft().getRight();
+ Schema refSchema = schemaPair.getRight();
+          Map<String, Integer> nameToIdMapping1 = getCachedFieldNameToIdMapping(schema1);
+          Map<String, Integer> nameToIdMapping2 = getCachedFieldNameToIdMapping(schema2);
+          // Mapping of field ID/position to the StructField instance of the readerSchema
+          Map<Integer, StructField> refFieldIdToFieldMapping = getCachedFieldIdToFieldMapping(refSchema);
+ // This field name set contains all the fields that appear
+ // either in the oldSchema and/or the newSchema
+ Set<String> fieldNameSet = new HashSet<>();
+ fieldNameSet.addAll(nameToIdMapping1.keySet());
+ fieldNameSet.addAll(nameToIdMapping2.keySet());
+ int fieldId = 0;
+ Map<Integer, StructField> mergedMapping = new HashMap<>();
+ List<StructField> mergedFieldList = new ArrayList<>();
+          // Iterates over the fields based on the original ordering of the fields of the
+ // readerSchema using the field ID/position from 0
+ for (int i = 0; i < refFieldIdToFieldMapping.size(); i++) {
+ StructField field = refFieldIdToFieldMapping.get(i);
+ if (fieldNameSet.contains(field.name())) {
+ mergedMapping.put(fieldId, field);
+ mergedFieldList.add(field);
+ fieldId++;
+ }
+ }
+          StructType mergedStructType = new StructType(mergedFieldList.toArray(new StructField[0]));
+          Schema mergedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+              mergedStructType, readerSchema.getName(), readerSchema.getNamespace());
+          return Pair.of(mergedMapping, Pair.of(mergedStructType, mergedSchema));
+ });
+ }
+
+ /**
+ * @param schema Avro schema to check.
+ * @param mergedSchema The merged schema for the merged record.
+ * @return whether the Avro schema is partial compared to the merged schema.
+ */
+  public static boolean isPartial(Schema schema, Schema mergedSchema) {
+    return !schema.equals(mergedSchema);
+ }
+}
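As a reading aid rather than part of the commit, a hedged sketch of how the utility above is expected to be used by a Spark merger implementation; `older`, `newer`, and the schema variables are placeholders:

    // Merged schema: the fields present in either input schema, ordered as in readerSchema.
    Pair<Map<Integer, StructField>, Pair<StructType, Schema>> mergedSchemaPair =
        SparkRecordMergingUtils.getCachedMergedSchema(oldSchema, newSchema, readerSchema);
    // Merge the rows: when the newer schema is partial, a GenericInternalRow is assembled
    // field by field, preferring values from the newer record; when the newer record is
    // already complete, it is returned as-is together with its schema.
    Pair<HoodieRecord, Schema> merged =
        SparkRecordMergingUtils.mergePartialRecords(older, oldSchema, newer, newSchema, readerSchema, props);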
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 221d734dfb7..5360535620f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -94,8 +94,7 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
   @Override
   public HoodieRecord<InternalRow> constructHoodieRecord(Option<InternalRow> rowOption,
-                                                         Map<String, Object> metadataMap,
-                                                         Schema schema) {
+                                                         Map<String, Object> metadataMap) {
     if (!rowOption.isPresent()) {
       return new HoodieEmptyRecord<>(
           new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
@@ -103,6 +102,7 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
           HoodieRecord.HoodieRecordType.SPARK);
     }
+    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
     InternalRow row = rowOption.get();
     return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
   }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 25ea9d3332b..af3d3fd239c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -20,16 +20,23 @@
package org.apache.hudi
import org.apache.avro.Schema
+import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.common.engine.HoodieReaderContext
-import org.apache.hudi.common.util.collection.ClosableIterator
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util.collection.{ClosableIterator, CloseableMappingIterator}
+import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
import org.apache.hudi.util.CloseableInternalRowIterator
+import org.apache.spark.sql.HoodieInternalRowUtils
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{HoodieInternalRowUtils, SparkSession}
+
+import scala.collection.mutable
/**
 * Implementation of {@link HoodieReaderContext} to read {@link InternalRow}s with
@@ -37,12 +44,14 @@ import org.apache.spark.sql.{HoodieInternalRowUtils, SparkSession}
*
 * This uses Spark parquet reader to read parquet data files or parquet log blocks.
 *
- * @param baseFileReader A reader that transforms a {@link PartitionedFile} to an iterator of {@link InternalRow}
- * @param partitionValues The values for a partition in which the file group lives.
+ * @param baseFileReader A reader that transforms a {@link PartitionedFile} to an iterator of {@link InternalRow}.
+ * @param partitionValues The values for a partition in which the file group lives.
 */
class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile => Iterator[InternalRow],
                                              partitionValues: InternalRow) extends BaseSparkInternalRowReaderContext {
  lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
+  lazy val sparkFileReaderFactory = new HoodieSparkFileReaderFactory
+  val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()
  override def getFileRecordIterator(filePath: Path,
                                     start: Long,
@@ -50,7 +59,38 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
                                     dataSchema: Schema,
                                     requiredSchema: Schema,
                                     conf: Configuration): ClosableIterator[InternalRow] = {
-    val fileInfo = sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(partitionValues, filePath, start, length)
-    new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+    val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
+      .createPartitionedFile(partitionValues, filePath, start, length)
+    if (FSUtils.isLogFile(filePath)) {
+      val structType: StructType = HoodieInternalRowUtils.getCachedSchema(dataSchema)
+      val projection: UnsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
+      new CloseableMappingIterator[InternalRow, UnsafeRow](
+        sparkFileReaderFactory.newParquetFileReader(conf, filePath).asInstanceOf[HoodieSparkParquetReader]
+          .getInternalRowIterator(dataSchema, dataSchema),
+        new java.util.function.Function[InternalRow, UnsafeRow] {
+          override def apply(data: InternalRow): UnsafeRow = {
+            // NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert
+            // it to [[UnsafeRow]] holding just raw bytes
+            projection.apply(data)
+          }
+        }).asInstanceOf[ClosableIterator[InternalRow]]
+    } else {
+      new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+    }
+  }
+
+ /**
+   * Converts an Avro record, e.g., serialized in the log files, to an [[InternalRow]].
+ *
+ * @param avroRecord The Avro record.
+ * @return An [[InternalRow]].
+ */
+ override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = {
+ val schema = avroRecord.getSchema
+ val structType = HoodieInternalRowUtils.getCachedSchema(schema)
+ val deserializer = deserializerMap.getOrElseUpdate(schema, {
+ sparkAdapter.createAvroDeserializer(schema, structType)
+ })
+ deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
index 2b01b270a7c..30a5e93fb63 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
@@ -46,6 +46,7 @@ class CloseableInternalRowIterator(iterator: Iterator[_]) extends ClosableIterat
override def next: InternalRow = {
if (!entryTypeKnown) {
+ entryTypeKnown = true
// First entry
val nextVal = iterator.next
seqInBatch = 0
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 9ccdfd863c2..d561b059183 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,6 +53,7 @@ public abstract class HoodieReaderContext<T> {
public static final String INTERNAL_META_ORDERING_FIELD = "_2";
public static final String INTERNAL_META_OPERATION = "_3";
public static final String INTERNAL_META_INSTANT_TIME = "_4";
+ public static final String INTERNAL_META_SCHEMA = "_5";
/**
* Gets the file system based on the file path and configuration.
@@ -77,6 +79,14 @@ public abstract class HoodieReaderContext<T> {
public abstract ClosableIterator<T> getFileRecordIterator(
      Path filePath, long start, long length, Schema dataSchema, Schema requiredSchema, Configuration conf);
+ /**
+   * Converts an Avro record, e.g., serialized in the log files, to an engine-specific record.
+ *
+ * @param avroRecord The Avro record.
+ * @return An engine-specific record in Type {@link T}.
+ */
+ public abstract T convertAvroRecord(IndexedRecord avroRecord);
+
/**
* @param mergerStrategy Merger strategy UUID.
* @return {@link HoodieRecordMerger} to use.
@@ -121,12 +131,10 @@ public abstract class HoodieReaderContext<T> {
    *
    * @param recordOption An option of the record in engine-specific type if exists.
    * @param metadataMap  The record metadata.
-   * @param schema       The Avro schema of the record.
    * @return A new instance of {@link HoodieRecord}.
    */
  public abstract HoodieRecord<T> constructHoodieRecord(Option<T> recordOption,
-                                                        Map<String, Object> metadataMap,
-                                                        Schema schema);
+                                                        Map<String, Object> metadataMap);
  /**
   * Seals the engine-specific record to make sure the data referenced in memory do not change.
@@ -163,6 +171,7 @@ public abstract class HoodieReaderContext<T> {
  public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
    Map<String, Object> meta = new HashMap<>();
    meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
+   meta.put(INTERNAL_META_SCHEMA, schema);
    return meta;
  }
 }
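A short sketch (illustrative, not from the patch) of the new contract: the per-record metadata map now carries the record's own schema, so constructHoodieRecord no longer needs a schema argument. `record` and `writerSchema` are placeholders for an engine-specific row and the (possibly partial) schema of the block it came from:

    Map<String, Object> meta = readerContext.generateMetadataForRecord(record, writerSchema);
    Schema schemaOfRecord = (Schema) meta.get(HoodieReaderContext.INTERNAL_META_SCHEMA); // key "_5"
    HoodieRecord<T> hoodieRecord = readerContext.constructHoodieRecord(Option.of(record), meta);
    // Carrying the schema per record matters because each log block may have been
    // written with a different (partial) schema.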
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index 60d8d09f3d5..5d9896f8f2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -21,6 +21,7 @@ package org.apache.hudi.common.fs;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -467,7 +468,9 @@ public class FSUtils {
}
public static boolean isLogFile(Path logPath) {
- return isLogFile(logPath.getName());
+ String scheme = logPath.toUri().getScheme();
+ return isLogFile(InLineFileSystem.SCHEME.equals(scheme)
+        ? InLineFSUtils.getOuterFilePathFromInlinePath(logPath).getName() : logPath.getName());
}
public static boolean isLogFile(String fileName) {
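The behavior change above is easiest to see with the inline file system case that the new TestFSUtils assertion exercises; a hedged sketch (paths are made up):

    // A log file addressed directly and the same file addressed through the
    // InLineFileSystem scheme should both be recognized as log files now.
    Path logPath = new Path("/tmp/p1/" + FSUtils.makeLogFileName("f1", ".log", "100", 2, "1-0-1"));
    Path inlinePath = InLineFSUtils.getInlineFilePath(logPath, "file", 0, 100);
    boolean direct = FSUtils.isLogFile(logPath);    // true, as before
    boolean inline = FSUtils.isLogFile(inlinePath); // true after this change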
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
index 66a2ad77f2e..7a259d0bd9a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java
@@ -18,7 +18,6 @@
package org.apache.hudi.common.model;
-import org.apache.avro.Schema;
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.common.config.TypedProperties;
@@ -26,6 +25,8 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.avro.Schema;
+
import java.io.IOException;
import java.io.Serializable;
@@ -43,9 +44,68 @@ public interface HoodieRecordMerger extends Serializable {
   * This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
   * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
   * of the single record, both orders of operations applications have to yield the same result)
+   * This method takes only full records for merging.
   */
  Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
+ /**
+   * Merges records which can contain partial updates, i.e., only a subset of fields and values are
+   * present in the record representing the updates, and absent fields are not updated. The fields
+   * that exist in the older and newer records indicate the fields with changed values. When merging,
+   * only the changed fields should be included in the merging results.
+ * <p>
+ * For example, the reader schema is
+ * {[
+ * {"name":"id", "type":"string"},
+ * {"name":"ts", "type":"long"},
+ * {"name":"name", "type":"string"},
+ * {"name":"price", "type":"double"},
+ * {"name":"tags", "type":"string"}
+ * ]}
+ * The older and newer records can be (omitting Hudi meta fields):
+ * <p>
+ * (1) older (complete record update):
+ * id | ts | name | price | tags
+ * 1 | 10 | apple | 2.3 | fruit
+ * <p>
+ * newer (partial record update):
+ * ts | price
+ * 16 | 2.8
+ * <p>
+   * In this case, in the newer record, only "ts" and "price" fields are updated. With the default
+   * merging strategy, the newer record updates the older record and the merging result is
+ * <p>
+ * id | ts | name | price | tags
+ * 1 | 16 | apple | 2.8 | fruit
+ * <p>
+ * (2) older (partial record update):
+ * ts | price
+ * 10 | 2.8
+ * <p>
+ * newer (partial record update):
+ * ts | tag
+ * 16 | fruit,juicy
+ * <p>
+   * In this case, in the older record, only "ts" and "price" fields are updated. In the newer
+   * record, only "ts" and "tag" fields are updated. With the default merging strategy, all the
+ * changed fields should be included in the merging results.
+ * <p>
+ * ts | price | tags
+ * 16 | 2.8 | fruit,juicy
+ *
+ * @param older Older record.
+ * @param oldSchema Schema of the older record.
+ * @param newer Newer record.
+ * @param newSchema Schema of the newer record.
+   * @param readerSchema Reader schema containing all the fields to read. This is used to maintain
+ * the ordering of the fields of the merged record.
+ * @param props Configuration in {@link TypedProperties}.
+ * @return The merged record and schema.
+ * @throws IOException upon merging error.
+ */
+  default Option<Pair<HoodieRecord, Schema>> partialMerge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, Schema readerSchema, TypedProperties props) throws IOException {
+    throw new UnsupportedOperationException("Partial merging logic is not implemented.");
+ }
/**
   * In some cases a business logic does some checks before flushing a merged record to the disk.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index ca568c0f38c..156bebb4495 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -150,7 +150,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
   protected <T> ClosableIterator<T> deserializeRecords(HoodieReaderContext<T> readerContext, byte[] content) throws IOException {
     checkState(this.readerSchema != null, "Reader's schema has to be non-null");
     RecordIterator iterator = RecordIterator.getInstance(this, content);
-    return new CloseableMappingIterator<>(iterator, data -> (T) data);
+    return new CloseableMappingIterator<>(iterator, data -> readerContext.convertAvroRecord(data));
   }
   private static class RecordIterator implements ClosableIterator<IndexedRecord> {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index a0f9016f409..3e4025aa307 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -122,8 +122,13 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     this.shouldWriteRecordPositions = false;
     this.records = Option.empty();
     this.keyFieldName = keyFieldName;
-    // If no reader-schema has been provided assume writer-schema as one
-    this.readerSchema = readerSchema.orElseGet(() -> getWriterSchema(super.getLogBlockHeader()));
+    this.readerSchema = containsPartialUpdates()
+        // When the data block contains partial updates, we need to strictly use the writer schema
+        // from the log block header, as we need to use the partial schema to indicate which
+        // fields are updated during merging.
+        ? getWriterSchema(super.getLogBlockHeader())
+        // If no reader-schema has been provided assume writer-schema as one
+        : readerSchema.orElseGet(() -> getWriterSchema(super.getLogBlockHeader()));
     this.enablePointLookups = enablePointLookups;
   }
@@ -145,6 +150,11 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     return keyFieldName;
   }
+  public boolean containsPartialUpdates() {
+    return getLogBlockHeader().containsKey(HeaderMetadataType.IS_PARTIAL)
+        && Boolean.parseBoolean(getLogBlockHeader().get(HeaderMetadataType.IS_PARTIAL));
+  }
+
   protected static Schema getWriterSchema(Map<HeaderMetadataType, String> logBlockHeader) {
     return new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
   }
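For orientation (a sketch, not code from this commit): a data block written by a partial-update operation carries IS_PARTIAL=true in its header and its SCHEMA header holds only the updated fields, which is why the constructor above prefers the writer schema for such blocks. `dataBlock` below is a placeholder for a HoodieDataBlock read from a log file:

    if (dataBlock.containsPartialUpdates()) {
      // The block's records only contain the fields listed in the header's SCHEMA
      // entry, so downstream merging must go through partialMerge.
      Schema partialSchema = new Schema.Parser().parse(
          dataBlock.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
    }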
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 991428ae87c..4a1bd08e4ef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -46,6 +46,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
+
 public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
   protected final HoodieReaderContext<T> readerContext;
   protected final Schema readerSchema;
@@ -58,6 +60,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
   protected ClosableIterator<T> baseFileIterator;
   protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
   protected T nextRecord;
+  protected boolean enablePartialMerging = false;
   public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                          Schema readerSchema,
@@ -122,13 +125,25 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
     // Merge and store the combined record
     // Note that the incoming `record` is from an older commit, so it should be put as
     // the `older` in the merge API
-    HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
-        readerContext.constructHoodieRecord(Option.of(record), metadata, readerSchema),
-        readerSchema,
-        readerContext.constructHoodieRecord(
-            existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema),
-        readerSchema,
-        payloadProps).get().getLeft();
+    HoodieRecord<T> combinedRecord;
+    if (enablePartialMerging) {
+      combinedRecord = (HoodieRecord<T>) recordMerger.partialMerge(
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          (Schema) metadata.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+          readerSchema,
+          payloadProps).get().getLeft();
+    } else {
+      combinedRecord = (HoodieRecord<T>) recordMerger.merge(
+          readerContext.constructHoodieRecord(Option.of(record), metadata),
+          (Schema) metadata.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(
+              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+          payloadProps).get().getLeft();
+    }
     // If pre-combine returns existing record, no need to update it
     if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
       return Option.of(combinedRecord.getData());
@@ -210,9 +225,17 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
       return newer;
     }
-    Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
-        readerContext.constructHoodieRecord(older, olderInfoMap, baseFileSchema), baseFileSchema,
-        readerContext.constructHoodieRecord(newer, newerInfoMap, readerSchema), readerSchema, payloadProps);
+    Option<Pair<HoodieRecord, Schema>> mergedRecord;
+    if (enablePartialMerging) {
+      mergedRecord = recordMerger.partialMerge(
+          readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA),
+          readerSchema, payloadProps);
+    } else {
+      mergedRecord = recordMerger.merge(
+          readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
+    }
     if (mergedRecord.isPresent()) {
       return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index 479a0c1b339..6e5679adfbf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -39,8 +39,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
 /**
  * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
  * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into a record key based map.
@@ -66,17 +64,19 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
   @Override
   public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
-    checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
-        "Either partition-name override or partition-path field had to be present");
-
-
     Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair = getRecordsIterator(dataBlock, keySpecOpt);
+    if (dataBlock.containsPartialUpdates()) {
+      // When a data block contains partial updates, subsequent record merging must always use
+      // partial merging.
+      enablePartialMerging = true;
+    }
     try (ClosableIterator<T> recordIterator = recordsIteratorSchemaPair.getLeft()) {
       while (recordIterator.hasNext()) {
         T nextRecord = recordIterator.next();
-        Map<String, Object> metadata = readerContext.generateMetadataForRecord(nextRecord, readerSchema);
+        Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+            nextRecord, recordsIteratorSchemaPair.getRight());
         String recordKey = (String) metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
         processNextDataRecord(nextRecord, metadata, recordKey);
       }
@@ -127,10 +127,12 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
       String recordKey = readerContext.getRecordKey(baseRecord, baseFileSchema);
       Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(recordKey);
+      Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+          baseRecord, baseFileSchema);
       Option<T> resultRecord = logRecordInfo != null
-          ? merge(Option.of(baseRecord), Collections.emptyMap(), logRecordInfo.getLeft(), logRecordInfo.getRight())
-          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), Collections.emptyMap());
+          ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
+          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
       if (resultRecord.isPresent()) {
         nextRecord = readerContext.seal(resultRecord.get());
         return true;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 6c80af8b56c..352e7c726d7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -42,8 +42,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
 /**
  * A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
  * by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into record position based map.
@@ -71,9 +69,6 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
   @Override
   public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws IOException {
-    checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
-        "Either partition-name override or partition-path field had to be present");
-
     // Prepare key filters.
     Set<String> keys = new HashSet<>();
     boolean isFullKey = true;
@@ -84,6 +79,12 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
       isFullKey = keySpecOpt.get().isFullKey();
     }
+    if (dataBlock.containsPartialUpdates()) {
+      // When a data block contains partial updates, subsequent record merging must always use
+      // partial merging.
+      enablePartialMerging = true;
+    }
+
     // Extract positions from data block.
     List<Long> recordPositions = extractRecordPositions(dataBlock);
@@ -159,9 +160,12 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
       T baseRecord = baseFileIterator.next();
       Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
+      Map<String, Object> metadata = readerContext.generateMetadataForRecord(
+          baseRecord, baseFileSchema);
+
       Option<T> resultRecord = logRecordInfo != null
-          ? merge(Option.of(baseRecord), Collections.emptyMap(), logRecordInfo.getLeft(), logRecordInfo.getRight())
-          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), Collections.emptyMap());
+          ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(), logRecordInfo.getRight())
+          : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord), metadata);
if (resultRecord.isPresent()) {
nextRecord = readerContext.seal(resultRecord.get());
return true;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 612929bc8a6..350a14d591a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.fs;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -244,7 +245,9 @@ public class TestFSUtils extends HoodieCommonTestHarness {
     String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, "1-0-1");
     System.out.println("Log File =" + logFile);
     Path rlPath = new Path(new Path(partitionPath), logFile);
+    Path inlineFsPath = InLineFSUtils.getInlineFilePath(rlPath, "file", 0, 100);
assertTrue(FSUtils.isLogFile(rlPath));
+ assertTrue(FSUtils.isLogFile(inlineFsPath));
assertEquals(fileName, FSUtils.getFileIdFromLogPath(rlPath));
assertEquals("100", FSUtils.getDeltaCommitTimeFromLogPath(rlPath));
assertEquals(2, FSUtils.getFileVersionFromLog(rlPath));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index af1fc120c5b..febc0d32466 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.FileSlice;
@@ -35,6 +36,7 @@ import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
@@ -66,12 +68,13 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
public abstract String getBasePath();
-  public abstract HoodieReaderContext<T> getHoodieReaderContext(String[] partitionPath);
+  public abstract HoodieReaderContext<T> getHoodieReaderContext(String tablePath, String[] partitionPath);
   public abstract void commitToTable(List<String> recordList, String operation, Map<String, String> writeConfigs);
-  public abstract void validateRecordsInFileGroup(List<T> actualRecordList,
+  public abstract void validateRecordsInFileGroup(String tablePath,
+                                                  List<T> actualRecordList,
                                                   Schema schema,
                                                   String fileGroupId);
@@ -90,7 +93,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
writeConfigs.put("hoodie.compact.inline", "false");
- writeConfigs.put(HoodieMetadataConfig.ENABLE.key(), "false");
     try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
// One commit; reading one file group containing a base file only
@@ -108,16 +110,19 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
}
   private void validateOutputFromFileGroupReader(Configuration hadoopConf,
-                                                 String basePath,
+                                                 String tablePath,
                                                  String[] partitionPaths,
                                                  int expectedLogFileNum) throws Exception {
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        .setConf(hadoopConf).setBasePath(basePath).build();
+        .setConf(hadoopConf).setBasePath(tablePath).build();
     Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    HoodieEngineContext engineContext = new HoodieLocalEngineContext(hadoopConf);
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
     FileSystemViewManager viewManager = FileSystemViewManager.createViewManager(
-        new HoodieLocalEngineContext(hadoopConf), HoodieMetadataConfig.newBuilder().build(),
-        FileSystemViewStorageConfig.newBuilder().build(),
-        HoodieCommonConfig.newBuilder().build());
+        engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder().build(),
+        HoodieCommonConfig.newBuilder().build(),
+        mc -> HoodieTableMetadata.create(
+            engineContext, metadataConfig, mc.getBasePathV2().toString()));
     SyncableFileSystemView fsView = viewManager.getFileSystemView(metaClient);
     FileSlice fileSlice = fsView.getAllFileSlices(partitionPaths[0]).findFirst().get();
     List<String> logFilePathList = getLogFileListFromFileSlice(fileSlice);
@@ -129,12 +134,14 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
props.setProperty("hoodie.payload.ordering.field", "timestamp");
     props.setProperty(RECORD_MERGER_STRATEGY.key(), RECORD_MERGER_STRATEGY.defaultValue());
-    props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS.key()));
-    String[] partitionValues = {partitionPaths[0]};
-    HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<T>(
-        getHoodieReaderContext(partitionValues),
+    if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
+      props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
+    }
+    String[] partitionValues = partitionPaths[0].isEmpty() ? new String[] {} : new String[] {partitionPaths[0]};
+    HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+        getHoodieReaderContext(tablePath, partitionValues),
         hadoopConf,
-        basePath,
+        tablePath,
         metaClient.getActiveTimeline().lastInstant().get().getTimestamp(),
         fileSlice.getBaseFile(),
         logFilePathList.isEmpty() ? Option.empty() : Option.of(logFilePathList),
@@ -149,6 +156,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
}
fileGroupReader.close();
-    validateRecordsInFileGroup(actualRecordList, avroSchema, fileSlice.getFileId());
+    validateRecordsInFileGroup(tablePath, actualRecordList, avroSchema, fileSlice.getFileId());
}
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 50056954a12..a1441be5ddc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -74,16 +74,21 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
tempDir.toAbsolutePath.toUri.toString
}
-  override def getHoodieReaderContext(partitionValues: Array[String]): HoodieReaderContext[InternalRow] = {
+  override def getHoodieReaderContext(tablePath: String,
+                                      partitionValues: Array[String]): HoodieReaderContext[InternalRow] = {
     val parquetFileFormat = new ParquetFileFormat
-    val metaClient = HoodieTableMetaClient.builder.setConf(getHadoopConf).setBasePath(getBasePath).build
+    val metaClient = HoodieTableMetaClient.builder.setConf(getHadoopConf).setBasePath(tablePath).build
     val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
     val structTypeSchema = AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
     val partitionFields = metaClient.getTableConfig.getPartitionFields
-    val partitionSchema = new StructType(structTypeSchema.fields.filter(f => partitionFields.get().contains(f.name)))
+    val partitionSchema = if (partitionFields.isPresent) {
+      new StructType(structTypeSchema.fields.filter(f => partitionFields.get().contains(f.name)))
+    } else {
+      new StructType()
+    }
     val recordReaderIterator = parquetFileFormat.buildReaderWithPartitionValues(
-      spark, structTypeSchema, partitionSchema, structTypeSchema, Seq.empty, Map.empty, getHadoopConf)
+      spark, structTypeSchema, partitionSchema, structTypeSchema, Seq.empty, Map.empty, getHadoopConf)
     val numPartitionFields = if (partitionFields.isPresent) partitionFields.get().length else 0
     assertEquals(numPartitionFields, partitionValues.length)
@@ -98,6 +103,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
   override def commitToTable(recordList: util.List[String], operation: String, options: util.Map[String, String]): Unit = {
     val inputDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(recordList.toList, 2))
+
     inputDF.write.format("hudi")
       .options(options)
       .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
@@ -108,11 +114,12 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
       .save(getBasePath)
   }
-  override def validateRecordsInFileGroup(actualRecordList: util.List[InternalRow],
+  override def validateRecordsInFileGroup(basePath: String,
+                                          actualRecordList: util.List[InternalRow],
                                           schema: Schema,
                                           fileGroupId: String): Unit = {
val expectedDf = spark.read.format("hudi")
- .load(getBasePath)
+ .load(basePath)
.where(col(HoodieRecord.FILENAME_METADATA_FIELD).contains(fileGroupId))
assertEquals(expectedDf.count, actualRecordList.size)
val actualDf = HoodieUnsafeUtils.createDataFrameFromInternalRows(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
index 4e6232fe3fe..f1375f0749d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartialUpdateForMergeInto.scala
@@ -18,7 +18,12 @@
package org.apache.spark.sql.hudi
import org.apache.avro.Schema
+import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
+import org.apache.hudi.DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES
+import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
+import org.apache.hudi.common.config.HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.function.SerializableFunctionUnchecked
@@ -30,9 +35,9 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestUtils.{getDefaultHadoopConf, getLogFileListFromFileSlice}
import org.apache.hudi.config.HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT
import org.apache.hudi.metadata.HoodieTableMetadata
-import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import java.io.File
import java.util.{Collections, List}
import scala.collection.JavaConverters._
@@ -40,108 +45,84 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
test("Test Partial Update") {
withTempDir { tmp =>
- Seq("cow", "mor").foreach { tableType =>
- val tableName = generateTableName
- val basePath = tmp.getCanonicalPath + "/" + tableName
- spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
- spark.sql(s"set
${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | _ts long
- |) using hudi
- |tblproperties(
- | type ='$tableType',
- | primaryKey = 'id',
- | preCombineField = '_ts'
- |)
- |location '$basePath'
- """.stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-
- spark.sql(
- s"""
- |merge into $tableName t0
- |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts ) s0
- |on t0.id = s0.id
- |when matched then update set price = s0.price, _ts = s0.ts
- |""".stripMargin)
- if (tableType.equals("cow")) {
- checkAnswer(s"select id, name, price, _ts from $tableName")(
- Seq(1, "a1", 12.0, 1001)
- )
- } else {
- // TODO(HUDI-6801): validate data once merging of partial updates is implemented
- // Validate the log file
- val hadoopConf = getDefaultHadoopConf
- val metaClient: HoodieTableMetaClient =
- HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
- val metadataConfig = HoodieMetadataConfig.newBuilder.build
- val engineContext = new HoodieLocalEngineContext(hadoopConf)
- val viewManager: FileSystemViewManager = FileSystemViewManager.createViewManager(
- engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder.build,
- HoodieCommonConfig.newBuilder.build,
- new SerializableFunctionUnchecked[HoodieTableMetaClient, HoodieTableMetadata] {
- override def apply(v1: HoodieTableMetaClient): HoodieTableMetadata = {
- HoodieTableMetadata.create(
- engineContext, metadataConfig, metaClient.getBasePathV2.toString)
- }
- }
- )
- val fsView: SyncableFileSystemView = viewManager.getFileSystemView(metaClient)
- val fileSlice: FileSlice = fsView.getAllFileSlices("").findFirst.get
- val logFilePathList: List[String] = getLogFileListFromFileSlice(fileSlice)
- Collections.sort(logFilePathList)
- assertEquals(1, logFilePathList.size)
-
- val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
- val logReader = new HoodieLogFileReader(
- metaClient.getFs, new HoodieLogFile(logFilePathList.get(0)),
- avroSchema, 1024 * 1024, true, false, false,
- "id", null)
- assertTrue(logReader.hasNext)
- val logBlockHeader = logReader.next().getLogBlockHeader
- assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
- assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
- val partialSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
- val expectedPartialSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
- avroSchema, Seq("price", "_ts").asJava), false)
- assertEquals(expectedPartialSchema, partialSchema)
- assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
- }
+ testPartialUpdate(tmp, "cow", "avro")
+ testPartialUpdate(tmp, "mor", "avro")
+ testPartialUpdate(tmp, "mor", "parquet")
+ }
+ }
- val tableName2 = generateTableName
- spark.sql(
- s"""
- |create table $tableName2 (
- | id int,
- | name string,
- | price double
- |) using hudi
- |tblproperties(
- | type ='$tableType',
- | primaryKey = 'id'
- |)
- |location '${tmp.getCanonicalPath}/$tableName2'
- """.stripMargin)
- spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
-
- spark.sql(
- s"""
- |merge into $tableName2 t0
- |using ( select 1 as id, 'a1' as name, 12 as price) s0
- |on t0.id = s0.id
- |when matched then update set price = s0.price
- |""".stripMargin)
- if (tableType.equals("cow")) {
- checkAnswer(s"select id, name, price from $tableName2")(
- Seq(1, "a1", 12.0)
- )
- }
- }
+ def testPartialUpdate(tmp: File,
+ tableType: String,
+ logDataBlockFormat: String): Unit = {
+ val tableName = generateTableName
+ val basePath = tmp.getCanonicalPath + "/" + tableName
+ spark.sql(s"set ${MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key} = 0")
+ spark.sql(s"set ${ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = true")
+ spark.sql(s"set ${LOGFILE_DATA_BLOCK_FORMAT.key} = $logDataBlockFormat")
+ spark.sql(s"set ${FILE_GROUP_READER_ENABLED.key} = true")
+ spark.sql(s"set ${USE_NEW_HUDI_PARQUET_FILE_FORMAT.key} = true")
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | _ts long
+ |) using hudi
+ |tblproperties(
+ | type ='$tableType',
+ | primaryKey = 'id',
+ | preCombineField = '_ts'
+ |)
+ |location '$basePath'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+ spark.sql(
+ s"""
+ |merge into $tableName t0
+ |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts ) s0
+ |on t0.id = s0.id
+ |when matched then update set price = s0.price, _ts = s0.ts
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, price, _ts from $tableName")(
+ Seq(1, "a1", 12.0, 1001)
+ )
+
+ if (tableType.equals("mor")) {
+ validateLogBlock(basePath, 1, Seq("price", "_ts"))
+ }
+
+ if (tableType.equals("cow")) {
+ // No preCombine field
+ val tableName2 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName2 (
+ | id int,
+ | name string,
+ | price double
+ |) using hudi
+ |tblproperties(
+ | type ='$tableType',
+ | primaryKey = 'id'
+ |)
+ |location '${tmp.getCanonicalPath}/$tableName2'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
+
+ spark.sql(
+ s"""
+ |merge into $tableName2 t0
+ |using ( select 1 as id, 'a1' as name, 12 as price) s0
+ |on t0.id = s0.id
+ |when matched then update set price = s0.price
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, price from $tableName2")(
+ Seq(1, "a1", 12.0)
+ )
}
}
@@ -190,4 +171,44 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
| preCombineField = '_ts'
|)""".stripMargin)
}
+
+ def validateLogBlock(basePath: String,
+ expectedNumLogFile: Int,
+ changedFields: Seq[String]): Unit = {
+ val hadoopConf = getDefaultHadoopConf
+ val metaClient: HoodieTableMetaClient =
+ HoodieTableMetaClient.builder.setConf(hadoopConf).setBasePath(basePath).build
+ val metadataConfig = HoodieMetadataConfig.newBuilder.build
+ val engineContext = new HoodieLocalEngineContext(hadoopConf)
+ val viewManager: FileSystemViewManager = FileSystemViewManager.createViewManager(
+ engineContext, metadataConfig, FileSystemViewStorageConfig.newBuilder.build,
+ HoodieCommonConfig.newBuilder.build,
+ new SerializableFunctionUnchecked[HoodieTableMetaClient, HoodieTableMetadata] {
+ override def apply(v1: HoodieTableMetaClient): HoodieTableMetadata = {
+ HoodieTableMetadata.create(
+ engineContext, metadataConfig, metaClient.getBasePathV2.toString)
+ }
+ }
+ )
+ val fsView: SyncableFileSystemView = viewManager.getFileSystemView(metaClient)
+ val fileSlice: FileSlice = fsView.getAllFileSlices("").findFirst.get
+ val logFilePathList: List[String] = getLogFileListFromFileSlice(fileSlice)
+ Collections.sort(logFilePathList)
+ assertEquals(expectedNumLogFile, logFilePathList.size)
+
+ val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+ val logReader = new HoodieLogFileReader(
+ metaClient.getFs, new HoodieLogFile(logFilePathList.get(0)),
+ avroSchema, 1024 * 1024, true, false, false,
+ "id", null)
+ assertTrue(logReader.hasNext)
+ val logBlockHeader = logReader.next().getLogBlockHeader
+ assertTrue(logBlockHeader.containsKey(HeaderMetadataType.SCHEMA))
+ assertTrue(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
+ val partialSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+ val expectedPartialSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
+ avroSchema, changedFields.asJava), false)
+ assertEquals(expectedPartialSchema, partialSchema)
+ assertTrue(logBlockHeader.get(HeaderMetadataType.IS_PARTIAL).toBoolean)
+ }
}
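
For intuition on what the tests above assert: a partial update log block carries only the changed fields (here "price" and "_ts", plus the metadata fields), and merging overlays those fields onto the older record while keeping the rest. A simplified sketch with plain Scala maps; this is a conceptual model, not the SparkRecordMergingUtils API:

// Conceptual model of partial-update merging: fields present in the newer
// partial record override the older record; all other fields are retained.
def mergePartial(older: Map[String, Any], newerPartial: Map[String, Any]): Map[String, Any] =
  older ++ newerPartial

// Mirrors the test: insert (1, 'a1', 10, 1000), then partial-update price and _ts.
val older  = Map("id" -> 1, "name" -> "a1", "price" -> 10.0, "_ts" -> 1000L)
val update = Map("price" -> 12.0, "_ts" -> 1001L)
assert(mergePartial(older, update) == Map("id" -> 1, "name" -> "a1", "price" -> 12.0, "_ts" -> 1001L))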