This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 90d678c0f7 [HUDI-3478][HUDI-4887] Use Avro as the format of persisted cdc data (#6734)
90d678c0f7 is described below

commit 90d678c0f74b4c2e19085d7332325dd2dd0c7517
Author: Yann Byron <[email protected]>
AuthorDate: Fri Sep 23 01:33:19 2022 +0800

    [HUDI-3478][HUDI-4887] Use Avro as the format of persisted cdc data (#6734)
---
 .../java/org/apache/hudi/io/HoodieCDCLogger.java   |  36 +--
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  13 +-
 .../java/org/apache/hudi/avro/AvroSchemaUtils.java |   8 +-
 .../hudi/common/table/cdc/HoodieCDCUtils.java      | 102 ++++---
 .../common/functional/TestHoodieLogFormat.java     |  59 +++-
 .../functional/cdc/TestCDCDataFrameSuite.scala     | 324 +++++++++++++++++++++
 6 files changed, 448 insertions(+), 94 deletions(-)
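
In short: the fixed JSON-string CDC schemas (CDC_SCHEMA, CDC_SCHEMA_OP_RECORDKEY_BEFORE) are gone; the writer now derives the CDC block schema from the table schema at write time, so the before/after images are persisted as nested Avro records instead of JSON strings. A minimal sketch of the resulting call pattern, assuming the Hudi classes below are on the classpath; the two-field table schema is made up for illustration:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;

public class CDCSchemaSketch {
  public static void main(String[] args) {
    // A hypothetical table schema; any Avro record schema works here.
    Schema tableSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Record\",\"fields\":["
            + "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},"
            + "{\"name\":\"name\",\"type\":[\"string\",\"null\"]}]}");

    // Derive the CDC schema once per write; its fields depend on the logging mode.
    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
        HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, tableSchema);

    GenericData.Record before = new GenericData.Record(tableSchema);
    before.put("uuid", 2);
    before.put("name", "banana");

    // The before image is now a nested Avro record, not a JSON string.
    GenericData.Record cdc = HoodieCDCUtils.cdcRecord(cdcSchema, "u", "100", before, null);
    System.out.println(cdc);
  }
}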

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index c93489d890..e4f1e14252 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -102,16 +102,11 @@ public class HoodieCDCLogger implements Closeable {
      this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
          config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
 
-      if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
-        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
-        this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_STRING;
-      } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
-        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
-        this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING;
-      } else {
-        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
-        this.cdcSchemaString = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING;
-      }
+      this.cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
+          cdcSupplementalLoggingMode,
+          dataSchema
+      );
+      this.cdcSchemaString = this.cdcSchema.toString();
 
       this.cdcData = new ExternalSpillableMap<>(
           maxInMemorySizeInBytes,
@@ -158,18 +153,21 @@ public class HoodieCDCLogger implements Closeable {
                                              GenericRecord newRecord) {
     GenericData.Record record;
    if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER)) {
-      record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
+      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime,
          removeCommitMetadata(oldRecord), newRecord);
    } else if (cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE)) {
-      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
+      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey,
          removeCommitMetadata(oldRecord));
    } else {
-      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+      record = HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey);
     }
     return record;
   }
 
   private GenericRecord removeCommitMetadata(GenericRecord record) {
+    if (record == null) {
+      return null;
+    }
    return HoodieAvroUtils.rewriteRecordWithNewSchema(record, dataSchema, new HashMap<>());
   }
 
@@ -221,18 +219,6 @@ public class HoodieCDCLogger implements Closeable {
     }
   }
 
-  public static Option<AppendResult> writeCDCDataIfNeeded(HoodieCDCLogger cdcLogger,
-                                                          long recordsWritten,
-                                                          long insertRecordsWritten) {
-    if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
-      // the following cases where we do not need to write out the cdc file:
-      // case 1: all the data from the previous file slice are deleted. and no new data is inserted;
-      // case 2: all the data are new-coming,
-      return Option.empty();
-    }
-    return cdcLogger.writeCDCData();
-  }
-
   public static void setCDCStatIfNeeded(HoodieWriteStat stat,
                                         Option<AppendResult> cdcResult,
                                         String partitionPath,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index da6b1c6071..442256ade3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -425,6 +425,16 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
     }
   }
 
+  private Option<AppendResult> writeCDCDataIfNeeded() {
+    if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == insertRecordsWritten)) {
+      // the cases where we do not need to write out the cdc file:
+      // case 1: all the data from the previous file slice is deleted and no new data is inserted;
+      // case 2: all the data is newly inserted.
+      return Option.empty();
+    }
+    return cdcLogger.writeCDCData();
+  }
+
   @Override
   public List<WriteStatus> close() {
     try {
@@ -445,8 +455,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
       }
 
       // if there are cdc data written, set the CDC-related information.
-      Option<AppendResult> cdcResult =
-          HoodieCDCLogger.writeCDCDataIfNeeded(cdcLogger, recordsWritten, insertRecordsWritten);
+      Option<AppendResult> cdcResult = writeCDCDataIfNeeded();
       HoodieCDCLogger.setCDCStatIfNeeded(stat, cdcResult, partitionPath, fs);
 
       long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
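
The skip logic now lives next to the counters it reads. Expressed as a standalone predicate (this helper and its main are illustrative only, not part of the patch), the gate is:

public class CDCWriteGate {
  // Mirrors the condition in writeCDCDataIfNeeded above.
  static boolean shouldWriteCDCData(boolean hasCdcLogger, long recordsWritten, long insertRecordsWritten) {
    // Skip when CDC logging is off, when all data from the previous file slice
    // was deleted (recordsWritten == 0), or when every written record is a
    // fresh insert (recordsWritten == insertRecordsWritten).
    return hasCdcLogger && recordsWritten != 0L && recordsWritten != insertRecordsWritten;
  }

  public static void main(String[] args) {
    System.out.println(shouldWriteCDCData(true, 0L, 0L));   // false: everything deleted
    System.out.println(shouldWriteCDCData(true, 50L, 50L)); // false: pure inserts
    System.out.println(shouldWriteCDCData(true, 50L, 20L)); // true: updates/deletes present
  }
}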
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
index d45cfe0351..6a87cc3c29 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java
@@ -105,8 +105,12 @@ public class AvroSchemaUtils {
    * wrapping around provided target non-null type
    */
   public static Schema createNullableSchema(Schema.Type avroType) {
-    checkState(avroType != Schema.Type.NULL);
-    return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(avroType));
+    return createNullableSchema(Schema.create(avroType));
+  }
+
+  public static Schema createNullableSchema(Schema schema) {
+    checkState(schema.getType() != Schema.Type.NULL);
+    return Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
   }
 
   /**
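
The new overload generalizes nullability from primitive types to arbitrary schemas, which is what allows a full table-row record to serve as a nullable before/after image. A short usage sketch (Row is a made-up record schema):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import org.apache.hudi.avro.AvroSchemaUtils;

public class NullableSchemaSketch {
  public static void main(String[] args) {
    // Primitive overload, behavior unchanged: ["null","string"]
    Schema nullableString = AvroSchemaUtils.createNullableSchema(Schema.Type.STRING);

    // New overload: wrap any non-null schema, e.g. a record, so a missing
    // before image (insert) or after image (delete) can be encoded as null.
    Schema row = SchemaBuilder.record("Row").fields().requiredInt("uuid").endRecord();
    Schema nullableRow = AvroSchemaUtils.createNullableSchema(row);

    System.out.println(nullableString); // ["null","string"]
    System.out.println(nullableRow);    // ["null",{"type":"record","name":"Row",...}]
  }
}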
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
index 3cf8315a54..a741181d4d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java
@@ -18,12 +18,17 @@
 
 package org.apache.hudi.common.table.cdc;
 
+import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 
+import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.exception.HoodieException;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class HoodieCDCUtils {
 
   public static final String CDC_LOGFILE_SUFFIX = "-cdc";
@@ -50,33 +55,6 @@ public class HoodieCDCUtils {
       CDC_AFTER_IMAGE
   };
 
-  /**
-   * This is the standard CDC output format.
-   * Also, this is the schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
-   */
-  public static final String CDC_SCHEMA_STRING = "{\"type\":\"record\",\"name\":\"Record\","
-      + "\"fields\":["
-      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"ts_ms\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}"
-      + "]}";
-
-  public static final Schema CDC_SCHEMA = new Schema.Parser().parse(CDC_SCHEMA_STRING);
-
-  /**
-   * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
-   */
-  public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = "{\"type\":\"record\",\"name\":\"Record\","
-      + "\"fields\":["
-      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]},"
-      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}"
-      + "]}";
-
-  public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE =
-      new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING);
-
   /**
   * The schema of cdc log file in the case `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
    */
@@ -89,32 +67,50 @@ public class HoodieCDCUtils {
   public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY =
       new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING);
 
-  public static final Schema schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode supplementalLoggingMode) {
-    switch (supplementalLoggingMode) {
-      case WITH_BEFORE_AFTER:
-        return CDC_SCHEMA;
-      case WITH_BEFORE:
-        return CDC_SCHEMA_OP_RECORDKEY_BEFORE;
-      case OP_KEY:
-        return CDC_SCHEMA_OP_AND_RECORDKEY;
-      default:
-        throw new HoodieException("not support this supplemental logging mode: " + supplementalLoggingMode);
+  public static Schema schemaBySupplementalLoggingMode(
+      HoodieCDCSupplementalLoggingMode supplementalLoggingMode,
+      Schema tableSchema) {
+    if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.OP_KEY) {
+      return CDC_SCHEMA_OP_AND_RECORDKEY;
+    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE) {
+      return createCDCSchema(tableSchema, false);
+    } else if (supplementalLoggingMode == HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER) {
+      return createCDCSchema(tableSchema, true);
+    } else {
+      throw new HoodieException("Unsupported supplemental logging mode: " + supplementalLoggingMode);
     }
   }
 
+  private static Schema createCDCSchema(Schema tableSchema, boolean withAfterImage) {
+    Schema imageSchema = AvroSchemaUtils.createNullableSchema(tableSchema);
+    Schema.Field opField = new Schema.Field(CDC_OPERATION_TYPE,
+        AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
+    Schema.Field beforeField = new Schema.Field(
+        CDC_BEFORE_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE);
+    List<Schema.Field> fields;
+    if (withAfterImage) {
+      Schema.Field tsField = new Schema.Field(CDC_COMMIT_TIMESTAMP,
+          AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
+      Schema.Field afterField = new Schema.Field(
+          CDC_AFTER_IMAGE, imageSchema, "", JsonProperties.NULL_VALUE);
+      fields = Arrays.asList(opField, tsField, beforeField, afterField);
+    } else {
+      Schema.Field keyField = new Schema.Field(CDC_RECORD_KEY,
+          AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE);
+      fields = Arrays.asList(opField, keyField, beforeField);
+    }
+
+    Schema mergedSchema = Schema.createRecord("CDC", null, tableSchema.getNamespace(), false);
+    mergedSchema.setFields(fields);
+    return mergedSchema;
+  }
+
   /**
   * Build the cdc record which has all the cdc fields when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
    */
-  public static GenericData.Record cdcRecord(
-      String op, String commitTime, GenericRecord before, GenericRecord after) {
-    String beforeJsonStr = recordToJson(before);
-    String afterJsonStr = recordToJson(after);
-    return cdcRecord(op, commitTime, beforeJsonStr, afterJsonStr);
-  }
-
-  public static GenericData.Record cdcRecord(
-      String op, String commitTime, String before, String after) {
-    GenericData.Record record = new GenericData.Record(CDC_SCHEMA);
+  public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String commitTime,
+                                             GenericRecord before, GenericRecord after) {
+    GenericData.Record record = new GenericData.Record(cdcSchema);
     record.put(CDC_OPERATION_TYPE, op);
     record.put(CDC_COMMIT_TIMESTAMP, commitTime);
     record.put(CDC_BEFORE_IMAGE, before);
@@ -125,20 +121,20 @@ public class HoodieCDCUtils {
   /**
   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
    */
-  public static GenericData.Record cdcRecord(String op, String recordKey, GenericRecord before) {
-    GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_RECORDKEY_BEFORE);
+  public static GenericData.Record cdcRecord(Schema cdcSchema, String op,
+                                             String recordKey, GenericRecord before) {
+    GenericData.Record record = new GenericData.Record(cdcSchema);
     record.put(CDC_OPERATION_TYPE, op);
     record.put(CDC_RECORD_KEY, recordKey);
-    String beforeJsonStr = recordToJson(before);
-    record.put(CDC_BEFORE_IMAGE, beforeJsonStr);
+    record.put(CDC_BEFORE_IMAGE, before);
     return record;
   }
 
   /**
   * Build the cdc record when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_op_key'.
    */
-  public static GenericData.Record cdcRecord(String op, String recordKey) {
-    GenericData.Record record = new GenericData.Record(CDC_SCHEMA_OP_AND_RECORDKEY);
+  public static GenericData.Record cdcRecord(Schema cdcSchema, String op, String recordKey) {
+    GenericData.Record record = new GenericData.Record(cdcSchema);
     record.put(CDC_OPERATION_TYPE, op);
     record.put(CDC_RECORD_KEY, recordKey);
     return record;
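
For reference, in 'cdc_data_before' mode the generated schema comes out roughly as follows; the record is named CDC, the namespace is taken from the table schema, and the placeholder stands for the full nested table schema:

{
  "type": "record",
  "name": "CDC",
  "namespace": "<table schema namespace>",
  "fields": [
    {"name": "op",         "type": ["null", "string"],       "default": null},
    {"name": "record_key", "type": ["null", "string"],       "default": null},
    {"name": "before",     "type": ["null", <table schema>], "default": null}
  ]
}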
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index c037c79dd8..19552ed5a9 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.functional;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 
@@ -37,6 +38,7 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.table.log.AppendResult;
 import org.apache.hudi.common.table.log.HoodieLogFileReader;
@@ -561,46 +563,79 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
         .withFs(fs)
         .build();
 
-    GenericRecord record1 = HoodieCDCUtils.cdcRecord("i", "100",
-        null, "{\"uuid\": 1, \"name\": \"apple\"}, \"ts\": 1100}");
-    GenericRecord record2 = HoodieCDCUtils.cdcRecord("u", "100",
-        "{\"uuid\": 2, \"name\": \"banana\"}, \"ts\": 1000}",
-        "{\"uuid\": 2, \"name\": \"blueberry\"}, \"ts\": 1100}");
-    GenericRecord record3 = HoodieCDCUtils.cdcRecord("d", "100",
-        "{\"uuid\": 3, \"name\": \"cherry\"}, \"ts\": 1000}", null);
+    String dataSchemaString = "{\"type\":\"record\",\"name\":\"Record\","
+        + "\"fields\":["
+        + "{\"name\":\"uuid\",\"type\":[\"int\",\"null\"]},"
+        + "{\"name\":\"name\",\"type\":[\"string\",\"null\"]},"
+        + "{\"name\":\"ts\",\"type\":[\"long\",\"null\"]}"
+        + "]}";
+    Schema dataSchema = new Schema.Parser().parse(dataSchemaString);
+    Schema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
+        HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER, dataSchema);
+    GenericRecord insertedRecord = new GenericData.Record(dataSchema);
+    insertedRecord.put("uuid", 1);
+    insertedRecord.put("name", "apple");
+    insertedRecord.put("ts", 1100L);
+
+    GenericRecord updateBeforeImageRecord = new GenericData.Record(dataSchema);
+    updateBeforeImageRecord.put("uuid", 2);
+    updateBeforeImageRecord.put("name", "banana");
+    updateBeforeImageRecord.put("ts", 1000L);
+    GenericRecord updateAfterImageRecord = new GenericData.Record(dataSchema);
+    updateAfterImageRecord.put("uuid", 2);
+    updateAfterImageRecord.put("name", "blueberry");
+    updateAfterImageRecord.put("ts", 1100L);
+
+    GenericRecord deletedRecord = new GenericData.Record(dataSchema);
+    deletedRecord.put("uuid", 3);
+    deletedRecord.put("name", "cherry");
+    deletedRecord.put("ts", 1000L);
+
+    GenericRecord record1 = HoodieCDCUtils.cdcRecord(cdcSchema, "i", "100",
+        null, insertedRecord);
+    GenericRecord record2 = HoodieCDCUtils.cdcRecord(cdcSchema, "u", "100",
+        updateBeforeImageRecord, updateAfterImageRecord);
+    GenericRecord record3 = HoodieCDCUtils.cdcRecord(cdcSchema, "d", "100",
+        deletedRecord, null);
     List<IndexedRecord> records = new ArrayList<>(Arrays.asList(record1, record2, record3));
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
-    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, HoodieCDCUtils.CDC_SCHEMA_STRING);
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, cdcSchema.toString());
     HoodieDataBlock dataBlock = getDataBlock(HoodieLogBlockType.CDC_DATA_BLOCK, records, header);
     writer.appendBlock(dataBlock);
     writer.close();
 
-    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), HoodieCDCUtils.CDC_SCHEMA);
+    Reader reader = HoodieLogFormat.newReader(fs, writer.getLogFile(), cdcSchema);
     assertTrue(reader.hasNext());
     HoodieLogBlock block = reader.next();
     HoodieDataBlock dataBlockRead = (HoodieDataBlock) block;
     List<IndexedRecord> recordsRead = getRecords(dataBlockRead);
     assertEquals(3, recordsRead.size(),
         "Read records size should be equal to the written records size");
-    assertEquals(dataBlockRead.getSchema(), HoodieCDCUtils.CDC_SCHEMA);
+    assertEquals(dataBlockRead.getSchema(), cdcSchema);
 
     GenericRecord insert = (GenericRecord) recordsRead.stream()
         .filter(record -> record.get(0).toString().equals("i")).findFirst().get();
     assertNull(insert.get("before"));
     assertNotNull(insert.get("after"));
+    assertEquals(((GenericRecord) insert.get("after")).get("name").toString(), "apple");
 
     GenericRecord update = (GenericRecord) recordsRead.stream()
         .filter(record -> record.get(0).toString().equals("u")).findFirst().get();
     assertNotNull(update.get("before"));
     assertNotNull(update.get("after"));
-    assertTrue(update.get("before").toString().contains("banana"));
-    assertTrue(update.get("after").toString().contains("blueberry"));
+    GenericRecord uBefore = (GenericRecord) update.get("before");
+    GenericRecord uAfter = (GenericRecord) update.get("after");
+    assertEquals(String.valueOf(uBefore.get("name")), "banana");
+    assertEquals(Long.valueOf(uBefore.get("ts").toString()), 1000L);
+    assertEquals(String.valueOf(uAfter.get("name")), "blueberry");
+    assertEquals(Long.valueOf(uAfter.get("ts").toString()), 1100L);
 
     GenericRecord delete = (GenericRecord) recordsRead.stream()
         .filter(record -> record.get(0).toString().equals("d")).findFirst().get();
     assertNotNull(delete.get("before"));
     assertNull(delete.get("after"));
+    assertEquals(((GenericRecord) delete.get("before")).get("name").toString(), "cherry");
 
     reader.close();
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
new file mode 100644
index 0000000000..6f0731578d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCDataFrameSuite.scala
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional.cdc
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieKey, HoodieLogFile, HoodieRecord}
+import org.apache.hudi.common.table.cdc.{HoodieCDCOperation, HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.log.HoodieLogFormat
+import org.apache.hudi.common.table.log.block.HoodieDataBlock
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.testutils.RawTripTestPayload
+import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.SaveMode
+
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertNull, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+class TestCDCDataFrameSuite extends HoodieClientTestBase {
+
+  var spark: SparkSession = _
+
+  val commonOpts = Map(
+    HoodieTableConfig.CDC_ENABLED.key -> "true",
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+    "hoodie.delete.shuffle.parallelism" -> "1",
+    RECORDKEY_FIELD.key -> "_row_key",
+    PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "1",
+    HoodieCleanConfig.AUTO_CLEAN.key -> "false"
+  )
+
+  @BeforeEach override def setUp(): Unit = {
+    setTableName("hoodie_test")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown(): Unit = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
+  def testCOWDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = {
+    val options = commonOpts ++ Map(
+      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+    )
+
+    // Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+    val instant1 = metaClient.reloadActiveTimeline.lastInstant().get()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 100)
+    // all the data is newly inserted, so it will NOT write out cdc log files.
+    assertFalse(hasCDCLogFile(instant1))
+
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val dataSchema = schemaResolver.getTableAvroSchema(false)
+    val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
+      HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+
+    // Upsert Operation
+    val hoodieRecords2 = dataGen.generateUniqueUpdates("001", 50)
+    val records2 = recordsToStrings(hoodieRecords2).toList
+    val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant2 = metaClient.reloadActiveTimeline.lastInstant().get()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 100)
+
+    // part of data are updated, it will write out cdc log files
+    assertTrue(hasCDCLogFile(instant2))
+    val cdcData2: Seq[IndexedRecord] = getCDCLogFile(instant2).flatMap(readCDCLogFile(_, cdcSchema))
+    // check the num of cdc data
+    assertEquals(cdcData2.size, 50)
+    // check op
+    assert(cdcData2.forall(r => r.get(0).toString == "u"))
+    // check record key, before, after according to the supplemental logging mode
+    checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode, cdcSchema, dataSchema,
+      cdcData2, hoodieRecords2, HoodieCDCOperation.UPDATE)
+
+    // Delete Operation
+    val hoodieKey3 = dataGen.generateUniqueDeletes(20)
+    val records3 = deleteRecordsToStrings(hoodieKey3).toList
+    val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .options(options)
+      .option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant3 = metaClient.reloadActiveTimeline.lastInstant().get()
+    assertEquals(spark.read.format("hudi").load(basePath).count(), 80)
+
+    // part of data are deleted, it will write out cdc log files
+    assertTrue(hasCDCLogFile(instant3))
+    val cdcData3 = getCDCLogFile(instant3).flatMap(readCDCLogFile(_, cdcSchema))
+    // check the num of cdc data
+    assertEquals(cdcData3.size, 20)
+    // check op
+    assert(cdcData3.forall(r => r.get(0).toString == "d"))
+    // check record key, before, after according to the supplemental logging mode
+    checkCDCDataForDelete(cdcSupplementalLoggingMode, cdcSchema, cdcData3, hoodieKey3)
+  }
+
+  @ParameterizedTest
+  @CsvSource(Array("cdc_op_key", "cdc_data_before", "cdc_data_before_after"))
+  def testMORDataSourceWrite(cdcSupplementalLoggingMode: String): Unit = {
+    val options = commonOpts ++ Map(
+      TABLE_TYPE.key() -> MOR_TABLE_TYPE_OPT_VAL,
+      HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key -> cdcSupplementalLoggingMode
+    )
+
+    // 1. Insert Operation
+    val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(spark.sessionState.newHadoopConf)
+      .build()
+
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val dataSchema = schemaResolver.getTableAvroSchema(false)
+    val cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(
+      HoodieCDCSupplementalLoggingMode.parse(cdcSupplementalLoggingMode), dataSchema)
+
+    val instant1 = metaClient.reloadActiveTimeline.lastInstant().get()
+    // all the data is newly inserted, so it will NOT write out cdc log files.
+    assertFalse(hasCDCLogFile(instant1))
+
+    // 2. Upsert Operation
+    val records2_1 = recordsToStrings(dataGen.generateUniqueUpdates("001", 30)).toList
+    val inputDF2_1 = spark.read.json(spark.sparkContext.parallelize(records2_1, 2))
+    val records2_2 = recordsToStrings(dataGen.generateInserts("001", 20)).toList
+    val inputDF2_2 = spark.read.json(spark.sparkContext.parallelize(records2_2, 2))
+    inputDF2_1.union(inputDF2_2).write.format("org.apache.hudi")
+      .options(options)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant2 = metaClient.reloadActiveTimeline.lastInstant().get()
+
+    // part of data are updated, it will write out cdc log files
+    assertTrue(hasCDCLogFile(instant2))
+    val cdcData2 = getCDCLogFile(instant2).flatMap(readCDCLogFile(_, cdcSchema))
+    assertEquals(cdcData2.size, 50)
+    // check op
+    assertEquals(cdcData2.count(r => r.get(0).toString == "u"), 30)
+    assertEquals(cdcData2.count(r => r.get(0).toString == "i"), 20)
+
+    // 3. Delete Operation
+    val records3 = deleteRecordsToStrings(dataGen.generateUniqueDeletes(20)).toList
+    val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("org.apache.hudi")
+      .options(options)
+      .option(OPERATION.key, DELETE_OPERATION_OPT_VAL)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val instant3 = metaClient.reloadActiveTimeline.lastInstant().get()
+    // in cases where log files already exist, it will NOT write out cdc log files.
+    assertFalse(hasCDCLogFile(instant3))
+  }
+
+  /**
+   * Whether this instant wrote out any cdc log file.
+   */
+  private def hasCDCLogFile(instant: HoodieInstant): Boolean = {
+    val commitMetadata = HoodieCommitMetadata.fromBytes(
+      metaClient.reloadActiveTimeline().getInstantDetails(instant).get(),
+      classOf[HoodieCommitMetadata]
+    )
+    val hoodieWriteStats = commitMetadata.getWriteStats.asScala
+    hoodieWriteStats.exists { hoodieWriteStat =>
+      val cdcPath = hoodieWriteStat.getCdcPath
+      cdcPath != null && cdcPath.nonEmpty
+    }
+  }
+
+  /**
+   * Return the paths of the cdc log files written by this instant.
+   */
+  private def getCDCLogFile(instant: HoodieInstant): List[String] = {
+    val commitMetadata = HoodieCommitMetadata.fromBytes(
+      metaClient.reloadActiveTimeline().getInstantDetails(instant).get(),
+      classOf[HoodieCommitMetadata]
+    )
+    commitMetadata.getWriteStats.asScala.map(_.getCdcPath).toList
+  }
+
+  private def readCDCLogFile(relativeLogFile: String, cdcSchema: Schema): List[IndexedRecord] = {
+    val logFile = new HoodieLogFile(
+      metaClient.getFs.getFileStatus(new Path(metaClient.getBasePathV2, relativeLogFile)))
+    val reader = HoodieLogFormat.newReader(fs, logFile, cdcSchema)
+    assertTrue(reader.hasNext)
+
+    val block = reader.next().asInstanceOf[HoodieDataBlock]
+    block.getRecordIterator.asScala.toList
+  }
+
+  private def checkCDCDataForInsertOrUpdate(cdcSupplementalLoggingMode: String,
+                                            cdcSchema: Schema,
+                                            dataSchema: Schema,
+                                            cdcRecords: Seq[IndexedRecord],
+                                            newHoodieRecords: java.util.List[HoodieRecord[_]],
+                                            op: HoodieCDCOperation): Unit = {
+    val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
+    // check schema
+    assertEquals(cdcRecord.getSchema, cdcSchema)
+    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
+    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == newHoodieRecords.map(_.getKey.getRecordKey).sorted)
+      // check before
+      if (op == HoodieCDCOperation.INSERT) {
+        assertNull(cdcRecord.get("before"))
+      } else {
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcRecord.get("record_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+        assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat"))
+      }
+      }
+    } else {
+      val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+      val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord]
+      if (op == HoodieCDCOperation.INSERT) {
+        // check before
+        assertNull(cdcBeforeValue)
+        // check after
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat"))
+      } else {
+        val payload = newHoodieRecords.find(_.getKey.getRecordKey == cdcAfterValue.get("_row_key").toString).get
+          .getData.asInstanceOf[RawTripTestPayload]
+        val genericRecord = payload.getInsertValue(dataSchema).get.asInstanceOf[GenericRecord]
+        // check before
+        assertNotEquals(genericRecord.get("begin_lat"), cdcBeforeValue.get("begin_lat"))
+        // check after
+        assertEquals(genericRecord.get("begin_lat"), cdcAfterValue.get("begin_lat"))
+      }
+    }
+  }
+
+  private def checkCDCDataForDelete(cdcSupplementalLoggingMode: String,
+                                    cdcSchema: Schema,
+                                    cdcRecords: Seq[IndexedRecord],
+                                    deletedKeys: java.util.List[HoodieKey]): Unit = {
+    val cdcRecord = cdcRecords.head.asInstanceOf[GenericRecord]
+    // check schema
+    assertEquals(cdcRecord.getSchema, cdcSchema)
+    if (cdcSupplementalLoggingMode == "cdc_op_key") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
+    } else if (cdcSupplementalLoggingMode == "cdc_data_before") {
+      // check record key
+      assert(cdcRecords.map(_.get(1).toString).sorted == deletedKeys.map(_.getRecordKey).sorted)
+    } else {
+      val cdcBeforeValue = cdcRecord.get("before").asInstanceOf[GenericRecord]
+      val cdcAfterValue = cdcRecord.get("after").asInstanceOf[GenericRecord]
+      // check before
+      assert(deletedKeys.exists(_.getRecordKey == cdcBeforeValue.get("_row_key").toString))
+      // check after
+      assertNull(cdcAfterValue)
+    }
+  }
+}

