This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 90d678c0f7 [HUDI-3478][HUDI-4887] Use Avro as the format of persisted cdc data (#6734)
90d678c0f7 is described below
commit 90d678c0f74b4c2e19085d7332325dd2dd0c7517
Author: Yann Byron <[email protected]>
AuthorDate: Fri Sep 23 01:33:19 2022 +0800
[HUDI-3478][HUDI-4887] Use Avro as the format of persisted cdc data (#6734)
---
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 36 +--
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 13 +-
.../java/org/apache/hudi/avro/AvroSchemaUtils.java | 8 +-
.../hudi/common/table/cdc/HoodieCDCUtils.java | 102 ++++---
.../common/functional/TestHoodieLogFormat.java | 59 +++-
.../functional/cdc/TestCDCDataFrameSuite.scala | 324 +++++++++++++++++++++
6 files changed, 448 insertions(+), 94 deletions(-)
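
In short: before this patch, CDC before/after images were serialized as JSON strings against fixed schemas (CDC_SCHEMA, CDC_SCHEMA_OP_RECORDKEY_BEFORE); after it, they are nested Avro records against a schema derived from the table's own schema at write time. A minimal sketch of the new API surface, assuming a made-up table schema (not taken from the patch):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
    import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

    public class CdcAvroSketch {
      public static void main(String[] args) {
        // hypothetical table schema; Hudi resolves the real one via TableSchemaResolver
        Schema tableSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Record\",\"fields\":["
                + "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},"
                + "{\"name\":\"name\",\"type\":[\"string\",\"null\"]}]}");

        // the cdc schema is now derived from the table schema instead of a fixed constant
        Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
            HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, tableSchema);

        GenericRecord after = new GenericData.Record(tableSchema);
        after.put("uuid", 1);
        after.put("name", "apple");

        // before/after images are stored as nested Avro records, not JSON strings
        GenericData.Record cdc = HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100", null, after);
        System.out.println(cdc);
      }
    }
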
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index c93489d890..e4f1e14252 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -102,16 +102,11 @@ public class HoodieCDCLogger implements Closeable {
     this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
         config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
 
-    if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
-      this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
-      this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING;
-    } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
-      this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
-      this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING;
-    } else {
-      this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
-      this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING;
-    }
+    this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
+        cdcSupplementalLoggingMode,
+        dataSchema
+    );
+    this.cdcSchemaString = this.cdcSchema.toString();
 
     this.cdcData = new ExternalSpillableMap<>(
         maxInMemorySizeInBytes,
@@ -158,18 +153,21 @@ public class HoodieCDCLogger implements Closeable {
                                    GenericRecord newRecord) {
     GenericData.Record record;
     if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
-      record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
+      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime,
           removeCommitMetadata(oldRecord), newRecord);
     } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
-      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
+      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey,
           removeCommitMetadata(oldRecord));
     } else {
-      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
     }
     return record;
   }
 
   private GenericRecord removeCommitMetadata(GenericRecord record) {
+    if (record == null) {
+      return null;
+    }
     return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
   }
@@ -221,18 +219,6 @@ public class HoodieCDCLogger implements Closeable {
     }
   }
 
-  public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
-                                                          long recordsWritten,
-                                                          long insertRecordsWritten) {
-    if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
-      // the following cases where we do not need to write out the cdc file:
-      // case 1: all the data from the previous file slice are deleted. and no new data is inserted;
-      // case 2: all the data are new-coming,
-      return Option.empty();
-    }
-    return cdcLogger.writeCDCData();
-  }
-
   public static void setCDCStatIfNeeded(HoodieWriteStat stat,
                                         Option<AppendResult> cdcResult,
                                         String partitionPath,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index da6b1c6071..442256ade3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -425,6 +425,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
   }
 
+  private Option<AppendResult> writeCDCDataIfNeeded() {
+    if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // there are two cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice is deleted and no new data is inserted;
+      // case 2: all the data is new-coming.
+      return Option.empty();
+    }
+    return cdcLogger.writeCDCData();
+  }
+
   @Override
   public List<WriteStatus> close() {
     try {
@@ -445,8 +455,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       }
 
       // if there is cdc data written, set the CDC-related information.
-      Option<AppendResult> cdcResult =
-          HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
+      Option<AppendResult> cdcResult = writeCDCDataIfNeeded();
       HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);
 
       long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
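
The guard that moved from HoodieCDCLogger into the handle skips the cdc file when there is no logger, nothing was written, or everything written was a fresh insert (so there is no before-image to record). A standalone restatement of the predicate, with illustrative values:

    public class CdcWriteGuard {
      // restates HoodieMergeHandle#writeCDCDataIfNeeded's condition in isolation
      static boolean shouldWriteCdc(boolean hasCdcLogger, long recordsWritten, long insertRecordsWritten) {
        // skip case 1: all data from the previous file slice was deleted, nothing new inserted
        // skip case 2: every written record is a fresh insert
        return hasCdcLogger && recordsWritten != 0L && recordsWritten != insertRecordsWritten;
      }

      public static void main(String[] args) {
        System.out.println(shouldWriteCdc(true, 0L, 0L));    // false: everything deleted
        System.out.println(shouldWriteCdc(true, 50L, 50L));  // false: pure inserts
        System.out.println(shouldWriteCdc(true, 100L, 20L)); // true: updates present
      }
    }
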
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index d45cfe0351..6a87cc3c29 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -105,8 +105,12 @@ public class AvroSchemaUtils {
    *          wrapping around provided target non-null type
    */
   public static Schema createNullableSchema(Schema.Type avroType) {
-    checkState(avroType != Schema.Type.NULL);
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType));
+    return createNullableSchema(Schema.create(avroType));
+  }
+
+  public static Schema createNullableSchema(Schema schema) {
+    checkState(schema.getType() != Schema.Type.NULL);
+    return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
   }
 
   /**
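
The new overload generalizes the old one from primitive types to arbitrary schemas, which is what lets a whole table-row schema serve as a nullable before/after image. A quick illustration (the Row schema is hypothetical):

    import org.apache.avro.Schema;
    import org.apache.hudi.avro.AvroSchemaUtils;

    public class NullableSchemaSketch {
      public static void main(String[] args) {
        // existing overload: union(null, string)
        Schema nullableString = AvroSchemaUtils.createNullableSchema(Schema.Type.STRING);
        System.out.println(nullableString); // ["null","string"]

        // new overload: wrap an arbitrary record schema into union(null, record)
        Schema row = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Row\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
        System.out.println(AvroSchemaUtils.createNullableSchema(row)); // ["null",{"type":"record",...}]
      }
    }
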
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
index 3cf8315a54..a741181d4d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
@@ -18,12 +18,17 @@
 
 package org.apache.hudi.common.table.cdc;
 
+import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.exception.HoodieException;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class HoodieCDCUtils {
 
   public static final String CDC_LOGFILE_SUFFIX = "-cdc";
@@ -50,33 +55,6 @@ public class HoodieCDCUtils {
       CDC_AFTER_IMAGE
   };
 
-  /**
-   * This is the standard CDC output format.
-   * Also, this is the schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
-   */
-  public static final String CDC_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Record\","
-      + "\"fields\":["
-      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"ts_ms\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}"
-      + "]}";
-
-  public static final Schema CDC_SCHEMA = new Schema.Parser().parse(CDC_SCHEMA_STRING);
-
-  /**
-   * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
-   */
-  public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = "{\"type\":\"record\",\"name\":\"Record\","
-      + "\"fields\":["
-      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}"
-      + "]}";
-
-  public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE =
-      new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING);
-
   /**
    * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
    */
@@ -89,32 +67,50 @@ public class HoodieCDCUtils {
   public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY =
       new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING);
 
-  public static final Schema schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode supplementalLoggingMode) {
-    switch (supplementalLoggingMode) {
-      case WITH_BEFORE_AFTER:
-        return CDC_SCHEMA;
-      case WITH_BEFORE:
-        return CDC_SCHEMA_OP_RECORDKEY_BEFORE;
-      case OP_KEY:
-        return CDC_SCHEMA_OP_AND_RECORDKEY;
-      default:
-        throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode);
+  public static Schema schemaBySupplementalLoggingMode(
+      HoodieCDCSupplementalLoggingMode supplementalLoggingMode,
+      Schema tableSchema) {
+    if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) {
+      return CDC_SCHEMA_OP_AND_RECORDKEY;
+    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) {
+      return createCDCSchema(tableSchema, false);
+    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) {
+      return createCDCSchema(tableSchema, true);
+    } else {
+      throw new HoodieException("Unsupported supplemental logging mode: " + supplementalLoggingMode);
     }
   }
 
+  private static Schema createCDCSchema(Schema tableSchema, boolean withAfterImage) {
+    Schema imageSchema = AvroSchemaUtils.createNullableSchema(tableSchema);
+    Schema.Field opField = new Schema.Field(CDC_OPERATION_TYPE,
+        AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
+    Schema.Field beforeField = new Schema.Field(
+        CDC_BEFORE_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE);
+    List<Schema.Field> fields;
+    if (withAfterImage) {
+      Schema.Field tsField = new Schema.Field(CDC_COMMIT_TIMESTAMP,
+          AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
+      Schema.Field afterField = new Schema.Field(
+          CDC_AFTER_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE);
+      fields = Arrays.asList(opField, tsField, beforeField, afterField);
+    } else {
+      Schema.Field keyField = new Schema.Field(CDC_RECORD_KEY,
+          AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
+      fields = Arrays.asList(opField, keyField, beforeField);
+    }
+
+    Schema mergedSchema = Schema.createRecord("CDC", null, tableSchema.getNamespace(), false);
+    mergedSchema.setFields(fields);
+    return mergedSchema;
+  }
+
   /**
    * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
    */
-  public static GenericData.Record cdcRecord(
-      String op, String commitTime, GenericRecord before, GenericRecord after) {
-    String beforeJsonStr = recordToJson(before);
-    String afterJsonStr = recordToJson(after);
-    return cdcRecord(op, commitTime, beforeJsonStr, afterJsonStr);
-  }
-
-  public static GenericData.Record cdcRecord(
-      String op, String commitTime, String before, String after) {
-    GenericData.Record record = new GenericData.Record(CDC_SCHEMA);
+  public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime,
+                                             GenericRecord before, GenericRecord after) {
+    GenericData.Record record = new GenericData.Record(cdcSchema);
     record.put(CDC_OPERATION_TYPE, op);
     record.put(CDC_COMMIT_TIMESTAMP, commitTime);
     record.put(CDC_BEFORE_IMAGE, before);
@@ -125,20 +121,20 @@ public class HoodieCDCUtils {
   /**
    * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
    */
-  public static GenericData.Record cdcRecord(String op, String recordKey, GenericRecord before) {
-    GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_RECORDKEY_BEFORE);
+  public static GenericData.Record cdcRecord(Schema cdcSchema, String op,
+                                             String recordKey, GenericRecord before) {
+    GenericData.Record record = new GenericData.Record(cdcSchema);
     record.put(CDC_OPERATION_TYPE, op);
     record.put(CDC_RECORD_KEY, recordKey);
-    String beforeJsonStr = recordToJson(before);
-    record.put(CDC_BEFORE_IMAGE, beforeJsonStr);
+    record.put(CDC_BEFORE_IMAGE, before);
     return record;
   }
 
   /**
    * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
    */
-  public static GenericData.Record cdcRecord(String op, String recordKey) {
-    GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_AND_RECORDKEY);
+  public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) {
+    GenericData.Record record = new GenericData.Record(cdcSchema);
     record.put(CDC_OPERATION_TYPE, op);
     record.put(CDC_RECORD_KEY, recordKey);
     return record;
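
Putting the pieces together, a sketch of the schema shapes the new schemaBySupplementalLoggingMode produces per mode; the table schema here is hypothetical, and the field names (op, ts_ms, record_key, before, after) follow the constants used above:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
    import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

    public class CdcSchemaShapes {
      public static void main(String[] args) {
        Schema tableSchema = new Schema.Parser().parse(   // hypothetical table schema
            "{\"type\":\"record\",\"name\":\"Record\",\"namespace\":\"demo\","
                + "\"fields\":[{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]}]}");

        // 'cdc_data_before_after' -> op, ts_ms, before, after (images typed as nullable table rows)
        Schema full = HoodieCDCUtils.schemaBySupplementalLoggingMode(
            HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, tableSchema);
        // 'cdc_data_before' -> op, record_key, before
        Schema withBefore = HoodieCDCUtils.schemaBySupplementalLoggingMode(
            HoodieCDCSupplementalLoggingMode.WITH_BEFORE, tableSchema);
        // 'cdc_op_key' -> op, record_key (fixed constant schema, unchanged by this patch)
        Schema opKey = HoodieCDCUtils.schemaBySupplementalLoggingMode(
            HoodieCDCSupplementalLoggingMode.OP_KEY, tableSchema);

        System.out.println(full.toString(true));
        System.out.println(withBefore.toString(true));
        System.out.println(opKey.toString(true));
      }
    }
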
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index c037c79dd8..19552ed5a9 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.functional;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
@@ -37,6 +38,7 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.table.log.HoodieLogFileReader;
@@ -561,46 +563,79 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withFs(fs)
         .build();
 
-    GenericRecord record1 = HoodieCDCUtils.cdcRecord("i", "100",
-        null, "{\"uuid\": 1, \"name\": \"apple\"}, \"ts\": 1100}");
-    GenericRecord record2 = HoodieCDCUtils.cdcRecord("u", "100",
-        "{\"uuid\": 2, \"name\": \"banana\"}, \"ts\": 1000}",
-        "{\"uuid\": 2, \"name\": \"blueberry\"}, \"ts\": 1100}");
-    GenericRecord record3 = HoodieCDCUtils.cdcRecord("d", "100",
-        "{\"uuid\": 3, \"name\": \"cherry\"}, \"ts\": 1000}", null);
+    String dataSchemaString = "{\"type\":\"record\",\"name\":\"Record\","
+        + "\"fields\":["
+        + "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},"
+        + "{\"name\":\"name\",\"type\":[\"string\",\"null\"]},"
+        + "{\"name\":\"ts\",\"type\":[\"long\",\"null\"]}"
+        + "]}";
+    Schema dataSchema = new Schema.Parser().parse(dataSchemaString);
+    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
+        HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema);
+
+    GenericRecord insertedRecord = new GenericData.Record(dataSchema);
+    insertedRecord.put("uuid", 1);
+    insertedRecord.put("name", "apple");
+    insertedRecord.put("ts", 1100L);
+
+    GenericRecord updateBeforeImageRecord = new GenericData.Record(dataSchema);
+    updateBeforeImageRecord.put("uuid", 2);
+    updateBeforeImageRecord.put("name", "banana");
+    updateBeforeImageRecord.put("ts", 1000L);
+
+    GenericRecord updateAfterImageRecord = new GenericData.Record(dataSchema);
+    updateAfterImageRecord.put("uuid", 2);
+    updateAfterImageRecord.put("name", "blueberry");
+    updateAfterImageRecord.put("ts", 1100L);
+
+    GenericRecord deletedRecord = new GenericData.Record(dataSchema);
+    deletedRecord.put("uuid", 3);
+    deletedRecord.put("name", "cherry");
+    deletedRecord.put("ts", 1000L);
+
+    GenericRecord record1 = HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100",
+        null, insertedRecord);
+    GenericRecord record2 = HoodieCDCUtils.cdcRecord(cdcSchema, "u", "100",
+        updateBeforeImageRecord, updateAfterImageRecord);
+    GenericRecord record3 = HoodieCDCUtils.cdcRecord(cdcSchema, "d", "100",
+        deletedRecord, null);
     List<IndexedRecord> records = new ArrayList<>(Arrays.asList(record1, record2, record3));
 
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, HoodieCDCUtils.CDC_SCHEMA_STRING);
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString());
     HoodieDataBlock dataBlock = getDataBlock(HoodieLogBlockType.CDC_DATA_BLOCK, records, header);
     writer.appendBlock(dataBlock);
     writer.close();
 
-    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), HoodieCDCUtils.CDC_SCHEMA);
+    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), cdcSchema);
     assertTrue(reader.hasNext());
     HoodieLogBlock block = reader.next();
     HoodieDataBlock dataBlockRead = (HoodieDataBlock) block;
     List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
     assertEquals(3, recordsRead.size(),
         "Read records size should be equal to the written records size");
-    assertEquals(dataBlockRead.getSchema(), HoodieCDCUtils.CDC_SCHEMA);
+    assertEquals(dataBlockRead.getSchema(), cdcSchema);
 
     GenericRecord insert = (GenericRecord) recordsRead.stream()
         .filter(record -> record.get(0).toString().equals("i")).findFirst().get();
     assertNull(insert.get("before"));
     assertNotNull(insert.get("after"));
+    assertEquals(((GenericRecord) insert.get("after")).get("name").toString(), "apple");
 
     GenericRecord update = (GenericRecord) recordsRead.stream()
         .filter(record -> record.get(0).toString().equals("u")).findFirst().get();
     assertNotNull(update.get("before"));
     assertNotNull(update.get("after"));
-    assertTrue(update.get("before").toString().contains("banana"));
-    assertTrue(update.get("after").toString().contains("blueberry"));
+    GenericRecord uBefore = (GenericRecord) update.get("before");
+    GenericRecord uAfter = (GenericRecord) update.get("after");
+    assertEquals(String.valueOf(uBefore.get("name")), "banana");
+    assertEquals(Long.valueOf(uBefore.get("ts").toString()), 1000L);
+    assertEquals(String.valueOf(uAfter.get("name")), "blueberry");
+    assertEquals(Long.valueOf(uAfter.get("ts").toString()), 1100L);
 
     GenericRecord delete = (GenericRecord) recordsRead.stream()
         .filter(record -> record.get(0).toString().equals("d")).findFirst().get();
     assertNotNull(delete.get("before"));
     assertNull(delete.get("after"));
+    assertEquals(((GenericRecord) delete.get("before")).get("name").toString(), "cherry");
 
     reader.close();
   }
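
Because the images now come back as nested GenericRecords rather than JSON strings, assertions can target typed fields instead of substring matches, as the updated test does. A minimal helper in the same spirit ('update' stands for a cdc record read back from the log block, as in the test above):

    import org.apache.avro.generic.GenericRecord;
    import static org.junit.jupiter.api.Assertions.assertEquals;

    public class NestedImageAsserts {
      // drills into the nested before-image instead of calling toString().contains(...)
      static void assertBeforeImage(GenericRecord update) {
        GenericRecord before = (GenericRecord) update.get("before");
        assertEquals("banana", before.get("name").toString());
        assertEquals(1000L, before.get("ts"));
      }
    }
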
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
new file mode 100644
index 0000000000..6f0731578d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional.cdc
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
+import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.log.HoodieLogFormat
+import org.apache.hudi.common.table.log.block.HoodieDataBlock
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.testutils.RawTripTestPayload
+import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.SaveMode
+
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNull, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class TestCDCDataFrameSuite extends HoodieClientTestBase {
+
+ var spark: SparkSession = _
+
+ val commonOpts = Map(
+ HoodieTableConfig.CDC_ENABLED.key -> "true",
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+ "hoodie.delete.shuffle.parallelism" -> "1",
+ RECORDKEY_FIELD.key -> "_row_key",
+ PRECOMBINE_FIELD.key -> "timestamp",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1",
+ HoodieCleanConfig.AUTO_CLEAN.key -> "false"
+ )
+
+ @BeforeEach override def setUp(): Unit = {
+ setTableName("hoodie_test")
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ @AfterEach override def tearDown(): Unit = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+  @ParameterizedTest
+  @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
+  def testCOWDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = {
+    val options = commonOpts ++ Map(
+      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+    )
+
+    // Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    val instant1 = metaClient.reloadActiveTimeline.lastInstant().get()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 100)
+    // all the data is new-coming, it will NOT write out cdc log files.
+    assertFalse(hasCDCLogFile(instant1))
+
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val dataSchema = schemaResolver.getTableAvroSchema(false)
+    val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
+      HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+
+    // Upsert Operation
+    val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50)
+    val records2 = recordsToStrings(hoodieRecords2).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant2 = metaClient.reloadActiveTimeline.lastInstant().get()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 100)
+
+    // part of the data is updated, so it will write out cdc log files
+    assertTrue(hasCDCLogFile(instant2))
+    val cdcData2: Seq[IndexedRecord] = getCDCLogFile(instant2).flatMap(readCDCLogFile(_, cdcSchema))
+    // check the num of cdc data
+    assertEquals(cdcData2.size, 50)
+    // check op
+    assert(cdcData2.forall(r => r.get(0).toString == "u"))
+    // check record key, before, after according to the supplemental logging mode
+    checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema,
+      cdcData2, hoodieRecords2, HoodieCDCOperation.UPDATE)
+
+    // Delete Operation
+    val hoodieKey3 = dataGen.generateUniqueDeletes(20)
+    val records3 = deleteRecordsToStrings(hoodieKey3).toList
+    val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .options(options)
+      .option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant3 = metaClient.reloadActiveTimeline.lastInstant().get()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 80)
+
+    // part of the data is deleted, so it will write out cdc log files
+    assertTrue(hasCDCLogFile(instant3))
+    val cdcData3 = getCDCLogFile(instant3).flatMap(readCDCLogFile(_, cdcSchema))
+    // check the num of cdc data
+    assertEquals(cdcData3.size, 20)
+    // check op
+    assert(cdcData3.forall(r => r.get(0).toString == "d"))
+    // check record key, before, after according to the supplemental logging mode
+    checkCDCDataForDelete(cdcSupplementalLoggingMode, cdcSchema, cdcData3, hoodieKey3)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
+  def testMORDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = {
+    val options = commonOpts ++ Map(
+      TABLE_TYPE.key() -> MOR_TABLE_TYPE_OPT_VAL,
+      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+    )
+
+    // 1. Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val dataSchema = schemaResolver.getTableAvroSchema(false)
+    val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
+      HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+
+    val instant1 = metaClient.reloadActiveTimeline.lastInstant().get()
+    // all the data is new-coming, it will NOT write out cdc log files.
+    assertFalse(hasCDCLogFile(instant1))
+
+    // 2. Upsert Operation
+    val records2_1 = recordsToStrings(dataGen.generateUniqueUpdates("001", 30)).toList
+    val inputDF2_1 = spark.read.json(spark.sparkContext.parallelize(records2_1, 2))
+    val records2_2 = recordsToStrings(dataGen.generateInserts("001", 20)).toList
+    val inputDF2_2 = spark.read.json(spark.sparkContext.parallelize(records2_2, 2))
+    inputDF2_1.union(inputDF2_2).write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant2 = metaClient.reloadActiveTimeline.lastInstant().get()
+
+    // part of the data is updated, so it will write out cdc log files
+    assertTrue(hasCDCLogFile(instant2))
+    val cdcData2 = getCDCLogFile(instant2).flatMap(readCDCLogFile(_, cdcSchema))
+    assertEquals(cdcData2.size, 50)
+    // check op
+    assertEquals(cdcData2.count(r => r.get(0).toString == "u"), 30)
+    assertEquals(cdcData2.count(r => r.get(0).toString == "i"), 20)
+
+    // 3. Delete Operation
+    val records3 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
+    val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .options(options)
+      .option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant3 = metaClient.reloadActiveTimeline.lastInstant().get()
+    // in cases where there are log files, it will NOT write out cdc log files.
+    assertFalse(hasCDCLogFile(instant3))
+  }
+
+  /**
+   * Whether this instant will create a cdc log file.
+   */
+  private def hasCDCLogFile(instant: HoodieInstant): Boolean = {
+    val commitMetadata = HoodieCommitMetadata.fromBytes(
+      metaClient.reloadActiveTimeline().getInstantDetails(instant).get(),
+      classOf[HoodieCommitMetadata]
+    )
+    val hoodieWriteStats = commitMetadata.getWriteStats.asScala
+    hoodieWriteStats.exists { hoodieWriteStat =>
+      val cdcPath = hoodieWriteStat.getCdcPath
+      cdcPath != null && cdcPath.nonEmpty
+    }
+  }
+
+  /**
+   * Get the cdc log files created by this instant.
+   */
+  private def getCDCLogFile(instant: HoodieInstant): List[String] = {
+    val commitMetadata = HoodieCommitMetadata.fromBytes(
+      metaClient.reloadActiveTimeline().getInstantDetails(instant).get(),
+      classOf[HoodieCommitMetadata]
+    )
+    commitMetadata.getWriteStats.asScala.map(_.getCdcPath).toList
+  }
+
+  private def readCDCLogFile(relativeLogFile: String, cdcSchema: Schema): List[IndexedRecord] = {
+    val logFile = new HoodieLogFile(
+      metaClient.getFs.getFileStatus(new Path(metaClient.getBasePathV2, relativeLogFile)))
+    val reader = HoodieLogFormat.newReader(fs, logFile, cdcSchema)
+    assertTrue(reader.hasNext)
+
+    val block = reader.next().asInstanceOf[HoodieDataBlock]
+    block.getRecordIterator.asScala.toList
+  }
+
+  private def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String,
+                                            cdcSchema: Schema,
+                                            dataSchema: Schema,
+                                            cdcRecords: Seq[IndexedRecord],
+                                            newHoodieRecords: java.util.List[HoodieRecord[_]],
+                                            op: HoodieCDCOperation): Unit = {
+    val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
+    // check schema
+    assertEquals(cdcRecord.getSchema, cdcSchema)
+    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
+    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
+      // check before
+      if (op == HoodieCDCOperation.INSERT) {
+        assertNull(cdcRecord.get("before"))
+      } else {
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcRecord.get("record_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+        assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat"))
+      }
+    } else {
+      val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+      val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord]
+      if (op == HoodieCDCOperation.INSERT) {
+        // check before
+        assertNull(cdcBeforeValue)
+        // check after
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat"))
+      } else {
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        // check before
+        assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat"))
+        // check after
+        assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat"))
+      }
+    }
+  }
+
+  private def checkCDCDataForDelete(cdcSupplementalLoggingMode: String,
+                                    cdcSchema: Schema,
+                                    cdcRecords: Seq[IndexedRecord],
+                                    deletedKeys: java.util.List[HoodieKey]): Unit = {
+    val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
+    // check schema
+    assertEquals(cdcRecord.getSchema, cdcSchema)
+    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
+    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
+    } else {
+      val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+      val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord]
+      // check before
+      assert(deletedKeys.exists(_.getRecordKey == cdcBeforeValue.get("_row_key").toString))
+      // check after
+      assertNull(cdcAfterValue)
+    }
+  }
+}
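
For readers wanting to try this end to end, a sketch of a CDC-enabled write using the same table options the suite sets; df and basePath are assumed to exist, and the literal option keys mirror the RECORDKEY_FIELD/PRECOMBINE_FIELD settings used above:

    import org.apache.hudi.common.table.HoodieTableConfig;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class CdcWriteExample {
      static void writeWithCdc(Dataset<Row> df, String basePath) {
        df.write().format("hudi")
            .option(HoodieTableConfig.CDC_ENABLED.key(), "true")
            // one of: cdc_op_key, cdc_data_before, cdc_data_before_after
            .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(), "cdc_data_before_after")
            .option("hoodie.datasource.write.recordkey.field", "_row_key")
            .option("hoodie.datasource.write.precombine.field", "timestamp")
            .option("hoodie.table.name", "hoodie_test")
            .mode(SaveMode.Append)
            .save(basePath);
      }
    }
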