This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b031e95af03 [HUDI-6217] Spark reads should skip record with delete
operation metadata (#10219)
b031e95af03 is described below
commit b031e95af03db196e5168226a6f19366eebcb189
Author: Jing Zhang <[email protected]>
AuthorDate: Sat Dec 2 11:38:29 2023 +0800
[HUDI-6217] Spark reads should skip record with delete operation metadata
(#10219)
---
.../src/main/scala/org/apache/hudi/Iterators.scala | 66 ++++++--
.../apache/hudi/TestDataSourceReadWithDeletes.java | 181 +++++++++++++++++++++
2 files changed, 235 insertions(+), 12 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
index a2ad13ada2e..540825afdb8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala
@@ -32,7 +32,7 @@ import org.apache.hudi.common.engine.{EngineType,
HoodieLocalEngineContext}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.model._
+import org.apache.hudi.common.model.{HoodieSparkRecord, _}
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
import org.apache.hudi.common.util.{ConfigUtils, FileIOUtils,
HoodieRecordUtils}
import org.apache.hudi.config.HoodiePayloadConfig
@@ -108,6 +108,29 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
maxCompactionMemoryInBytes, config, internalSchema)
}
+ // Whether the log-file reader schema declares the operation metadata field and, when it
+ // does, the position of that field within the schema; the position is -1 otherwise.
+ private val (hasOperationField, operationFieldPos) =
+   logFileReaderAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) match {
+     case null => (false, -1)
+     case field => (true, field.pos())
+   }
+
+ /**
+  * Tells whether the given row is flagged as a delete through its operation metadata field.
+  *
+  * The result is always false when the log-file reader schema has no operation field, or
+  * when the row holds a null operation value.
+  *
+  * @param r row to inspect; assumed to be laid out so that `operationFieldPos` addresses the
+  *          operation metadata column (TODO confirm for rows produced by a record merger)
+  * @return true only when the stored operation resolves to `HoodieOperation.DELETE`
+  */
+ protected def isDeleteOperation(r: InternalRow): Boolean =
+   // The operation column is nullable; check isNullAt first because getString dereferences
+   // the stored value and would otherwise fail with a NullPointerException.
+   hasOperationField && !r.isNullAt(operationFieldPos) &&
+     HoodieOperation.fromName(r.getString(operationFieldPos)) == HoodieOperation.DELETE
+
+ /**
+  * Tells whether the given Avro record is flagged as a delete through its operation
+  * metadata field.
+  *
+  * The result is always false when the log-file reader schema has no operation field, or
+  * when the record holds a null operation value.
+  *
+  * @param r record to inspect; assumed to be laid out so that `operationFieldPos` addresses
+  *          the operation metadata field (TODO confirm for records produced by a record merger)
+  * @return true only when the stored operation resolves to `HoodieOperation.DELETE`
+  */
+ protected def isDeleteOperation(r: GenericRecord): Boolean =
+   hasOperationField && (r.get(operationFieldPos) match {
+     // The operation field is nullable; an unset value means the record is not a delete.
+     case null => false
+     case operation => HoodieOperation.fromName(operation.toString) == HoodieOperation.DELETE
+   })
+
/**
 * Exposes the entries of `logRecords` as an iterator of pairs.
 *
 * @return an iterator over `(String, HoodieRecord)` pairs; the string is presumably the
 *         record key (confirm against where `logRecords` is populated)
 */
def logRecordsPairIterator(): Iterator[(String, HoodieRecord[_])] =
  logRecords.iterator
