xushiyan commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r956897079


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -465,23 +464,6 @@ public List<WriteStatus> writeStatuses() {
     return statuses;
   }
 
-  private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)

Review Comment:
   can you clarify why this was removed? is it due to refactoring, or was it unused?



##########
hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala:
##########
@@ -313,7 +313,7 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
           }).toAttributes ++ partitionSchema.toAttributes
           val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
             if (typeChangeInfos.containsKey(i)) {
-              Cast(attr, typeChangeInfos.get(i).getLeft)
+              Cast(attr, typeChangeInfos.get(i).getLeft, Option(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)))

Review Comment:
   can you clarify this change please, and can we move it to a separate PR if it's independent from the cdc work?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +451,65 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  protected SerializableRecord cdcRecord(CDCOperationEnum operation, String recordKey, String partitionPath,
+                                         GenericRecord oldRecord, GenericRecord newRecord) {
+    GenericData.Record record;
+    if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), instantTime,
+          oldRecord, addCommitMetadata(newRecord, recordKey, partitionPath));
+    } else if (cdcSupplementalLoggingMode.equals(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE)) {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey, oldRecord);
+    } else {
+      record = CDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return new SerializableRecord(record);
+  }
+
+  protected GenericRecord addCommitMetadata(GenericRecord record, String recordKey, String partitionPath) {
+    if (record != null && config.populateMetaFields()) {
+      GenericRecord rewriteRecord = rewriteRecord(record);
+      String seqId = HoodieRecord.generateSequenceId(instantTime, getPartitionId(), writtenRecordCount.get());
+      HoodieAvroUtils.addCommitMetadataToRecord(rewriteRecord, instantTime, seqId);
+      HoodieAvroUtils.addHoodieKeyToRecord(rewriteRecord, recordKey, partitionPath, newFilePath.getName());
+      return rewriteRecord;
+    }
+    return record;
+  }
+
+  protected AppendResult writeCDCData() {

Review Comment:
   can we use `Option` here? the return is nullable
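   for illustration, a self-contained sketch using JDK `Optional` and hypothetical `AppendResult`/`CdcWriteSketch` stand-ins (Hudi's own `Option` has equivalent `empty`/`of` semantics):

   ```java
   import java.util.Optional;

   // hypothetical stand-in for the real AppendResult
   class AppendResult {
     final long bytesWritten;
     AppendResult(long bytesWritten) { this.bytesWritten = bytesWritten; }
   }

   class CdcWriteSketch {
     private final boolean hasCdcData;
     CdcWriteSketch(boolean hasCdcData) { this.hasCdcData = hasCdcData; }

     // make the absence of CDC data explicit in the signature instead of returning null
     Optional<AppendResult> writeCDCData() {
       if (!hasCdcData) {
         return Optional.empty();
       }
       return Optional.of(new AppendResult(128L));
     }
   }
   ```

   callers then handle the empty case explicitly instead of null-checking.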



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/TestCDCBase.scala:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional.cdc
+
+import org.apache.hudi.DataSourceReadOptions._
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieWriteConfig}
+import org.apache.hudi.testutils.HoodieClientTestBase
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.Assertions.assertEquals
+
+import scala.collection.JavaConverters._
+
+abstract class TestCDCBase extends HoodieClientTestBase {

Review Comment:
   ```suggestion
   abstract class HoodieCDCTestBase extends HoodieClientTestBase {
   ```
   
   the convention is that Test* or *Test(s) names are reserved for actual runnable test classes; test utils and helpers should avoid these name patterns



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -127,6 +127,27 @@ public class HoodieTableConfig extends HoodieConfig {
      .withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as "
          + " the record key component of HoodieKey.");
 
+  public static final ConfigProperty<Boolean> CDC_ENABLED = ConfigProperty
+      .key("hoodie.table.cdc.enabled")
+      .defaultValue(false)
+      .withDocumentation("When enable, persist the change data if necessary, and can be queried as a CDC query mode.");
+
+  // op and key
+  public static final String CDC_SUPPLEMENTAL_LOGGING_MODE_MINI = "op_key";
+  public static final String CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE = "cdc_data_before";
+  public static final String CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER = "cdc_data_before_after";
+
+  public static final ConfigProperty<String> CDC_SUPPLEMENTAL_LOGGING_MODE = ConfigProperty
+      .key("hoodie.table.cdc.supplemental.logging.mode")
+      .defaultValue(CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER)
Review Comment:
   can we make the default KEY_OP instead of full logging? we want to minimize storage out of the box and only log more as needed
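   e.g. (using the existing constant name from this PR):

   ```suggestion
         .defaultValue(CDC_SUPPLEMENTAL_LOGGING_MODE_MINI)
   ```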



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,12 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that store the CDC data.
+   */
+  @Nullable

Review Comment:
   should mark the getter `@Nullable` instead of the instance var



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java:
##########
@@ -127,6 +127,27 @@ public class HoodieTableConfig extends HoodieConfig {
      .withDocumentation("Columns used to uniquely identify the table. Concatenated values of these fields are used as "
          + " the record key component of HoodieKey.");
 
+  public static final ConfigProperty<Boolean> CDC_ENABLED = ConfigProperty
+      .key("hoodie.table.cdc.enabled")
+      .defaultValue(false)
+      .withDocumentation("When enable, persist the change data if necessary, and can be queried as a CDC query mode.");
+
+  // op and key
+  public static final String CDC_SUPPLEMENTAL_LOGGING_MODE_MINI = "op_key";

Review Comment:
   `MINI` is not a proper name for this mode. can you call it `KEY_OP`?
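   e.g.

   ```suggestion
     public static final String CDC_SUPPLEMENTAL_LOGGING_MODE_KEY_OP = "op_key";
   ```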



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCFileTypeEnum.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+/**
+ * Here define four cdc file types. The different cdc file type will decide which file will be
+ * used to extract the change data, and how to do this.
+ *
+ * CDC_LOG_FILE:
+ *   For this type, there must be a real cdc log file from which we get the whole/part change data.
+ *   when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after', it keeps all the fields about the
+ *   change data, including `op`, `ts_ms`, `before` and `after`. So read it and return directly,
+ *   no more other files need to be loaded.
+ *   when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before', it keeps the `op`, the key and the
+ *   `before` of the changing record. When `op` is equal to 'i' or 'u', need to get the current record from the
+ *   current base/log file as `after`.
+ *   when `hoodie.table.cdc.supplemental.logging.mode` is 'op_key', it just keeps the `op` and the key of
+ *   the changing record. When `op` is equal to 'i', `before` is null and get the current record
+ *   from the current base/log file as `after`. When `op` is equal to 'u', get the previous
+ *   record from the previous file slice as `before`, and get the current record from the
+ *   current base/log file as `after`. When `op` is equal to 'd', get the previous record from
+ *   the previous file slice as `before`, and `after` is null.
+ *
+ * ADD_BASE_FILE:
+ *   For this type, there must be a base file at the current instant. All the records from this
+ *   file is new-coming, so we can load this, mark all the records with `i`, and treat them as
+ *   the value of `after`. The value of `before` for each record is null.
+ *
+ * REMOVE_BASE_FILE:
+ *   For this type, there must be an empty file at the current instant, but a non-empty base file
+ *   at the previous instant. First we find this base file that has the same file group and belongs
+ *   to the previous instant. Then load this, mark all the records with `d`, and treat them as
+ *   the value of `before`. The value of `after` for each record is null.
+ *
+ * MOR_LOG_FILE:
+ *   For this type, a normal log file of mor table will be used. First we need to load the previous
+ *   file slice(including the base file and other log files in the same file group). Then for each
+ *   record from the log file, get the key of this, and execute the following steps:
+ *     1) if the record is deleted,
+ *       a) if there is a record with the same key in the data loaded, `op` is 'd', 'before' is the
+ *          record from the data loaded, `after` is null;
+ *       b) if there is not a record with the same key in the data loaded, just skip.
+ *     2) the record is not deleted,
+ *       a) if there is a record with the same key in the data loaded, `op` is 'u', 'before' is the
+ *          record from the data loaded, `after` is the current record;
+ *       b) if there is not a record with the same key in the data loaded, `op` is 'i', 'before' is
+ *          null, `after` is the current record;
+ *
+ * REPLACED_FILE_GROUP:
+ *   For this type, it must be a replacecommit, like INSERT_OVERWRITE and DROP_PARTITION. It drops
+ *   a whole file group. First we find this file group. Then load this, mark all the records with
+ *   `d`, and treat them as the value of `before`. The value of `after` for each record is null.
+ */
+public enum CDCFileTypeEnum {

Review Comment:
   HoodieCDCLogicalFileType. the `Enum` suffix for an enum type should be avoided
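   e.g.

   ```suggestion
   public enum HoodieCDCLogicalFileType {
   ```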



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -229,6 +230,10 @@ protected synchronized void scanInternal(Option<KeySpec> keySpecOpt) {
         HoodieLogBlock logBlock = logFormatReaderWrapper.next();
         final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
         totalLogBlocks.incrementAndGet();
+        if (logBlock.getBlockType() == CDC_DATA_BLOCK) {
+          // hit a cdc block, just skip.
+          continue;

Review Comment:
   getting the instant time above should be skipped if we skip here. also, should totalLogBlocks still be incremented? pls confirm
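   a runnable sketch of the proposed reordering (hypothetical `BlockType` and `ScanSketch` stand-ins for the real log scanner types):

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicLong;

   // hypothetical stand-in for HoodieLogBlockType
   enum BlockType { CDC_DATA_BLOCK, AVRO_DATA_BLOCK }

   class ScanSketch {
     final AtomicLong totalLogBlocks = new AtomicLong();
     final List<BlockType> processed = new ArrayList<>();

     void scan(List<BlockType> blocks) {
       for (BlockType logBlock : blocks) {
         // skip cdc blocks before reading header fields or counting them,
         // since they are never processed by this scanner
         if (logBlock == BlockType.CDC_DATA_BLOCK) {
           continue;
         }
         totalLogBlocks.incrementAndGet();
         processed.add(logBlock);
       }
     }
   }
   ```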



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/CDCLogRecordReader.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.cdc.CDCUtils;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.ClosableIterator;
+
+import java.io.IOException;
+
+public class CDCLogRecordReader implements ClosableIterator<IndexedRecord> {
+
+  private final HoodieLogFile cdcLogFile;
+
+  private final HoodieLogFormat.Reader reader;
+
+  private ClosableIterator<IndexedRecord> itr;
+
+  public CDCLogRecordReader(FileSystem fs, Path cdcLogPath, String cdcSupplementalLoggingMode) throws IOException {
+    this.cdcLogFile = new HoodieLogFile(fs.getFileStatus(cdcLogPath));
+    this.reader = new HoodieLogFileReader(fs, cdcLogFile,
+      CDCUtils.schemaBySupplementalLoggingMode(cdcSupplementalLoggingMode),
+      HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (null == itr || !itr.hasNext()) {
+      if (reader.hasNext()) {
+        HoodieDataBlock dataBlock = (HoodieDataBlock) reader.next();
+        if (dataBlock.getBlockType() == HoodieLogBlock.HoodieLogBlockType.CDC_DATA_BLOCK) {
+          itr = dataBlock.getRecordIterator();
+          return itr.hasNext();
+        } else {
+          return hasNext();
+        }
+      }
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public IndexedRecord next() {
+    return itr.next();
+  }
+
+  @Override
+  public void close() {
+    try {
+      itr.close();
+      reader.close();
+    } catch (IOException e) {
+      e.printStackTrace();

Review Comment:
   avoid `e.printStackTrace()`. throw if the reader does not close properly
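   e.g. a self-contained sketch (`UncheckedIOException` stands in for whatever Hudi exception fits; try-with-resources attempts both closes even if one throws):

   ```java
   import java.io.Closeable;
   import java.io.IOException;
   import java.io.UncheckedIOException;

   class CloseSketch implements Closeable {
     private final Closeable itr;
     private final Closeable reader;

     CloseSketch(Closeable itr, Closeable reader) {
       this.itr = itr;
       this.reader = reader;
     }

     @Override
     public void close() {
       // propagate the failure instead of swallowing it with printStackTrace();
       // try-with-resources still attempts to close both resources
       try (Closeable r = reader; Closeable i = itr) {
         // resources are closed on exit, itr first then reader
       } catch (IOException e) {
         throw new UncheckedIOException("failed to close cdc log reader", e);
       }
     }
   }
   ```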



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java:
##########
@@ -116,12 +125,18 @@ public List<WriteStatus> close() {
         String key = newRecordKeysSorted.poll();
         HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
         if (!writtenRecordKeys.contains(hoodieRecord.getRecordKey())) {
+          Option<IndexedRecord> insertRecord;
           if (useWriterSchemaForCompaction) {
-            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
+            insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
           } else {
-            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
+            insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
           }
+          writeRecord(hoodieRecord, insertRecord);

Review Comment:
   ditto



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -507,6 +507,26 @@ public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partit
     }
   }
 
+  /**
+   * Get the path which has a specified commit and file group id.
+   */
+  public static FileStatus getBaseFile(FileSystem fs, Path partitionPath, String fileGroupId, String commitTime) {
+    PathFilter pathFilter = path -> path.getName().startsWith(fileGroupId)
+        && path.getName().endsWith(commitTime + HoodieFileFormat.PARQUET.getFileExtension());
+    try {
+      FileStatus[] statuses = fs.listStatus(partitionPath, pathFilter);

Review Comment:
   this fs listing and string matching is a performance hit. can you look into other APIs like fs view to find a performant way to do this?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java:
##########
@@ -93,13 +96,19 @@ public void write(GenericRecord oldRecord) {
         throw new HoodieUpsertException("Insert/Update not in sorted order");
       }
       try {
+        Option<IndexedRecord> insertRecord;
         if (useWriterSchemaForCompaction) {
-          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps()));
+          insertRecord = hoodieRecord.getData().getInsertValue(tableSchemaWithMetaFields, config.getProps());
         } else {
-          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(tableSchema, config.getProps()));
+          insertRecord = hoodieRecord.getData().getInsertValue(tableSchema, config.getProps());
         }
+        writeRecord(hoodieRecord, insertRecord);

Review Comment:
   let's avoid this kind of cdc-unrelated refactoring. there is major refactoring work going on in the release-rfc46 branch about abstracting hoodie record, which will touch all code paths involving hoodie records



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -358,11 +357,9 @@ private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(S
                 deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
               }
             });
-            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-              // If merge on read, then clean the log files for the commits as well
-              deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                  .collect(Collectors.toList()));
-            }
+            // clean the log files for the commits as well

Review Comment:
   can we edit this comment to explain that this is for the cdc cow scenario
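   e.g. something like:

   ```suggestion
               // clean the log files for the commits as well. now that cdc log files can
               // exist for cow tables too, this is no longer limited to merge-on-read
   ```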



##########
hudi-common/src/main/java/org/apache/hudi/avro/SerializableRecord.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * In some cases like putting the [[GenericData.Record]] into [[ExternalSpillableMap]],
+ * objects is asked to extend [[Serializable]].
+ *
+ * This class wraps [[GenericData.Record]].
+ */
+public class SerializableRecord implements Serializable {

Review Comment:
   is there a way to avoid creating this model? how does the current spillable map persist records?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -425,11 +422,9 @@ private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
         cleanPaths.add(new CleanFileInfo(dataFile.getBootstrapBaseFile().get().getPath(), true));
       }
     }
-    if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) {
-      // If merge on read, then clean the log files for the commits as well
-      cleanPaths.addAll(nextSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-          .collect(Collectors.toList()));
-    }
+    // clean the log files for the commits as well

Review Comment:
   ditto



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCOperationEnum.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+public enum CDCOperationEnum {

Review Comment:
   HoodieCDCOperation. ditto
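   e.g.

   ```suggestion
   public enum HoodieCDCOperation {
   ```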



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
