alexeykudinkin commented on code in PR #6697:
URL: https://github.com/apache/hudi/pull/6697#discussion_r974740166


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java:
##########
@@ -358,11 +357,10 @@ private Pair<Boolean, List<CleanFileInfo>> 
getFilesToCleanKeepingLatestCommits(S
                 deletePaths.add(new 
CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
               }
             });
-            if (hoodieTable.getMetaClient().getTableType() == 
HoodieTableType.MERGE_ON_READ) {
-              // If merge on read, then clean the log files for the commits as 
well
-              deletePaths.addAll(aSlice.getLogFiles().map(lf -> new 
CleanFileInfo(lf.getPath().toString(), false))
-                  .collect(Collectors.toList()));
-            }
+            // since the cow tables may also write out the log files in cdc 
scenario, we need to clean the log files
+            // for this commit no matter the table type is mor or cow.
+            deletePaths.addAll(aSlice.getLogFiles().map(lf -> new 
CleanFileInfo(lf.getPath().toString(), false))

Review Comment:
   We should scope this: guard this by checking that CDC is enabled and only 
cleaning up CDC files (assuming we will have a separate naming scheme for these).
   
   Overly broad conditionals like this one (cleaning all log-files) are a 
time-bomb.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates all the cdc-writing functions.
+ */
+public class HoodieCDCLogger implements Closeable {
+
+  private final String commitTime;
+
+  private final String keyField;
+
+  private final Schema dataSchema;
+
+  private final boolean populateMetaFields;
+
+  // writer for cdc data
+  private final HoodieLogFormat.Writer cdcWriter;
+
+  private final boolean cdcEnabled;
+
+  private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;
+
+  private final Schema cdcSchema;
+
+  // the cdc data
+  private final Map<String, HoodieAvroPayload> cdcData;
+
+  public HoodieCDCLogger(
+      String commitTime,
+      HoodieWriteConfig config,
+      HoodieTableConfig tableConfig,
+      Schema schema,
+      HoodieLogFormat.Writer cdcWriter,
+      long maxInMemorySizeInBytes) {
+    try {
+      this.commitTime = commitTime;
+      this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
+      this.populateMetaFields = config.populateMetaFields();
+      this.keyField = populateMetaFields ? 
HoodieRecord.RECORD_KEY_METADATA_FIELD
+          : tableConfig.getRecordKeyFieldProp();
+      this.cdcWriter = cdcWriter;
+
+      this.cdcEnabled = 
config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
+      this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
+          
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
+
+      if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
+      } else if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE))
 {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
+      } else {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
+      }
+
+      this.cdcData = new ExternalSpillableMap<>(

Review Comment:
   No need to tackle in this PR, but want to call it out that accumulating 
records in memory will certainly be problematic from a memory footprint (as well 
as GC) perspective.
   
   Note that Spark processes records via iteration, so it doesn't hold records 
in memory for longer than the RDD execution chain requires (limited to the 
micro-batch size if the underlying format supports batch reads, otherwise 
holding just 1 record at a time). The only accumulation point is the Parquet 
writer, which is much more efficient though: a) relying on encoding as well as 
compression, b) storing binary/serialized data in memory



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.exception.HoodieException;
+
+public class HoodieCDCUtils {
+
+  public static final String CDC_LOGFILE_SUFFIX = "-cdc";
+
+  /* the `op` column represents how a record is changed. */
+  public static final String CDC_OPERATION_TYPE = "op";
+
+  /* the `ts` column represents when a record is changed. */
+  public static final String CDC_COMMIT_TIMESTAMP = "ts";
+
+  /* the pre-image before one record is changed */
+  public static final String CDC_BEFORE_IMAGE = "before";
+
+  /* the post-image after one record is changed */
+  public static final String CDC_AFTER_IMAGE = "after";
+
+  /* the key of the changed record */
+  public static final String CDC_RECORD_KEY = "record_key";
+
+  public static final String[] CDC_COLUMNS = new String[] {
+      CDC_OPERATION_TYPE,
+      CDC_COMMIT_TIMESTAMP,
+      CDC_BEFORE_IMAGE,
+      CDC_AFTER_IMAGE
+  };
+
+  /**
+   * This is the standard CDC output format.
+   * Also, this is the schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+   */
+  public static final String CDC_SCHEMA_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"ts\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA = new 
Schema.Parser().parse(CDC_SCHEMA_STRING);
+
+  /**
+   * The schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
+   */
+  public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE =
+      new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING);
+
+  /**
+   * The schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'op_key'.
+   */
+  public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY =
+      new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING);
+
+  public static final Schema 
schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode 
supplementalLoggingMode) {
+    switch (supplementalLoggingMode) {
+      case WITH_BEFORE_AFTER:
+        return CDC_SCHEMA;
+      case WITH_BEFORE:
+        return CDC_SCHEMA_OP_RECORDKEY_BEFORE;
+      case OP_KEY:
+        return CDC_SCHEMA_OP_AND_RECORDKEY;
+      default:
+        throw new HoodieException("not support this supplemental logging mode: 
" + supplementalLoggingMode);
+    }
+  }
+
+  /**
+   * Build the cdc record which has all the cdc fields when 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+   */
+  public static GenericData.Record cdcRecord(
+      String op, String commitTime, GenericRecord before, GenericRecord after) 
{
+    String beforeJsonStr = recordToJson(before);

Review Comment:
   BTW, no need to tackle it in this PR, but want to kick off the conversation 
on that



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.exception.HoodieException;
+
+public class HoodieCDCUtils {
+
+  public static final String CDC_LOGFILE_SUFFIX = "-cdc";
+
+  /* the `op` column represents how a record is changed. */
+  public static final String CDC_OPERATION_TYPE = "op";
+
+  /* the `ts` column represents when a record is changed. */
+  public static final String CDC_COMMIT_TIMESTAMP = "ts";
+
+  /* the pre-image before one record is changed */
+  public static final String CDC_BEFORE_IMAGE = "before";
+
+  /* the post-image after one record is changed */
+  public static final String CDC_AFTER_IMAGE = "after";
+
+  /* the key of the changed record */
+  public static final String CDC_RECORD_KEY = "record_key";
+
+  public static final String[] CDC_COLUMNS = new String[] {
+      CDC_OPERATION_TYPE,
+      CDC_COMMIT_TIMESTAMP,
+      CDC_BEFORE_IMAGE,
+      CDC_AFTER_IMAGE
+  };
+
+  /**
+   * This is the standard CDC output format.
+   * Also, this is the schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+   */
+  public static final String CDC_SCHEMA_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"ts\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA = new 
Schema.Parser().parse(CDC_SCHEMA_STRING);
+
+  /**
+   * The schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
+   */
+  public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE =
+      new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING);
+
+  /**
+   * The schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'op_key'.
+   */
+  public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY =
+      new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING);
+
+  public static final Schema 
schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode 
supplementalLoggingMode) {
+    switch (supplementalLoggingMode) {
+      case WITH_BEFORE_AFTER:
+        return CDC_SCHEMA;
+      case WITH_BEFORE:
+        return CDC_SCHEMA_OP_RECORDKEY_BEFORE;
+      case OP_KEY:
+        return CDC_SCHEMA_OP_AND_RECORDKEY;
+      default:
+        throw new HoodieException("not support this supplemental logging mode: 
" + supplementalLoggingMode);
+    }
+  }
+
+  /**
+   * Build the cdc record which has all the cdc fields when 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+   */
+  public static GenericData.Record cdcRecord(
+      String op, String commitTime, GenericRecord before, GenericRecord after) 
{
+    String beforeJsonStr = recordToJson(before);

Review Comment:
   I think all of my comments remained in the other PR; repeating here:
   
   Why are we using JSON for the records? Shall we at least use Avro to reduce 
the footprint?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates all the cdc-writing functions.
+ */
+public class HoodieCDCLogger implements Closeable {
+
+  private final String commitTime;
+
+  private final String keyField;
+
+  private final Schema dataSchema;
+
+  private final boolean populateMetaFields;
+
+  // writer for cdc data
+  private final HoodieLogFormat.Writer cdcWriter;
+
+  private final boolean cdcEnabled;
+
+  private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;
+
+  private final Schema cdcSchema;
+
+  // the cdc data
+  private final Map<String, HoodieAvroPayload> cdcData;
+
+  public HoodieCDCLogger(
+      String commitTime,
+      HoodieWriteConfig config,
+      HoodieTableConfig tableConfig,
+      Schema schema,
+      HoodieLogFormat.Writer cdcWriter,
+      long maxInMemorySizeInBytes) {
+    try {
+      this.commitTime = commitTime;
+      this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
+      this.populateMetaFields = config.populateMetaFields();
+      this.keyField = populateMetaFields ? 
HoodieRecord.RECORD_KEY_METADATA_FIELD
+          : tableConfig.getRecordKeyFieldProp();
+      this.cdcWriter = cdcWriter;
+
+      this.cdcEnabled = 
config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
+      this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
+          
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
+
+      if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
+      } else if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE))
 {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
+      } else {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
+      }
+
+      this.cdcData = new ExternalSpillableMap<>(
+          maxInMemorySizeInBytes,
+          config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator<>(),
+          new DefaultSizeEstimator<>(),
+          config.getCommonConfig().getSpillableDiskMapType(),
+          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
+      );
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", 
e);
+    }
+  }
+
+  public void put(HoodieRecord hoodieRecord,
+                  GenericRecord oldRecord,
+                  Option<IndexedRecord> newRecord) {
+    if (cdcEnabled) {
+      String recordKey = hoodieRecord.getRecordKey();
+      GenericData.Record cdcRecord;
+      if (newRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) newRecord.get();
+        if (oldRecord == null) {
+          // inserted cdc record
+          cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey,
+              null, record);
+        } else {
+          // updated cdc record
+          cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey,
+              oldRecord, record);
+        }
+      } else {
+        // deleted cdc record
+        cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey,
+            oldRecord, null);
+      }
+      cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
+    }
+  }
+
+  private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
+                                             String recordKey,
+                                             GenericRecord oldRecord,
+                                             GenericRecord newRecord) {
+    GenericData.Record record;
+    if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
+          removeCommitMetadata(oldRecord), newRecord);
+    } else if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
+          removeCommitMetadata(oldRecord));
+    } else {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return record;
+  }
+
+  private GenericRecord removeCommitMetadata(GenericRecord record) {
+    if (record != null && populateMetaFields) {
+      GenericData.Record newRecord = new GenericData.Record(dataSchema);
+      for (Schema.Field field : dataSchema.getFields()) {
+        newRecord.put(field.name(), record.get(field.name()));
+      }
+      return newRecord;
+    }
+    return record;
+  }
+
+  public boolean isEmpty() {
+    return !this.cdcEnabled || this.cdcData.isEmpty();
+  }
+
+  public Option<AppendResult> writeCDCData() {
+    if (isEmpty()) {
+      return Option.empty();
+    }
+    try {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = 
buildCDCBlockHeader();
+      List<IndexedRecord> records = cdcData.values().stream()
+          .map(record -> {
+            try {
+              return record.getInsertValue(cdcSchema).get();
+            } catch (IOException e) {
+              throw new HoodieIOException("Failed to get cdc record", e);
+            }
+          }).collect(Collectors.toList());
+      HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField);
+      AppendResult result = 
cdcWriter.appendBlocks(Collections.singletonList(block));
+
+      // call close to trigger the data flush.
+      this.close();
+
+      return Option.of(result);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to write the cdc data to " + 
cdcWriter.getLogFile().getPath(), e);
+    }
+  }
+
+  private Map<HoodieLogBlock.HeaderMetadataType, String> buildCDCBlockHeader() 
{
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
+    if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {

Review Comment:
   nit: We can reduce the conditional to just pick the schema, then insert it 
into the header afterwards



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates all the cdc-writing functions.
+ */
+public class HoodieCDCLogger implements Closeable {
+
+  private final String commitTime;
+
+  private final String keyField;
+
+  private final Schema dataSchema;
+
+  private final boolean populateMetaFields;
+
+  // writer for cdc data
+  private final HoodieLogFormat.Writer cdcWriter;
+
+  private final boolean cdcEnabled;
+
+  private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;
+
+  private final Schema cdcSchema;
+
+  // the cdc data
+  private final Map<String, HoodieAvroPayload> cdcData;
+
+  public HoodieCDCLogger(
+      String commitTime,
+      HoodieWriteConfig config,
+      HoodieTableConfig tableConfig,
+      Schema schema,
+      HoodieLogFormat.Writer cdcWriter,
+      long maxInMemorySizeInBytes) {
+    try {
+      this.commitTime = commitTime;
+      this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
+      this.populateMetaFields = config.populateMetaFields();
+      this.keyField = populateMetaFields ? 
HoodieRecord.RECORD_KEY_METADATA_FIELD
+          : tableConfig.getRecordKeyFieldProp();
+      this.cdcWriter = cdcWriter;
+
+      this.cdcEnabled = 
config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
+      this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
+          
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
+
+      if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
+      } else if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE))
 {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
+      } else {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
+      }
+
+      this.cdcData = new ExternalSpillableMap<>(
+          maxInMemorySizeInBytes,
+          config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator<>(),
+          new DefaultSizeEstimator<>(),
+          config.getCommonConfig().getSpillableDiskMapType(),
+          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
+      );
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", 
e);
+    }
+  }
+
+  public void put(HoodieRecord hoodieRecord,
+                  GenericRecord oldRecord,
+                  Option<IndexedRecord> newRecord) {
+    if (cdcEnabled) {
+      String recordKey = hoodieRecord.getRecordKey();
+      GenericData.Record cdcRecord;
+      if (newRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) newRecord.get();
+        if (oldRecord == null) {
+          // inserted cdc record
+          cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey,
+              null, record);
+        } else {
+          // updated cdc record
+          cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey,
+              oldRecord, record);
+        }
+      } else {
+        // deleted cdc record
+        cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey,
+            oldRecord, null);
+      }
+      cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
+    }
+  }
+
+  private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
+                                             String recordKey,
+                                             GenericRecord oldRecord,
+                                             GenericRecord newRecord) {
+    GenericData.Record record;
+    if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
+          removeCommitMetadata(oldRecord), newRecord);
+    } else if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
+          removeCommitMetadata(oldRecord));
+    } else {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return record;
+  }
+
+  private GenericRecord removeCommitMetadata(GenericRecord record) {
+    if (record != null && populateMetaFields) {
+      GenericData.Record newRecord = new GenericData.Record(dataSchema);
+      for (Schema.Field field : dataSchema.getFields()) {
+        newRecord.put(field.name(), record.get(field.name()));
+      }
+      return newRecord;
+    }
+    return record;
+  }
+
+  public boolean isEmpty() {
+    return !this.cdcEnabled || this.cdcData.isEmpty();
+  }
+
+  public Option<AppendResult> writeCDCData() {
+    if (isEmpty()) {
+      return Option.empty();
+    }
+    try {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = 
buildCDCBlockHeader();
+      List<IndexedRecord> records = cdcData.values().stream()
+          .map(record -> {
+            try {
+              return record.getInsertValue(cdcSchema).get();
+            } catch (IOException e) {
+              throw new HoodieIOException("Failed to get cdc record", e);
+            }
+          }).collect(Collectors.toList());
+      HoodieLogBlock block = new HoodieCDCDataBlock(records, header, keyField);
+      AppendResult result = 
cdcWriter.appendBlocks(Collections.singletonList(block));
+
+      // call close to trigger the data flush.
+      this.close();
+
+      return Option.of(result);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to write the cdc data to " + 
cdcWriter.getLogFile().getPath(), e);
+    }
+  }
+
+  private Map<HoodieLogBlock.HeaderMetadataType, String> buildCDCBlockHeader() 
{
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
+    if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
HoodieCDCUtils.CDC_SCHEMA_STRING);
+    } else if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE))
 {
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
+          HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING);
+    } else {
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
+          HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY_STRING);
+    }
+    return header;
+  }
+
+  @Override
+  public void close() {
+    try {
+      if (cdcWriter != null) {
+        cdcWriter.close();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to close HoodieCDCLogger", e);
+    } finally {
+      cdcData.clear();
+    }
+  }
+
+  public static void setCDCStatIfNeeded(HoodieWriteStat stat,
+                                        String partitionPath,
+                                        HoodieCDCLogger cdcLogger,
+                                        long recordsWritten,
+                                        long insertRecordsWritten,
+                                        FileSystem fs) {
+    try {
+      if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == 
insertRecordsWritten)) {
+        // the following cases where we do not need to write out the cdc file:
+        // case 1: all the data from the previous file slice are deleted. and 
no new data is inserted;
+        // case 2: all the data are new-coming,
+        return;
+      }
+
+      Option<AppendResult> cdcResult = cdcLogger.writeCDCData();

Review Comment:
   Seems like my previous comment got displaced somehow:
   
   Let's extract the invocation of `writeCDCData` into the caller's method. This 
method should be scoped to only setting CDC info w/in the `HoodieWriteStat`



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCSupplementalLoggingMode.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+public enum HoodieCDCSupplementalLoggingMode {
+  OP_KEY("op_key"),

Review Comment:
   Let's make these option values consistent with each other by either prefixing all of 
them with `cdc_` or prefixing none of them.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCOperation.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+public enum HoodieCDCOperation {
+  INSERT("i"),
+  UPDATE("u"),
+  DELETE("d");
+
+  private final String value;
+
+  HoodieCDCOperation(String value) {
+    this.value = value;
+  }
+
+  public String getValue() {
+    return this.value;
+  }
+
+  public static HoodieCDCOperation parse(String value) {
+    if (value.equals("i")) {

Review Comment:
   nit: A switch-case could be used for this.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * This class encapsulates all the cdc-writing functions.
+ */
+public class HoodieCDCLogger implements Closeable {
+
+  private final String commitTime;
+
+  private final String keyField;
+
+  private final Schema dataSchema;
+
+  private final boolean populateMetaFields;
+
+  // writer for cdc data
+  private final HoodieLogFormat.Writer cdcWriter;
+
+  private final boolean cdcEnabled;
+
+  private final HoodieCDCSupplementalLoggingMode cdcSupplementalLoggingMode;
+
+  private final Schema cdcSchema;
+
+  // the cdc data
+  private final Map<String, HoodieAvroPayload> cdcData;
+
+  public HoodieCDCLogger(
+      String commitTime,
+      HoodieWriteConfig config,
+      HoodieTableConfig tableConfig,
+      Schema schema,
+      HoodieLogFormat.Writer cdcWriter,
+      long maxInMemorySizeInBytes) {
+    try {
+      this.commitTime = commitTime;
+      this.dataSchema = HoodieAvroUtils.removeMetadataFields(schema);
+      this.populateMetaFields = config.populateMetaFields();
+      this.keyField = populateMetaFields ? 
HoodieRecord.RECORD_KEY_METADATA_FIELD
+          : tableConfig.getRecordKeyFieldProp();
+      this.cdcWriter = cdcWriter;
+
+      this.cdcEnabled = 
config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
+      this.cdcSupplementalLoggingMode = HoodieCDCSupplementalLoggingMode.parse(
+          
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE));
+
+      if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA;
+      } else if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE))
 {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_RECORDKEY_BEFORE;
+      } else {
+        this.cdcSchema = HoodieCDCUtils.CDC_SCHEMA_OP_AND_RECORDKEY;
+      }
+
+      this.cdcData = new ExternalSpillableMap<>(
+          maxInMemorySizeInBytes,
+          config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator<>(),
+          new DefaultSizeEstimator<>(),
+          config.getCommonConfig().getSpillableDiskMapType(),
+          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
+      );
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", 
e);
+    }
+  }
+
+  public void put(HoodieRecord hoodieRecord,
+                  GenericRecord oldRecord,
+                  Option<IndexedRecord> newRecord) {
+    if (cdcEnabled) {
+      String recordKey = hoodieRecord.getRecordKey();
+      GenericData.Record cdcRecord;
+      if (newRecord.isPresent()) {
+        GenericRecord record = (GenericRecord) newRecord.get();
+        if (oldRecord == null) {
+          // inserted cdc record
+          cdcRecord = createCDCRecord(HoodieCDCOperation.INSERT, recordKey,
+              null, record);
+        } else {
+          // updated cdc record
+          cdcRecord = createCDCRecord(HoodieCDCOperation.UPDATE, recordKey,
+              oldRecord, record);
+        }
+      } else {
+        // deleted cdc record
+        cdcRecord = createCDCRecord(HoodieCDCOperation.DELETE, recordKey,
+            oldRecord, null);
+      }
+      cdcData.put(recordKey, new HoodieAvroPayload(Option.of(cdcRecord)));
+    }
+  }
+
+  private GenericData.Record createCDCRecord(HoodieCDCOperation operation,
+                                             String recordKey,
+                                             GenericRecord oldRecord,
+                                             GenericRecord newRecord) {
+    GenericData.Record record;
+    if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE_AFTER))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), commitTime,
+          removeCommitMetadata(oldRecord), newRecord);
+    } else if 
(cdcSupplementalLoggingMode.equals(HoodieCDCSupplementalLoggingMode.WITH_BEFORE))
 {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey,
+          removeCommitMetadata(oldRecord));
+    } else {
+      record = HoodieCDCUtils.cdcRecord(operation.getValue(), recordKey);
+    }
+    return record;
+  }
+
+  private GenericRecord removeCommitMetadata(GenericRecord record) {

Review Comment:
   We can reuse `HoodieAvroUtils.rewriteRecordWithNewSchema` here instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to