alexeykudinkin commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r971364028


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.SerializableRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieCDCLogger<T extends HoodieRecordPayload> implements 
Closeable {
+
+  private final String partitionPath;
+
+  private final String fileName;
+
+  private final String commitTime;
+
+  private final List<String> keyFields;
+
+  private final int taskPartitionId;
+
+  private final boolean populateMetaFields;
+
+  // writer for cdc data
+  private final HoodieLogFormat.Writer cdcWriter;
+
+  private final boolean cdcEnabled;
+
+  private final String cdcSupplementalLoggingMode;
+
+  // the cdc data
+  private final Map<String, SerializableRecord> cdcData;
+
+  private final Function<GenericRecord, GenericRecord> rewriteRecordFunc;
+
+  // the count of records currently being written, used to generate the same 
seqno for the cdc data
+  private final AtomicLong writtenRecordCount = new AtomicLong(-1);
+
+  public HoodieCDCLogger(
+      String partitionPath,
+      String fileName,
+      String commitTime,
+      HoodieWriteConfig config,
+      List<String> keyFields,
+      int taskPartitionId,
+      HoodieLogFormat.Writer cdcWriter,
+      long maxInMemorySizeInBytes,
+      Function<GenericRecord, GenericRecord> rewriteRecordFunc) {
+    try {
+      this.partitionPath = partitionPath;
+      this.fileName = fileName;
+      this.commitTime = commitTime;
+      this.keyFields = keyFields;
+      this.taskPartitionId = taskPartitionId;
+      this.populateMetaFields = config.populateMetaFields();
+      this.cdcWriter = cdcWriter;
+      this.rewriteRecordFunc = rewriteRecordFunc;
+
+      this.cdcEnabled = 
config.getBooleanOrDefault(HoodieTableConfig.CDC_ENABLED);
+      this.cdcSupplementalLoggingMode = 
config.getStringOrDefault(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE);
+      this.cdcData = new ExternalSpillableMap<>(
+          maxInMemorySizeInBytes,
+          config.getSpillableMapBasePath(),
+          new DefaultSizeEstimator<>(),
+          new DefaultSizeEstimator<>(),
+          config.getCommonConfig().getSpillableDiskMapType(),
+          config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()
+      );
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to initialize HoodieCDCLogger", 
e);
+    }
+  }
+
+  public void put(HoodieRecord<T> hoodieRecord, GenericRecord oldRecord, 
Option<IndexedRecord> indexedRecord) {
+    if (cdcEnabled) {
+      String recordKey;
+      if (oldRecord == null) {
+        recordKey = hoodieRecord.getRecordKey();
+      } else {
+        recordKey = StringUtils.join(

Review Comment:
   Please check my previous comment: we have to use a proper KeyGenerator here



##########
hudi-common/src/main/java/org/apache/hudi/avro/SerializableRecord.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.generic.GenericData;
+
+import java.io.Serializable;
+
+/**
+ * In some cases like putting the [[GenericData.Record]] into 
[[ExternalSpillableMap]],
+ * objects is asked to extend [[Serializable]].
+ *
+ * This class wraps [[GenericData.Record]].
+ */
+public class SerializableRecord implements Serializable {

Review Comment:
   This is not going to work -- Avro's `Schema` is not serializable by Java's 
default serialization protocol



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -385,6 +414,9 @@ protected void writeToFile(HoodieKey key, GenericRecord 
avroRecord, boolean shou
     } else {
       fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
     }
+    if (cdcEnabled) {
+      cdcLogger.getAndIncrement();

Review Comment:
   This should not be needed



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -399,9 +431,36 @@ protected void writeIncomingRecords() throws IOException {
     }
   }
 
+  private void setCDCStatIfNeeded(HoodieWriteStat stat) {
+    try {
+      Option<AppendResult> cdcResult;
+      if (cdcLogger == null || recordsWritten == 0L || (recordsWritten == 
insertRecordsWritten)) {
+        // the following cases where we do not need to write out the cdc file:
+        // case 1: all the data from the previous file slice are deleted. and 
no new data is inserted;
+        // case 2: all the data are new-coming,
+        cdcResult = Option.empty();
+      } else {
+        cdcResult = cdcLogger.writeCDCData();

Review Comment:
   Flushing the CDC logger should occur within the handle's `close` method, not 
in `setCDCStatIfNeeded` (the setter should only be tasked with setting up the 
WriteStat)



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -44,6 +44,11 @@ public class HoodieWriteStat implements Serializable {
    */
   private String path;
 
+  /**
+   * Relative cdc file path that store the CDC data.

Review Comment:
   Let's stay consistent (make it CDC)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -114,6 +119,11 @@ protected HoodieWriteHandle(HoodieWriteConfig config, 
String instantTime, String
                               HoodieTable<T, I, K, O> hoodieTable, 
Option<Schema> overriddenSchema,
                               TaskContextSupplier taskContextSupplier) {
     super(config, Option.of(instantTime), hoodieTable);
+    if (config.populateMetaFields()) {

Review Comment:
   Please check my comment above regarding KeyGens



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCLogicalFileType.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+/**
+ * Here define four cdc file types. The different cdc file type will decide 
which file will be
+ * used to extract the change data, and how to do this.
+ *
+ * CDC_LOG_FILE:
+ *   For this type, there must be a real cdc log file from which we get the 
whole/part change data.
+ *   when `hoodie.table.cdc.supplemental.logging.mode` is 
'cdc_data_before_after', it keeps all the fields about the
+ *   change data, including `op`, `ts_ms`, `before` and `after`. So read it 
and return directly,
+ *   no more other files need to be loaded.
+ *   when `hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before', 
it keeps the `op`, the key and the
+ *   `before` of the changing record. When `op` is equal to 'i' or 'u', need 
to get the current record from the
+ *   current base/log file as `after`.
+ *   when `hoodie.table.cdc.supplemental.logging.mode` is 'op_key', it just 
keeps the `op` and the key of
+ *   the changing record. When `op` is equal to 'i', `before` is null and get 
the current record
+ *   from the current base/log file as `after`. When `op` is equal to 'u', get 
the previous
+ *   record from the previous file slice as `before`, and get the current 
record from the
+ *   current base/log file as `after`. When `op` is equal to 'd', get the 
previous record from
+ *   the previous file slice as `before`, and `after` is null.
+ *
+ * ADD_BASE_FILE:
+ *   For this type, there must be a base file at the current instant. All the 
records from this
+ *   file is new-coming, so we can load this, mark all the records with `i`, 
and treat them as
+ *   the value of `after`. The value of `before` for each record is null.
+ *
+ * REMOVE_BASE_FILE:
+ *   For this type, there must be an empty file at the current instant, but a 
non-empty base file
+ *   at the previous instant. First we find this base file that has the same 
file group and belongs
+ *   to the previous instant. Then load this, mark all the records with `d`, 
and treat them as
+ *   the value of `before`. The value of `after` for each record is null.
+ *
+ * MOR_LOG_FILE:
+ *   For this type, a normal log file of mor table will be used. First we need 
to load the previous
+ *   file slice(including the base file and other log files in the same file 
group). Then for each
+ *   record from the log file, get the key of this, and execute the following 
steps:
+ *     1) if the record is deleted,

Review Comment:
   Why do we need to reconstruct the op here?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordReader.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.IOException;
+
+public class HoodieCDCLogRecordReader implements 
ClosableIterator<IndexedRecord> {
+
+  private final HoodieLogFile cdcLogFile;
+
+  private final HoodieLogFormat.Reader reader;
+
+  private ClosableIterator<IndexedRecord> itr;
+
+  public HoodieCDCLogRecordReader(
+      FileSystem fs,
+      Path cdcLogPath,
+      String cdcSupplementalLoggingMode) throws IOException {
+    this.cdcLogFile = new HoodieLogFile(fs.getFileStatus(cdcLogPath));
+    this.reader = new HoodieLogFileReader(fs, cdcLogFile,
+      
HoodieCDCUtils.schemaBySupplementalLoggingMode(cdcSupplementalLoggingMode),
+      HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (null == itr || !itr.hasNext()) {

Review Comment:
   Let's keep code style consistent (`itr == null`)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/EmptyRelation.scala:
##########
@@ -31,17 +32,21 @@ import scala.util.control.NonFatal
  * BaseRelation representing empty RDD.
  * @param sqlContext instance of SqlContext.
  */
-class EmptyRelation(val sqlContext: SQLContext, metaClient: 
HoodieTableMetaClient) extends BaseRelation with TableScan {
+class EmptyRelation(val sqlContext: SQLContext, metaClient: 
HoodieTableMetaClient, isCDCQuery: Boolean) extends BaseRelation with TableScan 
{

Review Comment:
   Instead, let's modify `EmptyRelation` to accept `schema`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -210,17 +215,24 @@ class DefaultSource extends RelationProvider
     }
     val metaClient = HoodieTableMetaClient.builder().setConf(
       
sqlContext.sparkSession.sessionState.newHadoopConf()).setBasePath(path.get).build()
-    val schemaResolver = new TableSchemaResolver(metaClient)
-    val sqlSchema =
-      try {
-        val avroSchema = schemaResolver.getTableAvroSchema
-        AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
-      } catch {
-        case _: Exception =>
-          require(schema.isDefined, "Fail to resolve source schema")
-          schema.get
-      }
-    (shortName(), sqlSchema)
+
+    if (CDCRelation.isCDCTable(metaClient) &&
+        
parameters.get(QUERY_TYPE.key).contains(QUERY_TYPE_INCREMENTAL_OPT_VAL) &&

Review Comment:
   We can re-use `isCdcQuery`



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -354,6 +354,11 @@ public Stream<HoodieInstant> getInstants() {
     return instants.stream();
   }
 
+  @Override
+  public List<HoodieInstant> getInstantsAsList() {
+    return instants;

Review Comment:
   Where's this method used? Can't find any usage



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.SerializableRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.cdc.HoodieCDCOperation;
+import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpsertException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class HoodieCDCLogger<T extends HoodieRecordPayload> implements 
Closeable {

Review Comment:
   I don't think we need to parameterize this by `HoodieRecordPayload`



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java:
##########
@@ -236,6 +241,44 @@ public static <T> T fromJsonString(String jsonStr, 
Class<T> clazz) throws Except
     return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
   }
 
+  /**
+   * parse the bytes of deltacommit, and get the base file and the log files 
belonging to this
+   * provided file group.
+   */
+  // TODO: refactor this method to avoid doing the json tree walking 
(HUDI-4822).
+  public static Option<Pair<String, List<String>>> 
getFileSliceForFileGroupFromDeltaCommit(

Review Comment:
   What's this method for? I don't even see it being used anywhere



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -210,17 +215,24 @@ class DefaultSource extends RelationProvider
     }
     val metaClient = HoodieTableMetaClient.builder().setConf(
       
sqlContext.sparkSession.sessionState.newHadoopConf()).setBasePath(path.get).build()
-    val schemaResolver = new TableSchemaResolver(metaClient)
-    val sqlSchema =
-      try {
-        val avroSchema = schemaResolver.getTableAvroSchema
-        AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
-      } catch {
-        case _: Exception =>
-          require(schema.isDefined, "Fail to resolve source schema")
-          schema.get
-      }
-    (shortName(), sqlSchema)
+
+    if (CDCRelation.isCDCTable(metaClient) &&

Review Comment:
   `isCDCEnabled` ("CDCTable" reference is confusing)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala:
##########
@@ -210,17 +215,24 @@ class DefaultSource extends RelationProvider
     }
     val metaClient = HoodieTableMetaClient.builder().setConf(
       
sqlContext.sparkSession.sessionState.newHadoopConf()).setBasePath(path.get).build()
-    val schemaResolver = new TableSchemaResolver(metaClient)
-    val sqlSchema =
-      try {
-        val avroSchema = schemaResolver.getTableAvroSchema
-        AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
-      } catch {
-        case _: Exception =>
-          require(schema.isDefined, "Fail to resolve source schema")
-          schema.get
-      }
-    (shortName(), sqlSchema)
+
+    if (CDCRelation.isCDCTable(metaClient) &&

Review Comment:
   Let's also limit this conditional to fetching schema:
   ```
   val schema = if (cdc) {
     // cdc-schema
   } else {
     // ...
   }
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.exception.HoodieException;
+
+public class HoodieCDCUtils {
+
+  /* the `op` column represents how a record is changed. */
+  public static final String CDC_OPERATION_TYPE = "op";
+
+  /* the `ts_ms` column represents when a record is changed. */
+  public static final String CDC_COMMIT_TIMESTAMP = "ts_ms";

Review Comment:
   We can just shorten this to `ts` and annotate that this corresponds to epoch 
(ms since 1970)



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java:
##########
@@ -70,6 +75,11 @@ public class HoodieWriteStat implements Serializable {
    */
   private long numInserts;
 
+  /**
+   * Total number of cdc bytes written.
+   */
+  private long cdcWriteBytes;

Review Comment:
   `cdcWrittenBytes`



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCUtils.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.exception.HoodieException;
+
+public class HoodieCDCUtils {
+
+  /* the `op` column represents how a record is changed. */
+  public static final String CDC_OPERATION_TYPE = "op";
+
+  /* the `ts_ms` column represents when a record is changed. */
+  public static final String CDC_COMMIT_TIMESTAMP = "ts_ms";
+
+  /* the pre-image before one record is changed */
+  public static final String CDC_BEFORE_IMAGE = "before";
+
+  /* the post-image after one record is changed */
+  public static final String CDC_AFTER_IMAGE = "after";
+
+  /* the key of the changed record */
+  public static final String CDC_RECORD_KEY = "record_key";
+
+  public static final String[] CDC_COLUMNS = new String[] {
+      CDC_OPERATION_TYPE,
+      CDC_COMMIT_TIMESTAMP,
+      CDC_BEFORE_IMAGE,
+      CDC_AFTER_IMAGE
+  };
+
+  /**
+   * This is the standard CDC output format.
+   * Also, this is the schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+   */
+  public static final String CDC_SCHEMA_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"ts_ms\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"after\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA = new 
Schema.Parser().parse(CDC_SCHEMA_STRING);
+
+  /**
+   * The schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before'.
+   */
+  public static final String CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"before\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA_OP_RECORDKEY_BEFORE =
+      new Schema.Parser().parse(CDC_SCHEMA_OP_RECORDKEY_BEFORE_STRING);
+
+  /**
+   * The schema of cdc log file in the case 
`hoodie.table.cdc.supplemental.logging.mode` is 'op_key'.
+   */
+  public static final String CDC_SCHEMA_OP_AND_RECORDKEY_STRING = 
"{\"type\":\"record\",\"name\":\"Record\","
+      + "\"fields\":["
+      + "{\"name\":\"op\",\"type\":[\"string\",\"null\"]},"
+      + "{\"name\":\"record_key\",\"type\":[\"string\",\"null\"]}"
+      + "]}";
+
+  public static final Schema CDC_SCHEMA_OP_AND_RECORDKEY =
+      new Schema.Parser().parse(CDC_SCHEMA_OP_AND_RECORDKEY_STRING);
+
+  public static final Schema schemaBySupplementalLoggingMode(String 
supplementalLoggingMode) {
+    switch (supplementalLoggingMode) {
+      case HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE_AFTER:
+        return CDC_SCHEMA;
+      case HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_WITH_BEFORE:
+        return CDC_SCHEMA_OP_RECORDKEY_BEFORE;
+      case HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE_OP_KEY:
+        return CDC_SCHEMA_OP_AND_RECORDKEY;
+      default:
+        throw new HoodieException("not support this supplemental logging mode: 
" + supplementalLoggingMode);
+    }
+  }
+
+  /**
+   * Build the cdc record which has all the cdc fields when 
`hoodie.table.cdc.supplemental.logging.mode` is 'cdc_data_before_after'.
+   */
+  public static GenericData.Record cdcRecord(
+      String op, String commitTime, GenericRecord before, GenericRecord after) 
{
+    String beforeJsonStr = recordToJson(before);

Review Comment:
   Why are we using JSON here rather than, for example, Avro binary encoding? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to