This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b81f49eb062534da3ebb447f13a0482bbb1cd470 Author: Yann Byron <[email protected]> AuthorDate: Mon Jan 30 16:15:24 2023 +0800 [HUDI-5634] Rename CDC related classes (#7410) --- .../hudi/common/table/cdc/HoodieCDCExtractor.java | 10 ++-- .../hudi/common/table/cdc/HoodieCDCFileSplit.java | 22 ++++---- ...CInferCase.java => HoodieCDCInferenceCase.java} | 4 +- .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala | 63 +++++++++++----------- rfc/rfc-51/rfc-51.md | 2 +- 5 files changed, 49 insertions(+), 52 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java index 506680dc3b2..6ca116015b0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java @@ -56,11 +56,11 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.AS_IS; -import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_DELETE; -import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_INSERT; -import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.LOG_FILE; -import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.AS_IS; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.BASE_FILE_DELETE; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.BASE_FILE_INSERT; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.LOG_FILE; +import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.REPLACE_COMMIT; import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after; import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java index d508f7ac4ea..f992a9b228c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java @@ -33,16 +33,16 @@ import java.util.stream.Collectors; * This contains all the information that retrieve the change data at a single file group and * at a single commit. * <p> - * For `cdcInferCase` = {@link HoodieCDCInferCase#BASE_FILE_INSERT}, `cdcFile` is a current version of + * For `cdcInferCase` = {@link HoodieCDCInferenceCase#BASE_FILE_INSERT}, `cdcFile` is a current version of * the base file in the group, and `beforeFileSlice` is None. - * For `cdcInferCase` = {@link HoodieCDCInferCase#BASE_FILE_DELETE}, `cdcFile` is null, + * For `cdcInferCase` = {@link HoodieCDCInferenceCase#BASE_FILE_DELETE}, `cdcFile` is null, * `beforeFileSlice` is the previous version of the base file in the group. - * For `cdcInferCase` = {@link HoodieCDCInferCase#AS_IS}, `cdcFile` is a log file with cdc blocks. + * For `cdcInferCase` = {@link HoodieCDCInferenceCase#AS_IS}, `cdcFile` is a log file with cdc blocks. * when enable the supplemental logging, both `beforeFileSlice` and `afterFileSlice` are None, * otherwise these two are the previous and current version of the base file. - * For `cdcInferCase` = {@link HoodieCDCInferCase#LOG_FILE}, `cdcFile` is a normal log file and + * For `cdcInferCase` = {@link HoodieCDCInferenceCase#LOG_FILE}, `cdcFile` is a normal log file and * `beforeFileSlice` is the previous version of the file slice. - * For `cdcInferCase` = {@link HoodieCDCInferCase#REPLACE_COMMIT}, `cdcFile` is null, + * For `cdcInferCase` = {@link HoodieCDCInferenceCase#REPLACE_COMMIT}, `cdcFile` is null, * `beforeFileSlice` is the current version of the file slice. */ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFileSplit> { @@ -54,7 +54,7 @@ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFil /** * Flag that decides to how to retrieve the change data. More details see: `HoodieCDCLogicalFileType`. */ - private final HoodieCDCInferCase cdcInferCase; + private final HoodieCDCInferenceCase cdcInferCase; /** * The file that the change data can be parsed from. @@ -71,17 +71,17 @@ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFil */ private final Option<FileSlice> afterFileSlice; - public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, String cdcFile) { + public HoodieCDCFileSplit(String instant, HoodieCDCInferenceCase cdcInferCase, String cdcFile) { this(instant, cdcInferCase, cdcFile, Option.empty(), Option.empty()); } - public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, Collection<String> cdcFiles) { + public HoodieCDCFileSplit(String instant, HoodieCDCInferenceCase cdcInferCase, Collection<String> cdcFiles) { this(instant, cdcInferCase, cdcFiles, Option.empty(), Option.empty()); } public HoodieCDCFileSplit( String instant, - HoodieCDCInferCase cdcInferCase, + HoodieCDCInferenceCase cdcInferCase, String cdcFile, Option<FileSlice> beforeFileSlice, Option<FileSlice> afterFileSlice) { @@ -90,7 +90,7 @@ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFil public HoodieCDCFileSplit( String instant, - HoodieCDCInferCase cdcInferCase, + HoodieCDCInferenceCase cdcInferCase, Collection<String> cdcFiles, Option<FileSlice> beforeFileSlice, Option<FileSlice> afterFileSlice) { @@ -106,7 +106,7 @@ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFil return this.instant; } - public HoodieCDCInferCase getCdcInferCase() { + public HoodieCDCInferenceCase getCdcInferCase() { return this.cdcInferCase; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java similarity index 98% rename from hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java rename to hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java index dfcb08a84cd..9f6d85108c1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java @@ -42,7 +42,7 @@ package org.apache.hudi.common.table.cdc; * file is new-coming, so we can load this, mark all the records with `i`, and treat them as * the value of `after`. The value of `before` for each record is null. * - * BASE_FILE_INSERT: + * BASE_FILE_DELETE: * For this type, there must be an empty file at the current instant, but a non-empty base file * at the previous instant. First we find this base file that has the same file group and belongs * to the previous instant. Then load this, mark all the records with `d`, and treat them as @@ -67,7 +67,7 @@ package org.apache.hudi.common.table.cdc; * a whole file group. First we find this file group. Then load this, mark all the records with * `d`, and treat them as the value of `before`. The value of `after` for each record is null. */ -public enum HoodieCDCInferCase { +public enum HoodieCDCInferenceCase { AS_IS, BASE_FILE_INSERT, diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala index 1e4bf0098a1..29f477a84d4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala @@ -32,7 +32,7 @@ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.model._ import org.apache.hudi.common.table.HoodieTableMetaClient -import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._ +import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._ import org.apache.hudi.common.table.cdc.HoodieCDCOperation._ import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._ import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils} @@ -81,10 +81,11 @@ class HoodieCDCRDD( originTableSchema: HoodieTableSchema, cdcSchema: StructType, requiredCdcSchema: StructType, - changes: Array[HoodieCDCFileGroupSplit]) + @transient changes: Array[HoodieCDCFileGroupSplit]) extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD { - @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration + @transient + private val hadoopConf = spark.sparkContext.hadoopConfiguration private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf)) @@ -118,7 +119,7 @@ class HoodieCDCRDD( private lazy val fs = metaClient.getFs.getFileSystem - private lazy val conf = new Configuration(confBroadcast.value.value) + private lazy val conf = confBroadcast.value.value private lazy val basePath = metaClient.getBasePathV2 @@ -127,11 +128,7 @@ class HoodieCDCRDD( private lazy val populateMetaFields = tableConfig.populateMetaFields() private lazy val keyGenerator = { - val props = new TypedProperties() - props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName) - props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp) - props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp) - HoodieSparkKeyGeneratorFactory.createKeyGenerator(props) + HoodieSparkKeyGeneratorFactory.createKeyGenerator(tableConfig.getProps()) } private lazy val recordKeyField: String = if (populateMetaFields) { @@ -202,7 +199,7 @@ class HoodieCDCRDD( private var currentInstant: String = _ // The change file that is currently being processed - private var currentChangeFile: HoodieCDCFileSplit = _ + private var currentCDCFileSplit: HoodieCDCFileSplit = _ /** * Two cases will use this to iterator the records: @@ -258,10 +255,10 @@ class HoodieCDCRDD( if (needLoadNextFile) { loadCdcFile() } - if (currentChangeFile == null) { + if (currentCDCFileSplit == null) { false } else { - currentChangeFile.getCdcInferCase match { + currentCDCFileSplit.getCdcInferCase match { case BASE_FILE_INSERT | BASE_FILE_DELETE | REPLACE_COMMIT => if (recordIter.hasNext && loadNext()) { true @@ -292,7 +289,7 @@ class HoodieCDCRDD( def loadNext(): Boolean = { var loaded = false - currentChangeFile.getCdcInferCase match { + currentCDCFileSplit.getCdcInferCase match { case BASE_FILE_INSERT => val originRecord = recordIter.next() recordToLoad.update(3, convertRowToJsonString(originRecord)) @@ -416,34 +413,34 @@ class HoodieCDCRDD( if (cdcFileIter.hasNext) { val split = cdcFileIter.next() currentInstant = split.getInstant - currentChangeFile = split - currentChangeFile.getCdcInferCase match { + currentCDCFileSplit = split + currentCDCFileSplit.getCdcInferCase match { case BASE_FILE_INSERT => - assert(currentChangeFile.getCdcFiles != null && currentChangeFile.getCdcFiles.size() == 1) - val absCDCPath = new Path(basePath, currentChangeFile.getCdcFiles.get(0)) + assert(currentCDCFileSplit.getCdcFiles != null && currentCDCFileSplit.getCdcFiles.size() == 1) + val absCDCPath = new Path(basePath, currentCDCFileSplit.getCdcFiles.get(0)) val fileStatus = fs.getFileStatus(absCDCPath) val pf = PartitionedFile(InternalRow.empty, absCDCPath.toUri.toString, 0, fileStatus.getLen) recordIter = parquetReader(pf) case BASE_FILE_DELETE => - assert(currentChangeFile.getBeforeFileSlice.isPresent) - recordIter = loadFileSlice(currentChangeFile.getBeforeFileSlice.get) + assert(currentCDCFileSplit.getBeforeFileSlice.isPresent) + recordIter = loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get) case LOG_FILE => - assert(currentChangeFile.getCdcFiles != null && currentChangeFile.getCdcFiles.size() == 1 - && currentChangeFile.getBeforeFileSlice.isPresent) - loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get) - val absLogPath = new Path(basePath, currentChangeFile.getCdcFiles.get(0)) + assert(currentCDCFileSplit.getCdcFiles != null && currentCDCFileSplit.getCdcFiles.size() == 1 + && currentCDCFileSplit.getBeforeFileSlice.isPresent) + loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get) + val absLogPath = new Path(basePath, currentCDCFileSplit.getCdcFiles.get(0)) val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(fs.getFileStatus(absLogPath)))) val logFileIterator = new LogFileIterator(morSplit, originTableSchema, originTableSchema, tableState, conf) logRecordIter = logFileIterator.logRecordsPairIterator case AS_IS => - assert(currentChangeFile.getCdcFiles != null && !currentChangeFile.getCdcFiles.isEmpty) + assert(currentCDCFileSplit.getCdcFiles != null && !currentCDCFileSplit.getCdcFiles.isEmpty) // load beforeFileSlice to beforeImageRecords - if (currentChangeFile.getBeforeFileSlice.isPresent) { - loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get) + if (currentCDCFileSplit.getBeforeFileSlice.isPresent) { + loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get) } // load afterFileSlice to afterImageRecords - if (currentChangeFile.getAfterFileSlice.isPresent) { - val iter = loadFileSlice(currentChangeFile.getAfterFileSlice.get()) + if (currentCDCFileSplit.getAfterFileSlice.isPresent) { + val iter = loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get()) afterImageRecords = mutable.Map.empty iter.foreach { row => val key = getRecordKey(row) @@ -451,13 +448,13 @@ class HoodieCDCRDD( } } - val cdcLogFiles = currentChangeFile.getCdcFiles.asScala.map { cdcFile => + val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map { cdcFile => new HoodieLogFile(fs.getFileStatus(new Path(basePath, cdcFile))) }.toArray cdcLogRecordIterator = new HoodieCDCLogRecordIterator(fs, cdcLogFiles, cdcAvroSchema) case REPLACE_COMMIT => - if (currentChangeFile.getBeforeFileSlice.isPresent) { - loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get) + if (currentCDCFileSplit.getBeforeFileSlice.isPresent) { + loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get) } recordIter = beforeImageRecords.values.map { record => deserialize(record) @@ -467,7 +464,7 @@ class HoodieCDCRDD( resetRecordFormat() } else { currentInstant = null - currentChangeFile = null + currentCDCFileSplit = null } } @@ -475,7 +472,7 @@ class HoodieCDCRDD( * Initialize the partial fields of the data to be returned in advance to speed up. */ private def resetRecordFormat(): Unit = { - recordToLoad = currentChangeFile.getCdcInferCase match { + recordToLoad = currentCDCFileSplit.getCdcInferCase match { case BASE_FILE_INSERT => InternalRow.fromSeq(Array( CDCRelation.CDC_OPERATION_INSERT, convertToUTF8String(currentInstant), diff --git a/rfc/rfc-51/rfc-51.md b/rfc/rfc-51/rfc-51.md index 18b76116c86..29115b46344 100644 --- a/rfc/rfc-51/rfc-51.md +++ b/rfc/rfc-51/rfc-51.md @@ -202,7 +202,7 @@ tblproperties ( ### How to infer CDC results -| `HoodieCDCInferCase` | Infer case details | Infer logic | Note | +| `HoodieCDCInferenceCase` | Infer case details | Infer logic | Note | |----------------------|---------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------| | `AS_IS` | CDC file written (suffix contains `-cdc`) alongside base files (COW) or log files (MOR) | CDC info will be extracted as is | the read-optimized way to read CDC | | `BASE_FILE_INSERT` | Base files were written to a new file group | All records (in the current commit): `op=I`, `before=null`, `after=<current value>` | on-the-fly inference |