@@ -134,12 +157,22 @@ class LogFileIterator(logFiles: List[HoodieLogFile],
logRecordsIterator.hasNext && {
logRecordsIterator.next() match {
case Some(r: HoodieAvroIndexedRecord) =>
- val projectedAvroRecord =
requiredSchemaAvroProjection(r.getData.asInstanceOf[GenericRecord])
- nextRecord = deserialize(projectedAvroRecord)
- true
+ val data = r.getData.asInstanceOf[GenericRecord]
+ if (isDeleteOperation(data)) {
+ this.hasNextInternal
+ } else {
+ val projectedAvroRecord = requiredSchemaAvroProjection(data)
+ nextRecord = deserialize(projectedAvroRecord)
+ true
+ }
case Some(r: HoodieSparkRecord) =>
- nextRecord = requiredSchemaRowProjection(r.getData)
- true
+ val data = r.getData
+ if (isDeleteOperation(data)) {
+ this.hasNextInternal
+ } else {
+ nextRecord = requiredSchemaRowProjection(data)
+ true
+ }
case None => this.hasNextInternal
}
}
@@ -272,18 +305,27 @@ class RecordMergingFileIterator(logFiles:
List[HoodieLogFile],
val curRecord = new HoodieSparkRecord(curRow, readerSchema)
val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema,
newRecord, logFileReaderAvroSchema, payloadProps)
toScalaOption(result)
- .map { r =>
- val schema = HoodieInternalRowUtils.getCachedSchema(r.getRight)
- val projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(schema, structTypeSchema)
- projection.apply(r.getLeft.getData.asInstanceOf[InternalRow])
+ .flatMap { r =>
+ val data = r.getLeft.getData.asInstanceOf[InternalRow]
+ if (isDeleteOperation(data)) {
+ None
+ } else {
+ val schema = HoodieInternalRowUtils.getCachedSchema(r.getRight)
+ val projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(schema, structTypeSchema)
+ Some(projection.apply(data))
+ }
}
case _ =>
val curRecord = new HoodieAvroIndexedRecord(serialize(curRow))
val result = recordMerger.merge(curRecord, baseFileReaderAvroSchema,
newRecord, logFileReaderAvroSchema, payloadProps)
toScalaOption(result)
- .map { r =>
+ .flatMap { r =>
val avroRecord = r.getLeft.toIndexedRecord(r.getRight,
payloadProps).get.getData.asInstanceOf[GenericRecord]
- deserialize(requiredSchemaAvroProjection(avroRecord))
+ if (isDeleteOperation(avroRecord)) {
+ None
+ } else {
+ Some(deserialize(requiredSchemaAvroProjection(avroRecord)))
+ }
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
new file mode 100644
index 00000000000..4192a47d51d
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceReadWithDeletes.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.marker.MarkerType;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieLayoutConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner;
+import org.apache.hudi.table.storage.HoodieStorageLayout;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.table.HoodieTableConfig.TYPE;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Functional test that writes two batches to a MERGE_ON_READ table, the second one containing
+ * records flagged with {@link HoodieOperation#DELETE}, and checks that a Spark snapshot read
+ * returns only the records that were not deleted.
+ */
+@Tag("functional")
+public class TestDataSourceReadWithDeletes extends
SparkClientFunctionalTestHarness {
+
+ // Avro schema of the test records: the _hoodie_* columns (including _hoodie_operation)
+ // followed by the data columns id, name, age, ts and part; every field is a nullable union.
+ String jsonSchema = "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"partialRecord\", \"namespace\":\"org.apache.hudi\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"_hoodie_commit_time\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_commit_seqno\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_record_key\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_partition_path\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_file_name\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"_hoodie_operation\", \"type\": [\"null\",
\"string\"]},\n"
+ + " {\"name\": \"id\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"name\", \"type\": [\"null\", \"string\"]},\n"
+ + " {\"name\": \"age\", \"type\": [\"null\", \"int\"]},\n"
+ + " {\"name\": \"ts\", \"type\": [\"null\", \"long\"]},\n"
+ + " {\"name\": \"part\", \"type\": [\"null\", \"string\"]}\n"
+ + " ]\n"
+ + "}";
+
+ // Parsed form of jsonSchema; assigned in setUp() before each test.
+ private Schema schema;
+ // Meta client of the table under test; created in test() and reloaded around each write.
+ private HoodieTableMetaClient metaClient;
+
+ /** Parses {@link #jsonSchema} into the Avro schema used to build the test records. */
+ @BeforeEach
+ public void setUp() {
+ schema = new Schema.Parser().parse(jsonSchema);
+ }
+
+ /**
+ * Writes a first batch of two inserts, then a second batch mixing inserts and deletes, and
+ * asserts that a snapshot query returns exactly two rows: id1 with the values from the
+ * second batch, and id3.
+ */
+ @Test
+ public void test() throws Exception {
+ HoodieWriteConfig config = createHoodieWriteConfig();
+ metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ,
config.getProps());
+
+ // First commit: insert id1 and id2.
+ String[] dataset1 = new String[] {"I,id1,Danny,23,1,par1",
"I,id2,Tony,20,1,par1"};
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+ String insertTime1 = client.createNewInstantTime();
+ List<WriteStatus> writeStatuses1 = writeData(client, insertTime1,
dataset1);
+ client.commit(insertTime1, jsc().parallelize(writeStatuses1));
+
+ // Second commit: write id1 again with new values, delete id2, insert id3, and delete id4
+ // (a key that was never inserted).
+ String[] dataset2 = new String[] {
+ "I,id1,Danny,30,2,par1",
+ "D,id2,Tony,20,2,par1",
+ "I,id3,Julian,40,2,par1",
+ "D,id4,Stephan,35,2,par1"};
+ String insertTime2 = client.createNewInstantTime();
+ List<WriteStatus> writeStatuses2 = writeData(client, insertTime2,
dataset2);
+ client.commit(insertTime2, jsc().parallelize(writeStatuses2));
+
+ // Snapshot read of the data columns; rows for the deleted keys id2 and id4 must be absent.
+ List<Row> rows = spark().read().format("org.apache.hudi")
+ .option("hoodie.datasource.query.type", "snapshot")
+ .load(config.getBasePath() + "/*/*")
+ .select("id", "name", "age", "ts", "part")
+ .collectAsList();
+ assertEquals(2, rows.size());
+ String[] expected = new String[] {
+ "[id1,Danny,30,2,par1]",
+ "[id3,Julian,40,2,par1]"};
+ // Rows are sorted by their string form so the comparison does not depend on read order.
+ assertArrayEquals(expected,
rows.stream().map(Row::toString).sorted().toArray(String[]::new));
+ }
+
+ /**
+ * Builds the write config for the test table: MERGE_ON_READ table type, bucket layout and
+ * bucket index with a single bucket, auto-commit disabled, meta fields populated, and the
+ * operation metadata field allowed.
+ *
+ * @return the write config pointing at this test's base path
+ */
+ private HoodieWriteConfig createHoodieWriteConfig() {
+ Properties props = getPropertiesForKeyGen(true);
+ props.put(TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+ String basePath = basePath();
+ return HoodieWriteConfig.newBuilder()
+ .forTable("test")
+ .withPath(basePath)
+ .withSchema(jsonSchema)
+ .withParallelism(2, 2)
+ .withAutoCommit(false)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+ .parquetMaxFileSize(1024).build())
+ .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+ .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+ .withIndexConfig(HoodieIndexConfig.newBuilder()
+ .fromProperties(props)
+ .withIndexType(HoodieIndex.IndexType.BUCKET)
+ .withBucketNum("1")
+ .build())
+ .withPopulateMetaFields(true)
+ .withAllowOperationMetadataField(true)
+ // Use direct markers instead of timeline-server-based markers
+ .withMarkersType(MarkerType.DIRECT.name())
+ .build();
+ }
+
+ /**
+ * Upserts the given raw records under the given instant and returns the write statuses.
+ * The commit is started here but not completed; the caller commits the returned statuses.
+ *
+ * @param client write client used to start the commit and run the upsert
+ * @param instant instant time passed to both startCommitWithTime and upsert
+ * @param records raw records in the format accepted by {@link #str2HoodieRecord(String[])}
+ * @return the collected write statuses, already asserted to contain no write errors
+ */
+ private List<WriteStatus> writeData(
+ SparkRDDWriteClient client,
+ String instant,
+ String[] records) {
+ List<HoodieRecord> recordList = str2HoodieRecord(records);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(recordList, 2);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ client.startCommitWithTime(instant);
+ List<WriteStatus> writeStatuses = client.upsert(writeRecords,
instant).collect();
+ org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ return writeStatuses;
+ }
+
+ /**
+ * Converts comma-separated strings of the form {@code op,id,name,age,ts,part} into Hudi
+ * records. An {@code op} of "D" (case-insensitive) yields {@link HoodieOperation#DELETE};
+ * any other value yields {@link HoodieOperation#INSERT}.
+ *
+ * @param records raw record strings, one record per element
+ * @return one {@link HoodieAvroRecord} per input string, in input order
+ */
+ private List<HoodieRecord> str2HoodieRecord(String[] records) {
+ return Stream.of(records).map(rawRecordStr -> {
+ String[] parts = rawRecordStr.split(",");
+ boolean isDelete = parts[0].equalsIgnoreCase("D");
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", parts[1]);
+ record.put("name", parts[2]);
+ record.put("age", Integer.parseInt(parts[3]));
+ record.put("ts", Long.parseLong(parts[4]));
+ record.put("part", parts[5]);
+ // ts is passed as the payload's second constructor argument (presumably its ordering value).
+ OverwriteWithLatestAvroPayload payload = new
OverwriteWithLatestAvroPayload(record, (Long) record.get("ts"));
+ // The key is built from id and part (presumably record key and partition path).
+ return new HoodieAvroRecord<>(
+ new HoodieKey((String) record.get("id"), (String)
record.get("part")),
+ payload,
+ isDelete ? HoodieOperation.DELETE : HoodieOperation.INSERT);
+ }).collect(Collectors.toList());
+ }
+}