danny0405 commented on code in PR #6818:
URL: https://github.com/apache/hudi/pull/6818#discussion_r990585049
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java:
##########
@@ -27,50 +27,94 @@
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
public class HoodieCDCLogRecordIterator implements
ClosableIterator<IndexedRecord> {
- private final HoodieLogFile cdcLogFile;
+ private final FileSystem fs;
- private final HoodieLogFormat.Reader reader;
+ private final Schema cdcSchema;
+
+ private final Iterator<HoodieLogFile> cdcLogFileIter;
+
+ private HoodieLogFormat.Reader reader;
+
+ /**
+ * Due to the hasNext of {@link HoodieLogFormat.Reader} is not idempotent,
+ * Here guarantee idempotent by `hasNextCall` and `nextCall`.
+ */
+ private final AtomicInteger hasNextCall = new AtomicInteger(0);
+ private final AtomicInteger nextCall = new AtomicInteger(0);
private ClosableIterator<IndexedRecord> itr;
- public HoodieCDCLogRecordIterator(
- FileSystem fs,
- Path cdcLogPath,
- Schema cdcSchema) throws IOException {
- this.cdcLogFile = new HoodieLogFile(fs.getFileStatus(cdcLogPath));
- this.reader = new HoodieLogFileReader(fs, cdcLogFile, cdcSchema,
- HoodieLogFileReader.DEFAULT_BUFFER_SIZE, false);
+ public HoodieCDCLogRecordIterator(FileSystem fs, HoodieLogFile[]
cdcLogFiles, Schema cdcSchema) {
+ this.fs = fs;
+ this.cdcSchema = cdcSchema;
+ this.cdcLogFileIter = Arrays.stream(cdcLogFiles).iterator();
}
Review Comment:
Is there a defined sort order for these files?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]