YannByron commented on code in PR #6727:
URL: https://github.com/apache/hudi/pull/6727#discussion_r979123476


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, 
HoodieUnsafeRDD, LogFileIterator, LogIteratorUtils, RecordMergingFileIterator, 
SparkAdapterSupport}
+import org.apache.hudi.HoodieConversionUtils._
+import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, 
HoodieRecordPayload}
+import org.apache.hudi.common.table.HoodieTableConfig._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType._
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, 
HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordReader
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession}
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.io.Closeable
+import java.util.Properties
+import java.util.function.Predicate
+import java.util.stream.Collectors
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * The split that will be processed by spark task.
+ */
+case class HoodieCDCFileGroupSplit(
+    commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)]
+)
+
+/**
+ * The Spark [[Partition]]'s implementation.
+ */
+case class HoodieCDCFileGroupPartition(
+    index: Int,
+    split: HoodieCDCFileGroupSplit
+) extends Partition
+
+class HoodieCDCRDD(
+    spark: SparkSession,
+    metaClient: HoodieTableMetaClient,
+    parquetReader: PartitionedFile => Iterator[InternalRow],
+    originTableSchema: HoodieTableSchema,
+    cdcSchema: StructType,
+    requiredCdcSchema: StructType,
+    changes: Array[HoodieCDCFileGroupSplit])
+  extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD {
+
+  @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration
+
+  private val confBroadcast = spark.sparkContext.broadcast(new 
SerializableWritable(hadoopConf))
+
+  private val cdcSupplementalLoggingMode = 
HoodieCDCSupplementalLoggingMode.parse(
+    metaClient.getTableConfig.cdcSupplementalLoggingMode
+  )
+
+  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
+
+  protected val payloadProps: Properties = 
Option(metaClient.getTableConfig.getPreCombineField)
+    .map { preCombineField =>
+      HoodiePayloadConfig.newBuilder
+        .withPayloadOrderingField(preCombineField)
+        .build
+        .getProps
+    }.getOrElse(new Properties())
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition]
+    new CDCFileGroupIterator(cdcPartition.split, metaClient)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    changes.zipWithIndex.map{ case (split, index) =>
+      HoodieCDCFileGroupPartition(index, split)
+    }.toArray
+  }
+
+  private class CDCFileGroupIterator(
+      split: HoodieCDCFileGroupSplit,
+      metaClient: HoodieTableMetaClient
+    ) extends Iterator[InternalRow] with SparkAdapterSupport with 
AvroDeserializerSupport with Closeable {
+
+    private val fs = metaClient.getFs.getFileSystem
+
+    private val conf = new Configuration(confBroadcast.value.value)
+
+    private val basePath = metaClient.getBasePathV2
+
+    private val tableConfig = metaClient.getTableConfig
+
+    private val populateMetaFields = tableConfig.populateMetaFields()
+
+    private val keyGenerator = {
+      val props = new TypedProperties()
+      props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, 
tableConfig.getKeyGeneratorClassName)
+      props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, 
tableConfig.getRecordKeyFieldProp)
+      props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, 
tableConfig.getPartitionFieldProp)
+      HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+    }
+
+    private val recordKeyField: String = if (populateMetaFields) {
+      HoodieRecord.RECORD_KEY_METADATA_FIELD
+    } else {
+      val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
+      checkState(keyFields.length == 1)
+      keyFields.head
+    }
+
+    private val preCombineFieldOpt: Option[String] = 
Option(metaClient.getTableConfig.getPreCombineField)
+
+    private val tableState = {
+      val metadataConfig = HoodieMetadataConfig.newBuilder()
+        .fromProperties(props)
+        .build();
+      HoodieTableState(
+        pathToString(basePath),
+        split.commitToChanges.map(_._1.getTimestamp).max,
+        recordKeyField,
+        preCombineFieldOpt,
+        usesVirtualKeys = false,
+        metaClient.getTableConfig.getPayloadClass,
+        metadataConfig
+      )
+    }
+
+    private lazy val mapper: ObjectMapper = {

Review Comment:
   yes, we need this.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, 
HoodieUnsafeRDD, LogFileIterator, LogIteratorUtils, RecordMergingFileIterator, 
SparkAdapterSupport}
+import org.apache.hudi.HoodieConversionUtils._
+import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, 
HoodieRecordPayload}
+import org.apache.hudi.common.table.HoodieTableConfig._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType._
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, 
HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordReader
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession}
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.io.Closeable
+import java.util.Properties
+import java.util.function.Predicate
+import java.util.stream.Collectors
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * The split that will be processed by spark task.
+ */
+case class HoodieCDCFileGroupSplit(
+    commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)]
+)
+
+/**
+ * The Spark [[Partition]]'s implementation.
+ */
+case class HoodieCDCFileGroupPartition(
+    index: Int,
+    split: HoodieCDCFileGroupSplit
+) extends Partition
+
+class HoodieCDCRDD(
+    spark: SparkSession,
+    metaClient: HoodieTableMetaClient,
+    parquetReader: PartitionedFile => Iterator[InternalRow],
+    originTableSchema: HoodieTableSchema,
+    cdcSchema: StructType,
+    requiredCdcSchema: StructType,
+    changes: Array[HoodieCDCFileGroupSplit])
+  extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD {
+
+  @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration
+
+  private val confBroadcast = spark.sparkContext.broadcast(new 
SerializableWritable(hadoopConf))
+
+  private val cdcSupplementalLoggingMode = 
HoodieCDCSupplementalLoggingMode.parse(
+    metaClient.getTableConfig.cdcSupplementalLoggingMode
+  )
+
+  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
+
+  protected val payloadProps: Properties = 
Option(metaClient.getTableConfig.getPreCombineField)
+    .map { preCombineField =>
+      HoodiePayloadConfig.newBuilder
+        .withPayloadOrderingField(preCombineField)
+        .build
+        .getProps
+    }.getOrElse(new Properties())
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition]
+    new CDCFileGroupIterator(cdcPartition.split, metaClient)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    changes.zipWithIndex.map{ case (split, index) =>
+      HoodieCDCFileGroupPartition(index, split)
+    }.toArray
+  }
+
+  private class CDCFileGroupIterator(
+      split: HoodieCDCFileGroupSplit,
+      metaClient: HoodieTableMetaClient
+    ) extends Iterator[InternalRow] with SparkAdapterSupport with 
AvroDeserializerSupport with Closeable {
+
+    private val fs = metaClient.getFs.getFileSystem

Review Comment:
   done.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala:
##########
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState, 
HoodieUnsafeRDD, LogFileIterator, LogIteratorUtils, RecordMergingFileIterator, 
SparkAdapterSupport}
+import org.apache.hudi.HoodieConversionUtils._
+import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord, 
HoodieRecordPayload}
+import org.apache.hudi.common.table.HoodieTableConfig._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType._
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
+import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, 
HoodieCDCSupplementalLoggingMode, HoodieCDCUtils}
+import org.apache.hudi.common.table.log.HoodieCDCLogRecordReader
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.{HoodiePayloadConfig, HoodieWriteConfig}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{HoodieCatalystExpressionUtils, SparkSession}
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.io.Closeable
+import java.util.Properties
+import java.util.function.Predicate
+import java.util.stream.Collectors
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * The split that will be processed by spark task.
+ */
+case class HoodieCDCFileGroupSplit(
+    commitToChanges: Array[(HoodieInstant, HoodieCDCFileSplit)]
+)
+
+/**
+ * The Spark [[Partition]]'s implementation.
+ */
+case class HoodieCDCFileGroupPartition(
+    index: Int,
+    split: HoodieCDCFileGroupSplit
+) extends Partition
+
+class HoodieCDCRDD(
+    spark: SparkSession,
+    metaClient: HoodieTableMetaClient,
+    parquetReader: PartitionedFile => Iterator[InternalRow],
+    originTableSchema: HoodieTableSchema,
+    cdcSchema: StructType,
+    requiredCdcSchema: StructType,
+    changes: Array[HoodieCDCFileGroupSplit])
+  extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD {
+
+  @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration
+
+  private val confBroadcast = spark.sparkContext.broadcast(new 
SerializableWritable(hadoopConf))
+
+  private val cdcSupplementalLoggingMode = 
HoodieCDCSupplementalLoggingMode.parse(
+    metaClient.getTableConfig.cdcSupplementalLoggingMode
+  )
+
+  private val props = HoodieFileIndex.getConfigProperties(spark, Map.empty)
+
+  protected val payloadProps: Properties = 
Option(metaClient.getTableConfig.getPreCombineField)
+    .map { preCombineField =>
+      HoodiePayloadConfig.newBuilder
+        .withPayloadOrderingField(preCombineField)
+        .build
+        .getProps
+    }.getOrElse(new Properties())
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    val cdcPartition = split.asInstanceOf[HoodieCDCFileGroupPartition]
+    new CDCFileGroupIterator(cdcPartition.split, metaClient)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    changes.zipWithIndex.map{ case (split, index) =>
+      HoodieCDCFileGroupPartition(index, split)
+    }.toArray
+  }
+
+  private class CDCFileGroupIterator(
+      split: HoodieCDCFileGroupSplit,
+      metaClient: HoodieTableMetaClient
+    ) extends Iterator[InternalRow] with SparkAdapterSupport with 
AvroDeserializerSupport with Closeable {
+
+    private val fs = metaClient.getFs.getFileSystem
+
+    private val conf = new Configuration(confBroadcast.value.value)
+
+    private val basePath = metaClient.getBasePathV2
+
+    private val tableConfig = metaClient.getTableConfig
+
+    private val populateMetaFields = tableConfig.populateMetaFields()
+
+    private val keyGenerator = {
+      val props = new TypedProperties()
+      props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, 
tableConfig.getKeyGeneratorClassName)
+      props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, 
tableConfig.getRecordKeyFieldProp)
+      props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, 
tableConfig.getPartitionFieldProp)
+      HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+    }
+
+    private val recordKeyField: String = if (populateMetaFields) {
+      HoodieRecord.RECORD_KEY_METADATA_FIELD
+    } else {
+      val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
+      checkState(keyFields.length == 1)
+      keyFields.head
+    }
+
+    private val preCombineFieldOpt: Option[String] = 
Option(metaClient.getTableConfig.getPreCombineField)
+
+    private val tableState = {
+      val metadataConfig = HoodieMetadataConfig.newBuilder()
+        .fromProperties(props)
+        .build();
+      HoodieTableState(
+        pathToString(basePath),
+        split.commitToChanges.map(_._1.getTimestamp).max,
+        recordKeyField,
+        preCombineFieldOpt,
+        usesVirtualKeys = false,
+        metaClient.getTableConfig.getPayloadClass,
+        metadataConfig
+      )
+    }
+
+    private lazy val mapper: ObjectMapper = {
+      val _mapper = new ObjectMapper
+      _mapper.setSerializationInclusion(Include.NON_ABSENT)
+      _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false)
+      _mapper.registerModule(DefaultScalaModule)
+      _mapper
+    }
+
+    protected override val avroSchema: Schema = new 
Schema.Parser().parse(originTableSchema.avroSchemaStr)
+
+    protected override val structTypeSchema: StructType = 
originTableSchema.structTypeSchema
+
+    private val serializer = 
sparkAdapter.createAvroSerializer(originTableSchema.structTypeSchema,
+      avroSchema, nullable = false)
+
+    private val reusableRecordBuilder: GenericRecordBuilder = new 
GenericRecordBuilder(avroSchema)
+
+    private val cdcAvroSchema: Schema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode(
+      cdcSupplementalLoggingMode,
+      HoodieAvroUtils.removeMetadataFields(avroSchema)
+    )
+
+    private val cdcSparkSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(cdcAvroSchema)
+
+    /**
+     * the deserializer used to convert the CDC GenericRecord to Spark 
InternalRow.
+     */
+    private val cdcRecordDeserializer: HoodieAvroDeserializer = {
+      sparkAdapter.createAvroDeserializer(cdcAvroSchema, cdcSparkSchema)
+    }
+
+    private val projection: UnsafeProjection = 
generateUnsafeProjection(cdcSchema, requiredCdcSchema)
+
+    // iterator on cdc file
+    private val cdcFileIter = split.commitToChanges.sortBy(_._1).iterator
+
+    // The instant that is currently being processed
+    private var currentInstant: HoodieInstant = _
+
+    // The change file that is currently being processed
+    private var currentChangeFile: HoodieCDCFileSplit = _
+
+    /**
+     * two cases will use this to iterator the records:

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to