This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b81f49eb062534da3ebb447f13a0482bbb1cd470
Author: Yann Byron <[email protected]>
AuthorDate: Mon Jan 30 16:15:24 2023 +0800

    [HUDI-5634] Rename CDC related classes (#7410)
---
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  | 10 ++--
 .../hudi/common/table/cdc/HoodieCDCFileSplit.java  | 22 ++++----
 ...CInferCase.java => HoodieCDCInferenceCase.java} |  4 +-
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   | 63 +++++++++++-----------
 rfc/rfc-51/rfc-51.md                               |  2 +-
 5 files changed, 49 insertions(+), 52 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 506680dc3b2..6ca116015b0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -56,11 +56,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.AS_IS;
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_DELETE;
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_INSERT;
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.LOG_FILE;
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.AS_IS;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.BASE_FILE_DELETE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.BASE_FILE_INSERT;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.REPLACE_COMMIT;
 import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
 import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
index d508f7ac4ea..f992a9b228c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
@@ -33,16 +33,16 @@ import java.util.stream.Collectors;
  * This contains all the information that retrieve the change data at a single 
file group and
  * at a single commit.
  * <p>
- * For `cdcInferCase` = {@link HoodieCDCInferCase#BASE_FILE_INSERT}, `cdcFile` 
is a current version of
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#BASE_FILE_INSERT}, 
`cdcFile` is a current version of
  * the base file in the group, and `beforeFileSlice` is None.
- * For `cdcInferCase` = {@link HoodieCDCInferCase#BASE_FILE_DELETE}, `cdcFile` 
is null,
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#BASE_FILE_DELETE}, 
`cdcFile` is null,
  * `beforeFileSlice` is the previous version of the base file in the group.
- * For `cdcInferCase` = {@link HoodieCDCInferCase#AS_IS}, `cdcFile` is a log 
file with cdc blocks.
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#AS_IS}, `cdcFile` is a 
log file with cdc blocks.
  * when enable the supplemental logging, both `beforeFileSlice` and 
`afterFileSlice` are None,
  * otherwise these two are the previous and current version of the base file.
- * For `cdcInferCase` = {@link HoodieCDCInferCase#LOG_FILE}, `cdcFile` is a 
normal log file and
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#LOG_FILE}, `cdcFile` is 
a normal log file and
  * `beforeFileSlice` is the previous version of the file slice.
- * For `cdcInferCase` = {@link HoodieCDCInferCase#REPLACE_COMMIT}, `cdcFile` 
is null,
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#REPLACE_COMMIT}, 
`cdcFile` is null,
  * `beforeFileSlice` is the current version of the file slice.
  */
 public class HoodieCDCFileSplit implements Serializable, 
Comparable<HoodieCDCFileSplit> {
@@ -54,7 +54,7 @@ public class HoodieCDCFileSplit implements Serializable, 
Comparable<HoodieCDCFil
   /**
    * Flag that decides to how to retrieve the change data. More details see: 
`HoodieCDCLogicalFileType`.
    */
-  private final HoodieCDCInferCase cdcInferCase;
+  private final HoodieCDCInferenceCase cdcInferCase;
 
   /**
    * The file that the change data can be parsed from.
@@ -71,17 +71,17 @@ public class HoodieCDCFileSplit implements Serializable, 
Comparable<HoodieCDCFil
    */
   private final Option<FileSlice> afterFileSlice;
 
-  public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, 
String cdcFile) {
+  public HoodieCDCFileSplit(String instant, HoodieCDCInferenceCase 
cdcInferCase, String cdcFile) {
     this(instant, cdcInferCase, cdcFile, Option.empty(), Option.empty());
   }
 
-  public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, 
Collection<String> cdcFiles) {
+  public HoodieCDCFileSplit(String instant, HoodieCDCInferenceCase 
cdcInferCase, Collection<String> cdcFiles) {
     this(instant, cdcInferCase, cdcFiles, Option.empty(), Option.empty());
   }
 
   public HoodieCDCFileSplit(
       String instant,
-      HoodieCDCInferCase cdcInferCase,
+      HoodieCDCInferenceCase cdcInferCase,
       String cdcFile,
       Option<FileSlice> beforeFileSlice,
       Option<FileSlice> afterFileSlice) {
@@ -90,7 +90,7 @@ public class HoodieCDCFileSplit implements Serializable, 
Comparable<HoodieCDCFil
 
   public HoodieCDCFileSplit(
       String instant,
-      HoodieCDCInferCase cdcInferCase,
+      HoodieCDCInferenceCase cdcInferCase,
       Collection<String> cdcFiles,
       Option<FileSlice> beforeFileSlice,
       Option<FileSlice> afterFileSlice) {
@@ -106,7 +106,7 @@ public class HoodieCDCFileSplit implements Serializable, 
Comparable<HoodieCDCFil
     return this.instant;
   }
 
-  public HoodieCDCInferCase getCdcInferCase() {
+  public HoodieCDCInferenceCase getCdcInferCase() {
     return this.cdcInferCase;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java
similarity index 98%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java
index dfcb08a84cd..9f6d85108c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java
@@ -42,7 +42,7 @@ package org.apache.hudi.common.table.cdc;
  *   file is new-coming, so we can load this, mark all the records with `i`, 
and treat them as
  *   the value of `after`. The value of `before` for each record is null.
  *
- * BASE_FILE_INSERT:
+ * BASE_FILE_DELETE:
  *   For this type, there must be an empty file at the current instant, but a 
non-empty base file
  *   at the previous instant. First we find this base file that has the same 
file group and belongs
  *   to the previous instant. Then load this, mark all the records with `d`, 
and treat them as
@@ -67,7 +67,7 @@ package org.apache.hudi.common.table.cdc;
  *   a whole file group. First we find this file group. Then load this, mark 
all the records with
  *   `d`, and treat them as the value of `before`. The value of `after` for 
each record is null.
  */
-public enum HoodieCDCInferCase {
+public enum HoodieCDCInferenceCase {
 
   AS_IS,
   BASE_FILE_INSERT,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 1e4bf0098a1..29f477a84d4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -32,7 +32,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
@@ -81,10 +81,11 @@ class HoodieCDCRDD(
     originTableSchema: HoodieTableSchema,
     cdcSchema: StructType,
     requiredCdcSchema: StructType,
-    changes: Array[HoodieCDCFileGroupSplit])
+    @transient changes: Array[HoodieCDCFileGroupSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD {
 
-  @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration
+  @transient
+  private val hadoopConf = spark.sparkContext.hadoopConfiguration
 
   private val confBroadcast = spark.sparkContext.broadcast(new 
SerializableWritable(hadoopConf))
 
@@ -118,7 +119,7 @@ class HoodieCDCRDD(
 
     private lazy val fs = metaClient.getFs.getFileSystem
 
-    private lazy val conf = new Configuration(confBroadcast.value.value)
+    private lazy val conf = confBroadcast.value.value
 
     private lazy val basePath = metaClient.getBasePathV2
 
@@ -127,11 +128,7 @@ class HoodieCDCRDD(
     private lazy val populateMetaFields = tableConfig.populateMetaFields()
 
     private lazy val keyGenerator = {
-      val props = new TypedProperties()
-      props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, 
tableConfig.getKeyGeneratorClassName)
-      props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, 
tableConfig.getRecordKeyFieldProp)
-      props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, 
tableConfig.getPartitionFieldProp)
-      HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+      HoodieSparkKeyGeneratorFactory.createKeyGenerator(tableConfig.getProps())
     }
 
     private lazy val recordKeyField: String = if (populateMetaFields) {
@@ -202,7 +199,7 @@ class HoodieCDCRDD(
     private var currentInstant: String = _
 
     // The change file that is currently being processed
-    private var currentChangeFile: HoodieCDCFileSplit = _
+    private var currentCDCFileSplit: HoodieCDCFileSplit = _
 
     /**
      * Two cases will use this to iterator the records:
@@ -258,10 +255,10 @@ class HoodieCDCRDD(
       if (needLoadNextFile) {
         loadCdcFile()
       }
-      if (currentChangeFile == null) {
+      if (currentCDCFileSplit == null) {
         false
       } else {
-        currentChangeFile.getCdcInferCase match {
+        currentCDCFileSplit.getCdcInferCase match {
           case BASE_FILE_INSERT | BASE_FILE_DELETE | REPLACE_COMMIT =>
             if (recordIter.hasNext && loadNext()) {
               true
@@ -292,7 +289,7 @@ class HoodieCDCRDD(
 
     def loadNext(): Boolean = {
       var loaded = false
-      currentChangeFile.getCdcInferCase match {
+      currentCDCFileSplit.getCdcInferCase match {
         case BASE_FILE_INSERT =>
           val originRecord = recordIter.next()
           recordToLoad.update(3, convertRowToJsonString(originRecord))
@@ -416,34 +413,34 @@ class HoodieCDCRDD(
       if (cdcFileIter.hasNext) {
         val split = cdcFileIter.next()
         currentInstant = split.getInstant
-        currentChangeFile = split
-        currentChangeFile.getCdcInferCase match {
+        currentCDCFileSplit = split
+        currentCDCFileSplit.getCdcInferCase match {
           case BASE_FILE_INSERT =>
-            assert(currentChangeFile.getCdcFiles != null && 
currentChangeFile.getCdcFiles.size() == 1)
-            val absCDCPath = new Path(basePath, 
currentChangeFile.getCdcFiles.get(0))
+            assert(currentCDCFileSplit.getCdcFiles != null && 
currentCDCFileSplit.getCdcFiles.size() == 1)
+            val absCDCPath = new Path(basePath, 
currentCDCFileSplit.getCdcFiles.get(0))
             val fileStatus = fs.getFileStatus(absCDCPath)
             val pf = PartitionedFile(InternalRow.empty, 
absCDCPath.toUri.toString, 0, fileStatus.getLen)
             recordIter = parquetReader(pf)
           case BASE_FILE_DELETE =>
-            assert(currentChangeFile.getBeforeFileSlice.isPresent)
-            recordIter = 
loadFileSlice(currentChangeFile.getBeforeFileSlice.get)
+            assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
+            recordIter = 
loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get)
           case LOG_FILE =>
-            assert(currentChangeFile.getCdcFiles != null && 
currentChangeFile.getCdcFiles.size() == 1
-              && currentChangeFile.getBeforeFileSlice.isPresent)
-            
loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get)
-            val absLogPath = new Path(basePath, 
currentChangeFile.getCdcFiles.get(0))
+            assert(currentCDCFileSplit.getCdcFiles != null && 
currentCDCFileSplit.getCdcFiles.size() == 1
+              && currentCDCFileSplit.getBeforeFileSlice.isPresent)
+            
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
+            val absLogPath = new Path(basePath, 
currentCDCFileSplit.getCdcFiles.get(0))
             val morSplit = HoodieMergeOnReadFileSplit(None, List(new 
HoodieLogFile(fs.getFileStatus(absLogPath))))
             val logFileIterator = new LogFileIterator(morSplit, 
originTableSchema, originTableSchema, tableState, conf)
             logRecordIter = logFileIterator.logRecordsPairIterator
           case AS_IS =>
-            assert(currentChangeFile.getCdcFiles != null && 
!currentChangeFile.getCdcFiles.isEmpty)
+            assert(currentCDCFileSplit.getCdcFiles != null && 
!currentCDCFileSplit.getCdcFiles.isEmpty)
             // load beforeFileSlice to beforeImageRecords
-            if (currentChangeFile.getBeforeFileSlice.isPresent) {
-              
loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get)
+            if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
+              
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
             }
             // load afterFileSlice to afterImageRecords
-            if (currentChangeFile.getAfterFileSlice.isPresent) {
-              val iter = 
loadFileSlice(currentChangeFile.getAfterFileSlice.get())
+            if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
+              val iter = 
loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
               afterImageRecords = mutable.Map.empty
               iter.foreach { row =>
                 val key = getRecordKey(row)
@@ -451,13 +448,13 @@ class HoodieCDCRDD(
               }
             }
 
-            val cdcLogFiles = currentChangeFile.getCdcFiles.asScala.map { 
cdcFile =>
+            val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map { 
cdcFile =>
               new HoodieLogFile(fs.getFileStatus(new Path(basePath, cdcFile)))
             }.toArray
             cdcLogRecordIterator = new HoodieCDCLogRecordIterator(fs, 
cdcLogFiles, cdcAvroSchema)
           case REPLACE_COMMIT =>
-            if (currentChangeFile.getBeforeFileSlice.isPresent) {
-              
loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get)
+            if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
+              
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
             }
             recordIter = beforeImageRecords.values.map { record =>
               deserialize(record)
@@ -467,7 +464,7 @@ class HoodieCDCRDD(
         resetRecordFormat()
       } else {
         currentInstant = null
-        currentChangeFile = null
+        currentCDCFileSplit = null
       }
     }
 
@@ -475,7 +472,7 @@ class HoodieCDCRDD(
      * Initialize the partial fields of the data to be returned in advance to 
speed up.
      */
     private def resetRecordFormat(): Unit = {
-      recordToLoad = currentChangeFile.getCdcInferCase match {
+      recordToLoad = currentCDCFileSplit.getCdcInferCase match {
         case BASE_FILE_INSERT =>
           InternalRow.fromSeq(Array(
             CDCRelation.CDC_OPERATION_INSERT, 
convertToUTF8String(currentInstant),
diff --git a/rfc/rfc-51/rfc-51.md b/rfc/rfc-51/rfc-51.md
index 18b76116c86..29115b46344 100644
--- a/rfc/rfc-51/rfc-51.md
+++ b/rfc/rfc-51/rfc-51.md
@@ -202,7 +202,7 @@ tblproperties (
 
 ### How to infer CDC results
 
-| `HoodieCDCInferCase` | Infer case details                                    
                                                                    | Infer 
logic                                                                           
                                                                                
    | Note                               |
+| `HoodieCDCInferenceCase` | Infer case details                                
                                                                        | Infer 
logic                                                                           
                                                                                
    | Note                               |
 
|----------------------|---------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------|
 | `AS_IS`              | CDC file written (suffix contains `-cdc`) alongside 
base files (COW) or log files (MOR)                                   | CDC 
info will be extracted as is                                                    
                                                                                
      | the read-optimized way to read CDC |
 | `BASE_FILE_INSERT`   | Base files were written to a new file group           
                                                                    | All 
records (in the current commit): `op=I`, `before=null`, `after=<current value>` 
                                                                                
      | on-the-fly inference               |

Reply via email to