cshuo commented on code in PR #18436:
URL: https://github.com/apache/hudi/pull/18436#discussion_r3026646418


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcImageManager.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cdc;
+
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.format.FormatUtils;
+import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Function;
+
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
+
+/**
+ * Manages serialized before/after image snapshots for a CDC file group, cached by instant time.
+ *
+ * <p>At most two versions (before and after) are kept in memory at once; older entries are
+ * evicted and spilled to disk via {@link ExternalSpillableMap}.
+ *
+ * <p>Also owns the I/O-view adapters ({@link BytesArrayInputView} / {@link BytesArrayOutputView})
+ * used for serialising {@link RowData} records into byte arrays.
+ */
+public class CdcImageManager implements AutoCloseable {
+
+  private final HoodieWriteConfig writeConfig;
+  private final RowDataSerializer serializer;
+  private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc;
+  private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
+
+  public CdcImageManager(
+      RowType rowType,
+      HoodieWriteConfig writeConfig,
+      Function<MergeOnReadInputSplit, ClosableIterator<RowData>> splitIteratorFunc) {
+    this.serializer = new RowDataSerializer(rowType);
+    this.writeConfig = writeConfig;
+    this.splitIteratorFunc = splitIteratorFunc;
+    this.cache = new TreeMap<>();
+  }
+
+  public HoodieWriteConfig getWriteConfig() {

Review Comment:
   Use a Lombok annotation (e.g. `@Getter`) instead of the hand-written getter.
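   For illustration only, a minimal sketch of the caching policy the `CdcImageManager` Javadoc describes: images are keyed by instant time in a `TreeMap`, and once two instants are cached the oldest one is evicted. It assumes instant times are fixed-width and therefore sort lexicographically in time order, uses a plain `HashMap` as a stand-in for `ExternalSpillableMap`, and serializes a hypothetical `(recordKey, value)` pair with `DataOutputStream` in place of `RowDataSerializer` and the `BytesArrayOutputView`/`BytesArrayInputView` adapters. `CdcImageCacheSketch` and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: keeps serialized images for at most two instants (before/after). */
final class CdcImageCacheSketch {
  private static final int MAX_VERSIONS = 2;

  // Sorted by instant time; assuming fixed-width instants, the first entry is the oldest.
  private final TreeMap<String, Map<String, byte[]>> cache = new TreeMap<>();

  /** Returns the image map for an instant, evicting the oldest cached instant if needed. */
  Map<String, byte[]> getOrCreate(String instantTime) {
    Map<String, byte[]> images = cache.get(instantTime);
    if (images == null) {
      if (cache.size() >= MAX_VERSIONS) {
        // A real implementation would presumably also close the evicted spillable map (assumption).
        cache.pollFirstEntry();
      }
      images = new HashMap<>(); // stand-in for ExternalSpillableMap
      cache.put(instantTime, images);
    }
    return images;
  }

  int cachedVersions() {
    return cache.size();
  }

  boolean contains(String instantTime) {
    return cache.containsKey(instantTime);
  }

  /** Serializes a toy record into a byte array (stand-in for RowData serialization). */
  static byte[] serialize(String recordKey, long value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(recordKey);
      out.writeLong(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Reads back the value field of a toy record serialized by {@link #serialize}. */
  static long deserializeValue(byte[] image) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(image))) {
      in.readUTF(); // skip the record key
      return in.readLong();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

   Keying the cache by instant time in a sorted map makes the oldest version cheap to find and drop (`pollFirstEntry()`), which is the property a two-version before/after window needs.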



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java:
##########
@@ -915,6 +236,7 @@ public CdcInputFormat build() {
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
+

Review Comment:
   Maybe `fileSlice2Split` and `singleLogFile2Split` should also be moved to a common utility class, such as `CdcIterators`?



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/function/HoodieCdcSplitReaderFunction.java:
##########


Review Comment:
   Is it possible to refactor this method into a utility method in `CdcIterators`, since it is almost the same as `CdcInputFormat#getRecordIterator`?
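   For illustration, a hypothetical sketch of the shape such a shared helper could take: the common iteration logic lives in one static method of a final utility class, and each caller (`CdcInputFormat` and `HoodieCdcSplitReaderFunction`) injects only the part that differs, much like the `splitIteratorFunc` parameter of `CdcImageManager`. `CdcIteratorsSketch` is a hypothetical name, and lazily concatenating per-split iterators is an assumed placeholder for the shared logic, not what `CdcInputFormat#getRecordIterator` is confirmed to do.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical shared utility; names and the concatenation logic are illustrative only. */
final class CdcIteratorsSketch {

  private CdcIteratorsSketch() {
    // static utility, not instantiable
  }

  /**
   * Lazily concatenates the iterators that {@code splitReader} opens for each split, in order.
   * Callers differ only in the reader function they pass in.
   */
  static <S, R> Iterator<R> getRecordIterator(List<S> splits, Function<S, Iterator<R>> splitReader) {
    Iterator<S> remaining = splits.iterator();
    return new Iterator<R>() {
      private Iterator<R> current = Collections.emptyIterator();

      @Override
      public boolean hasNext() {
        // Open the next split only when the current one is exhausted; empty splits are skipped.
        while (!current.hasNext() && remaining.hasNext()) {
          current = splitReader.apply(remaining.next());
        }
        return current.hasNext();
      }

      @Override
      public R next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```

   Passing the reader as a `Function` keeps the helper free of Flink source or input-format dependencies, so both call sites could delegate to it.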


