YannByron commented on code in PR #6476:
URL: https://github.com/apache/hudi/pull/6476#discussion_r966816316


##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCExtractor.java:
##########
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.ADD_BASE_FILE;
+import static 
org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.CDC_LOG_FILE;
+import static 
org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.MOR_LOG_FILE;
+import static 
org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REMOVE_BASE_FILE;
+import static 
org.apache.hudi.common.table.cdc.HoodieCDCLogicalFileType.REPLACED_FILE_GROUP;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.isInRange;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+public class CDCExtractor {
+
+  private final HoodieTableMetaClient metaClient;
+
+  private final Path basePath;
+
+  private final FileSystem fs;
+
+  private final String supplementalLoggingMode;
+
+  private final String startInstant;
+
+  private final String endInstant;
+
+  // TODO: this will be used once the 'read_optimized' cdc query type is supported.
+  private final String cdcQueryType;
+
+  private Map<HoodieInstant, HoodieCommitMetadata> commits;
+
+  private HoodieTableFileSystemView fsView;
+
+  public CDCExtractor(
+      HoodieTableMetaClient metaClient,
+      String startInstant,
+      String endInstant,
+      String cdcqueryType) {
+    this.metaClient = metaClient;
+    this.basePath = metaClient.getBasePathV2();
+    this.fs = metaClient.getFs().getFileSystem();
+    this.supplementalLoggingMode = 
metaClient.getTableConfig().cdcSupplementalLoggingMode();
+    this.startInstant = startInstant;
+    this.endInstant = endInstant;
+    if (HoodieTableType.MERGE_ON_READ == metaClient.getTableType()
+        && cdcqueryType.equals("read_optimized")) {
+      throw new HoodieNotSupportedException("The 'read_optimized' cdc query 
type hasn't been supported for now.");
+    }
+    this.cdcQueryType = cdcqueryType;
+    init();
+  }
+
+  private void init() {
+    initInstantAndCommitMetadatas();
+    initFSView();
+  }
+
+  /**
+   * At the granularity of a file group, trace the mapping between
+   * each commit/instant and changes to this file group.
+   */
+  public Map<HoodieFileGroupId, List<Pair<HoodieInstant, CDCFileSplit>>> 
extractor() {

Review Comment:
   done.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/cdc/CDCFileSplit.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.cdc;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.util.Option;
+
+import java.io.Serializable;
+
+/**
+ * This contains all the information needed to retrieve the change data of a single file group
+ * at a single commit.
+ *
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.ADD_BASE_FILE]], [[cdcFile]] is a 
current version of
+ *   the base file in the group, and [[beforeFileSlice]] is None.
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.REMOVE_BASE_FILE]], [[cdcFile]] is 
null,
+ *   [[beforeFileSlice]] is the previous version of the base file in the group.
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.CDC_LOG_FILE]], [[cdcFile]] is a 
log file with cdc blocks.
+ *   When supplemental logging is enabled, both [[beforeFileSlice]] and [[afterFileSlice]] are None;
+ *   otherwise these two are the previous and current version of the base file.
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.MOR_LOG_FILE]], [[cdcFile]] is a 
normal log file and
+ *   [[beforeFileSlice]] is the previous version of the file slice.
+ * For [[cdcFileType]] = [[CDCFileTypeEnum.REPLACED_FILE_GROUP]], [[cdcFile]] 
is null,
+ *   [[beforeFileSlice]] is the current version of the file slice.
+ */
+public class CDCFileSplit implements Serializable {

Review Comment:
   done.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/CDCLogRecordReader.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log;
+
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.cdc.CDCUtils;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
+
+import java.io.IOException;
+
+public class CDCLogRecordReader implements ClosableIterator<IndexedRecord> {

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to