prasannarajaperumal commented on code in PR #5885:
URL: https://github.com/apache/hudi/pull/5885#discussion_r922989756


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -102,6 +117,15 @@
   protected Map<String, HoodieRecord<T>> keyToNewRecords;
   protected Set<String> writtenRecordKeys;
   protected HoodieFileWriter<IndexedRecord> fileWriter;
+  // a flag that indicate whether allow the change data to write out a cdc log 
file.
+  protected boolean cdcEnabled = false;

Review Comment:
   Create a subclass of HoodieAppendHandle, HoodieChangeTrackingAppendHandle, 
and move all the code related to persisting row-level change-tracking metadata 
to that subclass. I prefer naming all methods/parameters changeTracking 
instead of CDC: CDC is the feature, while change tracking is the action you 
perform during a write.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCRelation.scala:
##########
@@ -0,0 +1,414 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.cdc
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
HoodieDataSourceHelper, HoodieTableSchema, SparkAdapterSupport}
+import org.apache.hudi.HoodieConversionUtils._
+import org.apache.hudi.common.table.cdc.CDCFileTypeEnum._
+import org.apache.hudi.common.table.cdc.CDCUtils._
+import org.apache.hudi.common.table.cdc.CDCOperationEnum._
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, 
HoodieCommitMetadata, HoodieFileFormat, HoodieFileGroupId, HoodieLogFile, 
HoodieReplaceCommitMetadata, HoodieWriteStat, WriteOperationType}
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline._
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.TableSchemaResolver
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.exception.{HoodieException, HoodieNotSupportedException}
+import org.apache.hudi.internal.schema.InternalSchema
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{Row, SQLContext, SparkSession}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.{Failure, Success, Try}
+
+class CDCRelation(
+    override val sqlContext: SQLContext,
+    metaClient: HoodieTableMetaClient,
+    cdcSupplementalLogging: Boolean,
+    startInstant: String,
+    endInstant: String,
+    options: Map[String, String]
+) extends BaseRelation with PrunedFilteredScan with Logging {
+
+  val spark: SparkSession = sqlContext.sparkSession
+
+  val fs: FileSystem = metaClient.getFs.getFileSystem
+
+  val basePath: Path = metaClient.getBasePathV2
+
+  val (tableAvroSchema, _) = {
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    val avroSchema = Try(schemaUtil.getTableAvroSchema) match {
+      case Success(schema) => schema
+      case Failure(e) =>
+        throw new IllegalArgumentException("Failed to fetch schema from the 
table", e)
+    }
+    // try to find internalSchema
+    val internalSchemaFromMeta = try {
+      
schemaUtil.getTableInternalSchemaFromCommitMetadata.orElse(InternalSchema.getEmptyInternalSchema)
+    } catch {
+      case _: Exception => InternalSchema.getEmptyInternalSchema
+    }
+    (avroSchema, internalSchemaFromMeta)
+  }
+
+  val tableStructSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+
+  val commits: Map[HoodieInstant, HoodieCommitMetadata] =
+    CDCRelation.getCompletedCommitInstantInSpecifiedRange(metaClient, 
startInstant, endInstant)
+
+  /**
+   * Parse the commit metadata between (startInstant, endInstant], and extract 
the touched partitions
+   * and files to build the filesystem view.
+   */
+  lazy val fsView: HoodieTableFileSystemView = {
+    val touchedPartition = commits.flatMap { case (_, commitMetadata) =>
+      val partitionSet = commitMetadata.getPartitionToWriteStats.keySet()
+      val replacedPartitionSet = commitMetadata match {
+        case replaceCommitMetadata: HoodieReplaceCommitMetadata =>
+          replaceCommitMetadata.getPartitionToReplaceFileIds.keySet().asScala
+        case _ => Set.empty[String]
+      }
+      partitionSet.asScala ++ replacedPartitionSet
+    }.toSet
+    val touchedFiles = touchedPartition.flatMap { partition =>
+      val partitionPath = FSUtils.getPartitionPath(basePath, partition)
+      fs.listStatus(partitionPath)
+    }.toArray
+    new HoodieTableFileSystemView(metaClient, 
metaClient.getCommitsTimeline.filterCompletedInstants, touchedFiles)
+  }
+
+  /**
+   * At the granularity of a file group, trace the mapping between each 
commit/instant and changes to this file group.
+   */
+  val changeFilesForPerFileGroupAndCommit: Map[HoodieFileGroupId, 
HoodieCDCFileGroupSplit] = {
+    val fgToCommitChanges = mutable.Map.empty[HoodieFileGroupId,
+      mutable.Map[HoodieInstant, ChangeFileForSingleFileGroupAndCommit]]
+
+    commits.foreach {
+      case (instant, commitMetadata) =>
+        // parse `partitionToWriteStats` in the metadata of commit
+        commitMetadata.getPartitionToWriteStats.asScala.foreach {
+          case (partition, hoodieWriteStats) =>
+            hoodieWriteStats.asScala.foreach { writeStat =>
+              val fileGroupId = new HoodieFileGroupId(partition, 
writeStat.getFileId)
+              // Identify the CDC source involved in this commit and
+              // determine its type for subsequent loading using different 
methods.
+              val changeFile = parseWriteStat(fileGroupId, instant, writeStat,
+                commitMetadata.getOperationType == WriteOperationType.DELETE)
+              if (fgToCommitChanges.contains(fileGroupId)) {
+                fgToCommitChanges(fileGroupId)(instant) = changeFile
+              } else {
+                fgToCommitChanges.put(fileGroupId, mutable.Map(instant -> 
changeFile))
+              }
+            }
+        }
+
+        // parse `partitionToReplaceFileIds` in the metadata of commit
+        commitMetadata match {
+          case replaceCommitMetadata: HoodieReplaceCommitMetadata =>
+            replaceCommitMetadata.getPartitionToReplaceFileIds.asScala.foreach 
{
+              case (partition, fileIds) =>
+                fileIds.asScala.foreach { fileId =>
+                  toScalaOption(fsView.fetchLatestFileSlice(partition, 
fileId)).foreach {
+                    fileSlice =>
+                      val fileGroupId = new HoodieFileGroupId(partition, 
fileId)
+                      val changeFile =
+                        
ChangeFileForSingleFileGroupAndCommit(REPLACED_FILE_GROUP, null, 
Some(fileSlice))
+                      if (fgToCommitChanges.contains(fileGroupId)) {
+                        fgToCommitChanges(fileGroupId)(instant) = changeFile
+                      } else {
+                        fgToCommitChanges.put(fileGroupId, mutable.Map(instant 
-> changeFile))
+                      }
+                  }
+                }
+            }
+          case _ =>
+        }
+      case _ =>
+    }
+    fgToCommitChanges.map { case (fgId, instantToChanges) =>
+      (fgId, HoodieCDCFileGroupSplit(instantToChanges.toArray.sortBy(_._1)))
+    }.toMap
+  }
+
+  override final def needConversion: Boolean = false
+
+  override def schema: StructType = CDCRelation.CDC_SPARK_SCHEMA
+
+  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
+    val internalRows = buildScan0(requiredColumns, filters)
+    internalRows.asInstanceOf[RDD[Row]]
+  }
+
+  def buildScan0(requiredColumns: Array[String], filters: Array[Filter]): 
RDD[InternalRow] = {
+    val nameToField = schema.fields.map(f => f.name -> f).toMap
+    val requiredSchema = StructType(requiredColumns.map(nameToField))
+    val originTableSchema = HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString)
+    val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
+      sparkSession = spark,
+      dataSchema = tableStructSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = tableStructSchema,
+      filters = Nil,
+      options = options,
+      hadoopConf = spark.sessionState.newHadoopConf()
+    )
+    val cdcRdd = new HoodieCDCRDD(
+      spark,
+      metaClient,
+      cdcSupplementalLogging,
+      parquetReader,
+      originTableSchema,
+      schema,
+      requiredSchema,
+      changeFilesForPerFileGroupAndCommit.values.toArray
+    )
+    cdcRdd.asInstanceOf[RDD[InternalRow]]
+  }
+
+  /**
+   * Parse HoodieWriteStat, judge which type the file is, and what strategy 
should be used to parse CDC data.
+   * Then build a [[ChangeFileForSingleFileGroupAndCommit]] object.
+   */
+  private def parseWriteStat(

Review Comment:
   Does it make sense to generalize this out of Spark, so that the logic to 
identify the different CDC types and load them is common to all clients?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCFileTypeEnum.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+/**
+ * Defines five cdc file types. The cdc file type decides which file will be
+ * used to extract the change data, and how to do this.
+ *
+ * CDC_LOG_FILE:
+ *   For this type, there must be a real cdc log file from which we get the 
whole/part change data.
+ *   when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the 
fields about the
+ *   change data, including `op`, `ts_ms`, `before` and `after`. So read it 
and return directly,
+ *   no more other files need to be loaded.
+ *   when `hoodie.table.cdc.supplemental.logging` is false, it just keep the 
`op` and the key of
+ *   the changing record. When `op` is equal to 'i', `before` is null and get 
the current record
+ *   from the current base/log file as `after`. When `op` is equal to 'u', get 
the previous
+ *   record from the previous file slice as `before`, and get the current 
record from the
+ *   current base/log file `after`. When `op` is equal to 'd', get the 
previous record from
+ *   the previous file slice as `before`, and `after` is null.
+ *
+ * ADD_BASE_FILE:
+ *   For this type, there must be a base file at the current instant. All the 
records from this
+ *   file is new-coming, so we can load this, mark all the records with `i`, 
and treat them as
+ *   the value of `after`. The value of `before` for each record is null.
+ *
+ * REMOVE_BASE_FILE:
+ *   For this type, there must be an empty file at the current instant, but a 
non-empty base file
+ *   at the previous instant. First we find this base file that has the same 
file group and belongs
+ *   to the previous instant. Then load this, mark all the records with `d`, 
and treat them as
+ *   the value of `before`. The value of `after` for each record is null.
+ *
+ * MOR_LOG_FILE:
+ *   For this type, a normal log file of mor table will be used. First we need 
to load the previous
+ *   file slice(including the base file and other log files in the same file 
group). Then for each
+ *   record from the log file, get the key of this, and execute the following 
steps:
+ *     1) if the record is deleted,
+ *       a) if there is a record with the same key in the data loaded, `op` is 
'd', 'before' is the
+ *          record from the data loaded, `after` is null;
+ *       b) if there is not a record with the same key in the data loaded, 
just skip.
+ *     2) the record is not deleted,
+ *       a) if there is a record with the same key in the data loaded, `op` is 
'u', 'before' is the
+ *          record from the data loaded, `after` is the current record;
+ *       b) if there is not a record with the same key in the data loaded, 
`op` is 'i', 'before' is
+ *          null, `after` is the current record;
+ *
+ * REPLACED_FILE_GROUP:
+ *   For this type, it must be a replacecommit, like INSERT_OVERWRITE and 
DROP_PARTITION. It drops
+ *   a whole file group. First we find this file group. Then load this, mark 
all the records with
+ *   `d`, and treat them as the value of `before`. The value of `after` for 
each record is null.
+ */
+public enum CDCFileTypeEnum {
+
+  CDC_LOG_FILE,
+  ADD_BASE_File,

Review Comment:
   s/ADD_BASE_File/ADD_BASE_FILE



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -281,7 +313,18 @@ private boolean writeUpdateRecord(HoodieRecord<T> 
hoodieRecord, GenericRecord ol
         return false;
       }
     }
-    return writeRecord(hoodieRecord, indexedRecord, isDelete);
+    boolean result = writeRecord(hoodieRecord, indexedRecord, isDelete);
+    if (cdcEnabled) {
+      if (indexedRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) indexedRecord.get();
+        cdcData.add(cdcRecord(CDCOperationEnum.UPDATE, 
hoodieRecord.getRecordKey(), hoodieRecord.getPartitionPath(),

Review Comment:
   We will be holding the record data in memory until the handle is closed when 
supplemental logging is enabled. Are there any side effects to be cautious about?
   We deflate the actual record once it's written to the file, and the 
bloom filter calculation happens after that. Would there be significant memory 
pressure if we still hold on to the data for CDC, and if so, how do we handle it?



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,12 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that store the CDC data.
+   */
+  @Nullable
+  private String cdcPath;

Review Comment:
   ChangeTrackingStat



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to