jonvex commented on code in PR #9774:
URL: https://github.com/apache/hudi/pull/9774#discussion_r1349097170
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##########
@@ -19,47 +19,80 @@
package org.apache.hudi.common.table.log;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.avro.Schema;
import java.io.IOException;
import java.util.Iterator;
+import java.util.Map;
import java.util.Properties;
-/**
- * Reads records from base file and merges any updates from log files and
provides iterable over all records in the file slice.
- */
-public class HoodieFileSliceReader<T> implements Iterator<HoodieRecord<T>> {
+public class HoodieFileSliceReader<T> extends LogFileIterator<T> {
+ private Option<Iterator<HoodieRecord>> baseFileIterator;
+ private HoodieMergedLogRecordScanner scanner;
+ private Schema schema;
+ private Properties props;
- private final Iterator<HoodieRecord<T>> recordsIterator;
+ private TypedProperties payloadProps = new TypedProperties();
+ private Option<Pair<String, String>> simpleKeyGenFieldsOpt;
+ Map<String, HoodieRecord> records;
+ HoodieRecordMerger merger;
- public static HoodieFileSliceReader getFileSliceReader(
- Option<HoodieFileReader> baseFileReader, HoodieMergedLogRecordScanner
scanner, Schema schema, Properties props, Option<Pair<String, String>>
simpleKeyGenFieldsOpt) throws IOException {
+ public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
+ HoodieMergedLogRecordScanner scanner,
Schema schema, String preCombineField, HoodieRecordMerger merger,
+ Properties props, Option<Pair<String, String>>
simpleKeyGenFieldsOpt) throws IOException {
+ super(scanner);
if (baseFileReader.isPresent()) {
- Iterator<HoodieRecord> baseIterator =
baseFileReader.get().getRecordIterator(schema);
- while (baseIterator.hasNext()) {
-
scanner.processNextRecord(baseIterator.next().wrapIntoHoodieRecordPayloadWithParams(schema,
props,
- simpleKeyGenFieldsOpt, scanner.isWithOperationField(),
scanner.getPartitionNameOverride(), false, Option.empty()));
- }
+ this.baseFileIterator =
Option.of(baseFileReader.get().getRecordIterator(schema));
+ } else {
+ this.baseFileIterator = Option.empty();
}
- return new HoodieFileSliceReader(scanner.iterator());
+ this.scanner = scanner;
+ this.schema = schema;
+ this.merger = merger;
+ if (preCombineField != null) {
+
payloadProps.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY,
preCombineField);
+ }
+ this.props = props;
+ this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
+ this.records = scanner.getRecords();
}
- private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
- this.recordsIterator = recordsItr;
+ private Boolean hasNextInternal() {
+ while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
+ try {
+ HoodieRecord currentRecord =
baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema,
props,
+ simpleKeyGenFieldsOpt, scanner.isWithOperationField(),
scanner.getPartitionNameOverride(), false, Option.empty());
+ Option<HoodieRecord> logRecord =
removeLogRecord(currentRecord.getRecordKey());
+ if (!logRecord.isPresent()) {
Review Comment:
I tried to mimic the implementation from
https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala.
We will eventually use the HoodieFileGroupReader in both places. I think we
should get this fix into 0.14.1 using the same logic we have. We can discuss
improvements to MOR base-file/log merging in the HoodieFileGroupReader
implementation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]