This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch release-0.12.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d228a10770ac97f9f3407626839da27470ab89da Author: Danny Chan <[email protected]> AuthorDate: Thu Dec 1 17:13:59 2022 +0800 [HUDI-5306] Unify RecordIterator and HoodieParquetReader with ClosableIterator (#7340) * Unify RecordIterator and HoodieParquetReader with ClosableIterator * Add a factory clazz for RecordIterator * Add more documents --- .../apache/hudi/configuration/OptionsResolver.java | 2 +- .../java/org/apache/hudi/table/format/CastMap.java | 19 +- ...Reader.java => ParquetSplitRecordIterator.java} | 24 ++- ...odieParquetReader.java => RecordIterators.java} | 56 +++--- ...eader.java => SchemaEvolvedRecordIterator.java} | 25 ++- .../table/format/cow/CopyOnWriteInputFormat.java | 19 +- .../table/format/mor/MergeOnReadInputFormat.java | 195 +++++++++------------ 7 files changed, 159 insertions(+), 181 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index dd272e17fb0..2d56ea4e043 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -207,7 +207,7 @@ public class OptionsResolver { * Returns whether comprehensive schema evolution enabled. */ public static boolean isSchemaEvolutionEnabled(Configuration conf) { - return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), false); + return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue()); } // ------------------------------------------------------------------------- diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java index 5f29e85adc2..36cf8708875 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java @@ -49,14 +49,17 @@ import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR; /** * CastMap is responsible for conversion of flink types when full schema evolution enabled. - * Supported cast conversions: - * Integer => Long, Float, Double, Decimal, String - * Long => Float, Double, Decimal, String - * Float => Double, Decimal, String - * Double => Decimal, String - * Decimal => Decimal, String - * String => Decimal, Date - * Date => String + * + * <p>Supported cast conversions: + * <ul> + * <li>Integer => Long, Float, Double, Decimal, String</li> + * <li>Long => Float, Double, Decimal, String</li> + * <li>Float => Double, Decimal, String</li> + * <li>Double => Decimal, String</li> + * <li>Decimal => Decimal, String</li> + * <li>String => Decimal, Date</li> + * <li>Date => String</li> + * </ul> */ public final class CastMap implements Serializable { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java similarity index 63% rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java index d13c6c7c21a..7b26d71f115 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java @@ -18,6 +18,8 @@ package org.apache.hudi.table.format; +import org.apache.hudi.common.util.ClosableIterator; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader; import org.apache.flink.table.data.RowData; @@ -27,25 +29,33 @@ import java.io.IOException; /** * Hoodie wrapper for flink parquet reader. */ -public final class HoodieParquetSplitReader implements HoodieParquetReader { +public final class ParquetSplitRecordIterator implements ClosableIterator<RowData> { private final ParquetColumnarRowSplitReader reader; - public HoodieParquetSplitReader(ParquetColumnarRowSplitReader reader) { + public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) { this.reader = reader; } @Override - public boolean reachedEnd() throws IOException { - return reader.reachedEnd(); + public boolean hasNext() { + try { + return !reader.reachedEnd(); + } catch (IOException e) { + throw new HoodieIOException("Decides whether the parquet columnar row split reader reached end exception", e); + } } @Override - public RowData nextRecord() { + public RowData next() { return reader.nextRecord(); } @Override - public void close() throws IOException { - reader.close(); + public void close() { + try { + reader.close(); + } catch (IOException e) { + throw new HoodieIOException("Close the parquet columnar row split reader exception", e); + } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java similarity index 67% rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java index e762f03e983..8657f16ddc9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.format; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.common.util.Option; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil; @@ -26,23 +27,17 @@ import org.apache.hudi.util.RowDataProjection; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; - import org.apache.hadoop.conf.Configuration; -import java.io.Closeable; import java.io.IOException; import java.util.Map; /** - * Base interface for hoodie parquet readers. + * Factory clazz for record iterators. */ -public interface HoodieParquetReader extends Closeable { - - boolean reachedEnd() throws IOException; +public abstract class RecordIterators { - RowData nextRecord(); - - static HoodieParquetReader getReader( + public static ClosableIterator<RowData> getParquetRecordIterator( InternalSchemaManager internalSchemaManager, boolean utcTimestamp, boolean caseSensitive, @@ -55,10 +50,9 @@ public interface HoodieParquetReader extends Closeable { Path path, long splitStart, long splitLength) throws IOException { - Option<RowDataProjection> castProjection; InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName()); if (fileSchema.isEmptySchema()) { - return new HoodieParquetSplitReader( + return new ParquetSplitRecordIterator( ParquetSplitReaderUtil.genPartColumnarRowReader( utcTimestamp, caseSensitive, @@ -73,27 +67,25 @@ public interface HoodieParquetReader extends Closeable { splitLength)); } else { CastMap castMap = internalSchemaManager.getCastMap(fileSchema, fieldNames, fieldTypes, selectedFields); - castProjection = castMap.toRowDataProjection(selectedFields); - fieldNames = internalSchemaManager.getFileFieldNames(fileSchema, fieldNames); - fieldTypes = castMap.getFileFieldTypes(); - } - HoodieParquetReader reader = new HoodieParquetSplitReader( - ParquetSplitReaderUtil.genPartColumnarRowReader( - utcTimestamp, - caseSensitive, - conf, - fieldNames, - fieldTypes, - partitionSpec, - selectedFields, - batchSize, - path, - splitStart, - splitLength)); - if (castProjection.isPresent()) { - return new HoodieParquetEvolvedSplitReader(reader, castProjection.get()); - } else { - return reader; + Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields); + ClosableIterator<RowData> itr = new ParquetSplitRecordIterator( + ParquetSplitReaderUtil.genPartColumnarRowReader( + utcTimestamp, + caseSensitive, + conf, + internalSchemaManager.getFileFieldNames(fileSchema, fieldNames), // the reconciled field names + castMap.getFileFieldTypes(), // the reconciled field types + partitionSpec, + selectedFields, + batchSize, + path, + splitStart, + splitLength)); + if (castProjection.isPresent()) { + return new SchemaEvolvedRecordIterator(itr, castProjection.get()); + } else { + return itr; + } } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java similarity index 63% rename from hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java rename to hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java index 037a3776359..739512c7b55 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java @@ -18,36 +18,35 @@ package org.apache.hudi.table.format; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.util.RowDataProjection; import org.apache.flink.table.data.RowData; -import java.io.IOException; - /** - * Decorates origin hoodie parquet reader with cast projection. + * Decorates origin record iterator with cast projection. */ -public final class HoodieParquetEvolvedSplitReader implements HoodieParquetReader { - private final HoodieParquetReader originReader; +public final class SchemaEvolvedRecordIterator implements ClosableIterator<RowData> { + private final ClosableIterator<RowData> nested; private final RowDataProjection castProjection; - public HoodieParquetEvolvedSplitReader(HoodieParquetReader originReader, RowDataProjection castProjection) { - this.originReader = originReader; + public SchemaEvolvedRecordIterator(ClosableIterator<RowData> nested, RowDataProjection castProjection) { + this.nested = nested; this.castProjection = castProjection; } @Override - public boolean reachedEnd() throws IOException { - return originReader.reachedEnd(); + public boolean hasNext() { + return nested.hasNext(); } @Override - public RowData nextRecord() { - return castProjection.project(originReader.nextRecord()); + public RowData next() { + return castProjection.project(nested.next()); } @Override - public void close() throws IOException { - originReader.close(); + public void close() { + nested.close(); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java index 453d0fee232..820424549f0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java @@ -20,8 +20,9 @@ package org.apache.hudi.table.format.cow; import java.util.Comparator; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.table.format.HoodieParquetReader; +import org.apache.hudi.common.util.ClosableIterator; import org.apache.hudi.table.format.InternalSchemaManager; +import org.apache.hudi.table.format.RecordIterators; import org.apache.hudi.util.DataTypeUtils; import org.apache.flink.api.common.io.FileInputFormat; @@ -75,7 +76,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { private final SerializableConfiguration conf; private final long limit; - private transient HoodieParquetReader reader; + private transient ClosableIterator<RowData> itr; private transient long currentReadCount; /** @@ -128,7 +129,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { } }); - this.reader = HoodieParquetReader.getReader( + this.itr = RecordIterators.getParquetRecordIterator( internalSchemaManager, utcTimestamp, true, @@ -276,26 +277,26 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> { } @Override - public boolean reachedEnd() throws IOException { + public boolean reachedEnd() { if (currentReadCount >= limit) { return true; } else { - return reader.reachedEnd(); + return !itr.hasNext(); } } @Override public RowData nextRecord(RowData reuse) { currentReadCount++; - return reader.nextRecord(); + return itr.next(); } @Override public void close() throws IOException { - if (reader != null) { - this.reader.close(); + if (itr != null) { + this.itr.close(); } - this.reader = null; + this.itr = null; } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 3103e27e857..d58a4cd0bfd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -29,12 +29,13 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.format.FilePathUtils; import org.apache.hudi.table.format.FormatUtils; -import org.apache.hudi.table.format.HoodieParquetReader; import org.apache.hudi.table.format.InternalSchemaManager; +import org.apache.hudi.table.format.RecordIterators; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.DataTypeUtils; import org.apache.hudi.util.RowDataProjection; @@ -94,7 +95,7 @@ public class MergeOnReadInputFormat /** * Uniform iterator view for the underneath records. */ - private transient RecordIterator iterator; + private transient ClosableIterator<RowData> iterator; // for project push down /** @@ -180,10 +181,10 @@ public class MergeOnReadInputFormat this.iterator = new BaseFileOnlyFilteringIterator( split.getInstantRange(), this.tableState.getRequiredRowType(), - getReader(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos))); + getBaseFileIterator(split.getBasePath().get(), getRequiredPosWithCommitTime(this.requiredPos))); } else { // base file only - this.iterator = new BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get())); + this.iterator = getBaseFileIterator(split.getBasePath().get()); } } else if (!split.getBasePath().isPresent()) { // log files only @@ -194,7 +195,7 @@ public class MergeOnReadInputFormat } } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) { this.iterator = new SkipMergeIterator( - getRequiredSchemaReader(split.getBasePath().get()), + getBaseFileIterator(split.getBasePath().get()), getLogFileIterator(split)); } else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) { this.iterator = new MergeIterator( @@ -209,7 +210,7 @@ public class MergeOnReadInputFormat this.requiredPos, this.emitDelete, this.tableState.getOperationPos(), - getFullSchemaReader(split.getBasePath().get())); + getBaseFileIteratorWithMetadata(split.getBasePath().get())); } else { throw new HoodieException("Unable to select an Iterator to read the Hoodie MOR File Split for " + "file path: " + split.getBasePath() @@ -249,14 +250,14 @@ public class MergeOnReadInputFormat return true; } else { // log file reaches end ? - return this.iterator.reachedEnd(); + return !this.iterator.hasNext(); } } @Override public RowData nextRecord(RowData o) { currentReadCount++; - return this.iterator.nextRecord(); + return this.iterator.next(); } @Override @@ -291,19 +292,19 @@ public class MergeOnReadInputFormat } } - protected HoodieParquetReader getFullSchemaReader(String path) { + protected ClosableIterator<RowData> getBaseFileIteratorWithMetadata(String path) { try { - return getReader(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); + return getBaseFileIterator(path, IntStream.range(0, this.tableState.getRowType().getFieldCount()).toArray()); } catch (IOException e) { throw new HoodieException("Get reader error for path: " + path); } } - protected HoodieParquetReader getRequiredSchemaReader(String path) throws IOException { - return getReader(path, this.requiredPos); + protected ClosableIterator<RowData> getBaseFileIterator(String path) throws IOException { + return getBaseFileIterator(path, this.requiredPos); } - private HoodieParquetReader getReader(String path, int[] requiredPos) throws IOException { + private ClosableIterator<RowData> getBaseFileIterator(String path, int[] requiredPos) throws IOException { // generate partition specs. LinkedHashMap<String, String> partSpec = FilePathUtils.extractPartitionKeyValues( new org.apache.hadoop.fs.Path(path).getParent(), @@ -325,7 +326,7 @@ public class MergeOnReadInputFormat } }); - return HoodieParquetReader.getReader( + return RecordIterators.getParquetRecordIterator( internalSchemaManager, this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE), true, @@ -525,46 +526,12 @@ public class MergeOnReadInputFormat // ------------------------------------------------------------------------- // Inner Class // ------------------------------------------------------------------------- - private interface RecordIterator { - boolean reachedEnd() throws IOException; - - RowData nextRecord(); - - void close() throws IOException; - } - - static class BaseFileOnlyIterator implements RecordIterator { - // base file reader - private final HoodieParquetReader reader; - - public BaseFileOnlyIterator(HoodieParquetReader reader) { - this.reader = reader; - } - - @Override - public boolean reachedEnd() throws IOException { - return this.reader.reachedEnd(); - } - - @Override - public RowData nextRecord() { - return this.reader.nextRecord(); - } - - @Override - public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); - } - } - } - /** - * Similar with {@link BaseFileOnlyIterator} but with instant time filtering. + * Base record iterator with instant time filtering. */ - static class BaseFileOnlyFilteringIterator implements RecordIterator { - // base file reader - private final HoodieParquetReader reader; + static class BaseFileOnlyFilteringIterator implements ClosableIterator<RowData> { + // base file record iterator + private final ClosableIterator<RowData> nested; private final InstantRange instantRange; private final RowDataProjection projection; @@ -573,44 +540,44 @@ public class MergeOnReadInputFormat BaseFileOnlyFilteringIterator( Option<InstantRange> instantRange, RowType requiredRowType, - HoodieParquetReader reader) { - this.reader = reader; + ClosableIterator<RowData> nested) { + this.nested = nested; this.instantRange = instantRange.orElse(null); int[] positions = IntStream.range(1, 1 + requiredRowType.getFieldCount()).toArray(); projection = RowDataProjection.instance(requiredRowType, positions); } @Override - public boolean reachedEnd() throws IOException { - while (!this.reader.reachedEnd()) { - currentRecord = this.reader.nextRecord(); + public boolean hasNext() { + while (this.nested.hasNext()) { + currentRecord = this.nested.next(); if (instantRange != null) { boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); if (isInRange) { - return false; + return true; } } else { - return false; + return true; } } - return true; + return false; } @Override - public RowData nextRecord() { + public RowData next() { // can promote: no need to project with null instant range return projection.project(currentRecord); } @Override - public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); + public void close() { + if (this.nested != null) { + this.nested.close(); } } } - static class LogFileOnlyIterator implements RecordIterator { + protected static class LogFileOnlyIterator implements ClosableIterator<RowData> { // iterator for log files private final ClosableIterator<RowData> iterator; @@ -619,12 +586,12 @@ public class MergeOnReadInputFormat } @Override - public boolean reachedEnd() { - return !this.iterator.hasNext(); + public boolean hasNext() { + return this.iterator.hasNext(); } @Override - public RowData nextRecord() { + public RowData next() { return this.iterator.next(); } @@ -636,9 +603,9 @@ public class MergeOnReadInputFormat } } - static class SkipMergeIterator implements RecordIterator { - // base file reader - private final HoodieParquetReader reader; + static class SkipMergeIterator implements ClosableIterator<RowData> { + // base file record iterator + private final ClosableIterator<RowData> nested; // iterator for log files private final ClosableIterator<RowData> iterator; @@ -649,34 +616,34 @@ public class MergeOnReadInputFormat private RowData currentRecord; - SkipMergeIterator(HoodieParquetReader reader, ClosableIterator<RowData> iterator) { - this.reader = reader; + SkipMergeIterator(ClosableIterator<RowData> nested, ClosableIterator<RowData> iterator) { + this.nested = nested; this.iterator = iterator; } @Override - public boolean reachedEnd() throws IOException { - if (!readLogs && !this.reader.reachedEnd()) { - currentRecord = this.reader.nextRecord(); - return false; + public boolean hasNext() { + if (!readLogs && this.nested.hasNext()) { + currentRecord = this.nested.next(); + return true; } readLogs = true; if (this.iterator.hasNext()) { currentRecord = this.iterator.next(); - return false; + return true; } - return true; + return false; } @Override - public RowData nextRecord() { + public RowData next() { return currentRecord; } @Override - public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); + public void close() { + if (this.nested != null) { + this.nested.close(); } if (this.iterator != null) { this.iterator.close(); @@ -684,9 +651,9 @@ public class MergeOnReadInputFormat } } - static class MergeIterator implements RecordIterator { - // base file reader - private final HoodieParquetReader reader; + protected static class MergeIterator implements ClosableIterator<RowData> { + // base file record iterator + private final ClosableIterator<RowData> nested; // log keys used for merging private final Iterator<String> logKeysIterator; // scanner @@ -726,12 +693,12 @@ public class MergeOnReadInputFormat int[] requiredPos, boolean emitDelete, int operationPos, - HoodieParquetReader reader) { // the reader should be with full schema + ClosableIterator<RowData> nested) { // the iterator should be with full schema this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, tableSchema, querySchema, Option.of(RowDataProjection.instance(requiredRowType, requiredPos)), Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, requiredPos, new GenericRecordBuilder(requiredSchema))), - emitDelete, operationPos, reader); + emitDelete, operationPos, nested); } public MergeIterator( @@ -746,9 +713,9 @@ public class MergeOnReadInputFormat Option<Function<IndexedRecord, GenericRecord>> avroProjection, boolean emitDelete, int operationPos, - HoodieParquetReader reader) { // the reader should be with full schema + ClosableIterator<RowData> nested) { // the iterator should be with full schema this.tableSchema = tableSchema; - this.reader = reader; + this.nested = nested; this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema, flinkConf, hadoopConf); this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps(); this.logKeysIterator = scanner.getRecords().keySet().iterator(); @@ -762,9 +729,9 @@ public class MergeOnReadInputFormat } @Override - public boolean reachedEnd() throws IOException { - while (!readLogs && !this.reader.reachedEnd()) { - currentRecord = this.reader.nextRecord(); + public boolean hasNext() { + while (!readLogs && this.nested.hasNext()) { + currentRecord = this.nested.next(); if (instantRange != null) { boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString()); if (!isInRange) { @@ -786,18 +753,18 @@ public class MergeOnReadInputFormat continue; } IndexedRecord avroRecord = avroProjection.isPresent() - ? avroProjection.get().apply(mergedAvroRecord.get()) - : mergedAvroRecord.get(); + ? avroProjection.get().apply(mergedAvroRecord.get()) + : mergedAvroRecord.get(); this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); this.currentRecord.setRowKind(rowKind); - return false; + return true; } } // project the full record in base with required positions if (projection.isPresent()) { currentRecord = projection.get().project(currentRecord); } - return false; + return true; } // read the logs readLogs = true; @@ -808,46 +775,52 @@ public class MergeOnReadInputFormat if (insertAvroRecord.isPresent()) { // the record is a DELETE if insertAvroRecord not present, skipping IndexedRecord avroRecord = avroProjection.isPresent() - ? avroProjection.get().apply(insertAvroRecord.get()) - : insertAvroRecord.get(); + ? avroProjection.get().apply(insertAvroRecord.get()) + : insertAvroRecord.get(); this.currentRecord = (RowData) avroToRowDataConverter.convert(avroRecord); FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), this.operationPos); - return false; + return true; } } } - return true; + return false; } - private Option<IndexedRecord> getInsertValue(String curKey) throws IOException { + private Option<IndexedRecord> getInsertValue(String curKey) { final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey); if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) { return Option.empty(); } - return record.getData().getInsertValue(tableSchema); + try { + return record.getData().getInsertValue(tableSchema); + } catch (IOException e) { + throw new HoodieIOException("Get insert value from payload exception", e); + } } @Override - public RowData nextRecord() { + public RowData next() { return currentRecord; } @Override - public void close() throws IOException { - if (this.reader != null) { - this.reader.close(); + public void close() { + if (this.nested != null) { + this.nested.close(); } if (this.scanner != null) { this.scanner.close(); } } - private Option<IndexedRecord> mergeRowWithLog( - RowData curRow, - String curKey) throws IOException { + private Option<IndexedRecord> mergeRowWithLog(RowData curRow, String curKey) { final HoodieAvroRecord<?> record = (HoodieAvroRecord) scanner.getRecords().get(curKey); GenericRecord historyAvroRecord = (GenericRecord) rowDataToAvroConverter.convert(tableSchema, curRow); - return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps); + try { + return record.getData().combineAndGetUpdateValue(historyAvroRecord, tableSchema, payloadProps); + } catch (IOException e) { + throw new HoodieIOException("Merge base and delta payloads exception", e); + } } }
