This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 07cc3e89a7 [HUDI-5306] Unify RecordIterator and HoodieParquetReader
with ClosableIterator (#7340)
07cc3e89a7 is described below
commit 07cc3e89a73430832e6b109d215e67ef52aa4089
Author: Danny Chan <[email protected]>
AuthorDate: Thu Dec 1 17:13:59 2022 +0800
[HUDI-5306] Unify RecordIterator and HoodieParquetReader with
ClosableIterator (#7340)
* Unify RecordIterator and HoodieParquetReader with ClosableIterator
* Add a factory class for RecordIterator
* Add more documentation
---
.../apache/hudi/configuration/OptionsResolver.java | 2 +-
.../java/org/apache/hudi/table/format/CastMap.java | 19 ++-
...Reader.java => ParquetSplitRecordIterator.java} | 24 ++-
...odieParquetReader.java => RecordIterators.java} | 56 +++---
...eader.java => SchemaEvolvedRecordIterator.java} | 25 ++-
.../hudi/table/format/cdc/CdcInputFormat.java | 134 +++++++--------
.../table/format/cow/CopyOnWriteInputFormat.java | 19 ++-
.../table/format/mor/MergeOnReadInputFormat.java | 189 +++++++++------------
8 files changed, 223 insertions(+), 245 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 16b15cf3d2..619674145a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -216,7 +216,7 @@ public class OptionsResolver {
* Returns whether comprehensive schema evolution enabled.
*/
public static boolean isSchemaEvolutionEnabled(Configuration conf) {
- return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
false);
+ return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(),
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue());
}
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
index 5f29e85adc..36cf870887 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
@@ -49,14 +49,17 @@ import static
org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
/**
* CastMap is responsible for conversion of flink types when full schema
evolution enabled.
- * Supported cast conversions:
- * Integer => Long, Float, Double, Decimal, String
- * Long => Float, Double, Decimal, String
- * Float => Double, Decimal, String
- * Double => Decimal, String
- * Decimal => Decimal, String
- * String => Decimal, Date
- * Date => String
+ *
+ * <p>Supported cast conversions:
+ * <ul>
+ * <li>Integer => Long, Float, Double, Decimal, String</li>
+ * <li>Long => Float, Double, Decimal, String</li>
+ * <li>Float => Double, Decimal, String</li>
+ * <li>Double => Decimal, String</li>
+ * <li>Decimal => Decimal, String</li>
+ * <li>String => Decimal, Date</li>
+ * <li>Date => String</li>
+ * </ul>
*/
public final class CastMap implements Serializable {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java
similarity index 63%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java
index d13c6c7c21..7b26d71f11 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetSplitReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java
@@ -18,6 +18,8 @@
package org.apache.hudi.table.format;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.exception.HoodieIOException;
import
org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
import org.apache.flink.table.data.RowData;
@@ -27,25 +29,33 @@ import java.io.IOException;
/**
* Hoodie wrapper for flink parquet reader.
*/
-public final class HoodieParquetSplitReader implements HoodieParquetReader {
+public final class ParquetSplitRecordIterator implements
ClosableIterator<RowData> {
private final ParquetColumnarRowSplitReader reader;
- public HoodieParquetSplitReader(ParquetColumnarRowSplitReader reader) {
+ public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) {
this.reader = reader;
}
@Override
- public boolean reachedEnd() throws IOException {
- return reader.reachedEnd();
+ public boolean hasNext() {
+ try {
+ return !reader.reachedEnd();
+ } catch (IOException e) {
+ throw new HoodieIOException("Decides whether the parquet columnar row
split reader reached end exception", e);
+ }
}
@Override
- public RowData nextRecord() {
+ public RowData next() {
return reader.nextRecord();
}
@Override
- public void close() throws IOException {
- reader.close();
+ public void close() {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Close the parquet columnar row split reader
exception", e);
+ }
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
similarity index 67%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index e762f03e98..8657f16ddc 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table.format;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
@@ -26,23 +27,17 @@ import org.apache.hudi.util.RowDataProjection;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
-
import org.apache.hadoop.conf.Configuration;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
/**
- * Base interface for hoodie parquet readers.
+ * Factory class for record iterators.
*/
-public interface HoodieParquetReader extends Closeable {
-
- boolean reachedEnd() throws IOException;
+public abstract class RecordIterators {
- RowData nextRecord();
-
- static HoodieParquetReader getReader(
+ public static ClosableIterator<RowData> getParquetRecordIterator(
InternalSchemaManager internalSchemaManager,
boolean utcTimestamp,
boolean caseSensitive,
@@ -55,10 +50,9 @@ public interface HoodieParquetReader extends Closeable {
Path path,
long splitStart,
long splitLength) throws IOException {
- Option<RowDataProjection> castProjection;
InternalSchema fileSchema =
internalSchemaManager.getFileSchema(path.getName());
if (fileSchema.isEmptySchema()) {
- return new HoodieParquetSplitReader(
+ return new ParquetSplitRecordIterator(
ParquetSplitReaderUtil.genPartColumnarRowReader(
utcTimestamp,
caseSensitive,
@@ -73,27 +67,25 @@ public interface HoodieParquetReader extends Closeable {
splitLength));
} else {
CastMap castMap = internalSchemaManager.getCastMap(fileSchema,
fieldNames, fieldTypes, selectedFields);
- castProjection = castMap.toRowDataProjection(selectedFields);
- fieldNames = internalSchemaManager.getFileFieldNames(fileSchema,
fieldNames);
- fieldTypes = castMap.getFileFieldTypes();
- }
- HoodieParquetReader reader = new HoodieParquetSplitReader(
- ParquetSplitReaderUtil.genPartColumnarRowReader(
- utcTimestamp,
- caseSensitive,
- conf,
- fieldNames,
- fieldTypes,
- partitionSpec,
- selectedFields,
- batchSize,
- path,
- splitStart,
- splitLength));
- if (castProjection.isPresent()) {
- return new HoodieParquetEvolvedSplitReader(reader, castProjection.get());
- } else {
- return reader;
+ Option<RowDataProjection> castProjection =
castMap.toRowDataProjection(selectedFields);
+ ClosableIterator<RowData> itr = new ParquetSplitRecordIterator(
+ ParquetSplitReaderUtil.genPartColumnarRowReader(
+ utcTimestamp,
+ caseSensitive,
+ conf,
+ internalSchemaManager.getFileFieldNames(fileSchema, fieldNames),
// the reconciled field names
+ castMap.getFileFieldTypes(),
// the reconciled field types
+ partitionSpec,
+ selectedFields,
+ batchSize,
+ path,
+ splitStart,
+ splitLength));
+ if (castProjection.isPresent()) {
+ return new SchemaEvolvedRecordIterator(itr, castProjection.get());
+ } else {
+ return itr;
+ }
}
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java
similarity index 63%
rename from
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
rename to
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java
index 037a377635..739512c7b5 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieParquetEvolvedSplitReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java
@@ -18,36 +18,35 @@
package org.apache.hudi.table.format;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.util.RowDataProjection;
import org.apache.flink.table.data.RowData;
-import java.io.IOException;
-
/**
- * Decorates origin hoodie parquet reader with cast projection.
+ * Decorates origin record iterator with cast projection.
*/
-public final class HoodieParquetEvolvedSplitReader implements
HoodieParquetReader {
- private final HoodieParquetReader originReader;
+public final class SchemaEvolvedRecordIterator implements
ClosableIterator<RowData> {
+ private final ClosableIterator<RowData> nested;
private final RowDataProjection castProjection;
- public HoodieParquetEvolvedSplitReader(HoodieParquetReader originReader,
RowDataProjection castProjection) {
- this.originReader = originReader;
+ public SchemaEvolvedRecordIterator(ClosableIterator<RowData> nested,
RowDataProjection castProjection) {
+ this.nested = nested;
this.castProjection = castProjection;
}
@Override
- public boolean reachedEnd() throws IOException {
- return originReader.reachedEnd();
+ public boolean hasNext() {
+ return nested.hasNext();
}
@Override
- public RowData nextRecord() {
- return castProjection.project(originReader.nextRecord());
+ public RowData next() {
+ return castProjection.project(nested.next());
}
@Override
- public void close() throws IOException {
- originReader.close();
+ public void close() {
+ nested.close();
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 8d2842a160..4e162d8e2b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -38,7 +39,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.table.format.FormatUtils;
-import org.apache.hudi.table.format.HoodieParquetReader;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
@@ -94,11 +94,11 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
}
@Override
- protected RecordIterator initIterator(MergeOnReadInputSplit split) throws
IOException {
+ protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit
split) throws IOException {
if (split instanceof CdcInputSplit) {
HoodieCDCSupplementalLoggingMode mode =
OptionsResolver.getCDCSupplementalLoggingMode(conf);
ImageManager manager = new ImageManager(conf, tableState.getRowType(),
this::getFileSliceIterator);
- Function<HoodieCDCFileSplit, RecordIterator> recordIteratorFunc =
+ Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc =
cdcFileSplit -> getRecordIteratorV2(split.getTablePath(),
split.getMaxCompactionMemoryInBytes(), cdcFileSplit, mode, manager);
return new CdcFileSplitsIterator((CdcInputSplit) split, manager,
recordIteratorFunc);
} else {
@@ -113,10 +113,10 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
return new Builder();
}
- private RecordIterator getFileSliceIterator(MergeOnReadInputSplit split) {
+ private ClosableIterator<RowData> getFileSliceIterator(MergeOnReadInputSplit
split) {
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size()
> 0)) {
// base file only
- return new
BaseFileOnlyIterator(getFullSchemaReader(split.getBasePath().get()));
+ return getBaseFileIteratorWithMetadata(split.getBasePath().get());
} else if (!split.getBasePath().isPresent()) {
// log files only
return new LogFileOnlyIterator(getFullLogFileIterator(split));
@@ -134,11 +134,11 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
Option.empty(),
false,
this.tableState.getOperationPos(),
- getFullSchemaReader(split.getBasePath().get()));
+ getBaseFileIteratorWithMetadata(split.getBasePath().get()));
}
}
- private RecordIterator getRecordIteratorV2(
+ private ClosableIterator<RowData> getRecordIteratorV2(
String tablePath,
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
@@ -151,7 +151,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
}
}
- private RecordIterator getRecordIterator(
+ private ClosableIterator<RowData> getRecordIterator(
String tablePath,
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
@@ -162,7 +162,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
ValidationUtils.checkState(fileSplit.getCdcFiles() != null &&
fileSplit.getCdcFiles().size() == 1,
"CDC file path should exist and be only one");
String path = new Path(tablePath,
fileSplit.getCdcFiles().get(0)).toString();
- return new AddBaseFileIterator(getRequiredSchemaReader(path));
+ return new AddBaseFileIterator(getBaseFileIterator(path));
case BASE_FILE_DELETE:
ValidationUtils.checkState(fileSplit.getBeforeFileSlice().isPresent(),
"Before file slice should exist");
@@ -192,26 +192,26 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
- static class CdcFileSplitsIterator implements RecordIterator {
+ static class CdcFileSplitsIterator implements ClosableIterator<RowData> {
private ImageManager imageManager; // keep a reference to release resource
private final Iterator<HoodieCDCFileSplit> fileSplitIterator;
- private final Function<HoodieCDCFileSplit, RecordIterator>
recordIteratorFunc;
- private RecordIterator recordIterator;
+ private final Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc;
+ private ClosableIterator<RowData> recordIterator;
CdcFileSplitsIterator(
CdcInputSplit inputSplit,
ImageManager imageManager,
- Function<HoodieCDCFileSplit, RecordIterator> recordIteratorFunc) {
+ Function<HoodieCDCFileSplit, ClosableIterator<RowData>>
recordIteratorFunc) {
this.fileSplitIterator =
Arrays.asList(inputSplit.getChanges()).iterator();
this.imageManager = imageManager;
this.recordIteratorFunc = recordIteratorFunc;
}
@Override
- public boolean reachedEnd() throws IOException {
+ public boolean hasNext() {
if (recordIterator != null) {
- if (!recordIterator.reachedEnd()) {
- return false;
+ if (recordIterator.hasNext()) {
+ return true;
} else {
recordIterator.close(); // release resource
recordIterator = null;
@@ -220,18 +220,18 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
if (fileSplitIterator.hasNext()) {
HoodieCDCFileSplit fileSplit = fileSplitIterator.next();
recordIterator = recordIteratorFunc.apply(fileSplit);
- return recordIterator.reachedEnd();
+ return recordIterator.hasNext();
}
- return true;
+ return false;
}
@Override
- public RowData nextRecord() {
- return recordIterator.nextRecord();
+ public RowData next() {
+ return recordIterator.next();
}
@Override
- public void close() throws IOException {
+ public void close() {
if (recordIterator != null) {
recordIterator.close();
}
@@ -242,63 +242,63 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
}
}
- static class AddBaseFileIterator implements RecordIterator {
- // base file reader
- private HoodieParquetReader reader;
+ static class AddBaseFileIterator implements ClosableIterator<RowData> {
+ // base file record iterator
+ private ClosableIterator<RowData> nested;
private RowData currentRecord;
- AddBaseFileIterator(HoodieParquetReader reader) {
- this.reader = reader;
+ AddBaseFileIterator(ClosableIterator<RowData> nested) {
+ this.nested = nested;
}
@Override
- public boolean reachedEnd() throws IOException {
- if (!this.reader.reachedEnd()) {
- currentRecord = this.reader.nextRecord();
+ public boolean hasNext() {
+ if (this.nested.hasNext()) {
+ currentRecord = this.nested.next();
currentRecord.setRowKind(RowKind.INSERT);
- return false;
+ return true;
}
- return true;
+ return false;
}
@Override
- public RowData nextRecord() {
+ public RowData next() {
return currentRecord;
}
@Override
- public void close() throws IOException {
- if (this.reader != null) {
- this.reader.close();
- this.reader = null;
+ public void close() {
+ if (this.nested != null) {
+ this.nested.close();
+ this.nested = null;
}
}
}
- static class RemoveBaseFileIterator implements RecordIterator {
- private RecordIterator nested;
+ static class RemoveBaseFileIterator implements ClosableIterator<RowData> {
+ private ClosableIterator<RowData> nested;
private final RowDataProjection projection;
- RemoveBaseFileIterator(MergeOnReadTableState tableState, RecordIterator
iterator) {
+ RemoveBaseFileIterator(MergeOnReadTableState tableState,
ClosableIterator<RowData> iterator) {
this.nested = iterator;
this.projection =
RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
}
@Override
- public boolean reachedEnd() throws IOException {
- return nested.reachedEnd();
+ public boolean hasNext() {
+ return nested.hasNext();
}
@Override
- public RowData nextRecord() {
- RowData row = nested.nextRecord();
+ public RowData next() {
+ RowData row = nested.next();
row.setRowKind(RowKind.DELETE);
return this.projection.project(row);
}
@Override
- public void close() throws IOException {
+ public void close() {
if (this.nested != null) {
this.nested.close();
this.nested = null;
@@ -306,7 +306,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
}
}
- abstract static class BaseImageIterator implements RecordIterator {
+ abstract static class BaseImageIterator implements ClosableIterator<RowData>
{
private final Schema requiredSchema;
private final int[] requiredPos;
private final GenericRecordBuilder recordBuilder;
@@ -353,18 +353,18 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
}
@Override
- public boolean reachedEnd() {
+ public boolean hasNext() {
if (this.sideImage != null) {
currentImage = this.sideImage;
this.sideImage = null;
- return false;
+ return true;
} else if (this.cdcItr.hasNext()) {
cdcRecord = (GenericRecord) this.cdcItr.next();
String op = String.valueOf(cdcRecord.get(0));
resolveImage(op);
- return false;
+ return true;
}
- return true;
+ return false;
}
protected abstract RowData getAfterImage(RowKind rowKind, GenericRecord
cdcRecord);
@@ -372,12 +372,12 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
protected abstract RowData getBeforeImage(RowKind rowKind, GenericRecord
cdcRecord);
@Override
- public RowData nextRecord() {
+ public RowData next() {
return currentImage;
}
@Override
- public void close() throws IOException {
+ public void close() {
if (this.cdcItr != null) {
this.cdcItr.close();
this.cdcItr = null;
@@ -420,7 +420,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
MergeOnReadTableState tableState,
org.apache.hadoop.conf.Configuration hadoopConf,
Schema cdcSchema,
- HoodieCDCFileSplit fileSplit) throws IOException {
+ HoodieCDCFileSplit fileSplit) {
super(hadoopConf, tablePath, tableState, cdcSchema, fileSplit);
}
@@ -515,8 +515,8 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
}
}
- static class ReplaceCommitIterator implements RecordIterator {
- private final RecordIterator itr;
+ static class ReplaceCommitIterator implements ClosableIterator<RowData> {
+ private final ClosableIterator<RowData> itr;
private final RowDataProjection projection;
ReplaceCommitIterator(
@@ -524,16 +524,16 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
String tablePath,
MergeOnReadTableState tableState,
HoodieCDCFileSplit fileSplit,
- Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+ Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
this.itr = initIterator(tablePath,
StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf), fileSplit,
splitIteratorFunc);
this.projection =
RowDataProjection.instance(tableState.getRequiredRowType(),
tableState.getRequiredPositions());
}
- private RecordIterator initIterator(
+ private ClosableIterator<RowData> initIterator(
String tablePath,
long maxCompactionMemoryInBytes,
HoodieCDCFileSplit fileSplit,
- Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+ Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
// init before images
// the before file slice must exist,
@@ -546,19 +546,19 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
}
@Override
- public boolean reachedEnd() throws IOException {
- return this.itr.reachedEnd();
+ public boolean hasNext() {
+ return this.itr.hasNext();
}
@Override
- public RowData nextRecord() {
- RowData row = this.itr.nextRecord();
+ public RowData next() {
+ RowData row = this.itr.next();
row.setRowKind(RowKind.DELETE);
return this.projection.project(row);
}
@Override
- public void close() throws IOException {
+ public void close() {
this.itr.close();
}
}
@@ -602,14 +602,14 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
private final HoodieWriteConfig writeConfig;
private final RowDataSerializer serializer;
- private final Function<MergeOnReadInputSplit, RecordIterator>
splitIteratorFunc;
+ private final Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc;
private final Map<String, ExternalSpillableMap<String, byte[]>> cache;
public ImageManager(
Configuration flinkConf,
RowType rowType,
- Function<MergeOnReadInputSplit, RecordIterator> splitIteratorFunc) {
+ Function<MergeOnReadInputSplit, ClosableIterator<RowData>>
splitIteratorFunc) {
this.serializer = new RowDataSerializer(rowType);
this.splitIteratorFunc = splitIteratorFunc;
this.cache = new TreeMap<>();
@@ -638,12 +638,12 @@ public class CdcInputFormat extends
MergeOnReadInputFormat {
long maxCompactionMemoryInBytes,
FileSlice fileSlice) throws IOException {
MergeOnReadInputSplit inputSplit =
CdcInputFormat.fileSlice2Split(writeConfig.getBasePath(), fileSlice,
maxCompactionMemoryInBytes);
- RecordIterator itr = splitIteratorFunc.apply(inputSplit);
+ ClosableIterator<RowData> itr = splitIteratorFunc.apply(inputSplit);
// initialize the image records map
ExternalSpillableMap<String, byte[]> imageRecordsMap =
FormatUtils.spillableMap(writeConfig, maxCompactionMemoryInBytes);
- while (!itr.reachedEnd()) {
- RowData row = itr.nextRecord();
+ while (itr.hasNext()) {
+ RowData row = itr.next();
String recordKey = row.getString(HOODIE_RECORD_KEY_COL_POS).toString();
ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
serializer.serialize(row, new BytesArrayOutputView(baos));
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index 453d0fee23..820424549f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -20,8 +20,9 @@ package org.apache.hudi.table.format.cow;
import java.util.Comparator;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.table.format.HoodieParquetReader;
+import org.apache.hudi.common.util.ClosableIterator;
import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.RecordIterators;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.flink.api.common.io.FileInputFormat;
@@ -75,7 +76,7 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
private final SerializableConfiguration conf;
private final long limit;
- private transient HoodieParquetReader reader;
+ private transient ClosableIterator<RowData> itr;
private transient long currentReadCount;
/**
@@ -128,7 +129,7 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
}
});
- this.reader = HoodieParquetReader.getReader(
+ this.itr = RecordIterators.getParquetRecordIterator(
internalSchemaManager,
utcTimestamp,
true,
@@ -276,26 +277,26 @@ public class CopyOnWriteInputFormat extends
FileInputFormat<RowData> {
}
@Override
- public boolean reachedEnd() throws IOException {
+ public boolean reachedEnd() {
if (currentReadCount >= limit) {
return true;
} else {
- return reader.reachedEnd();
+ return !itr.hasNext();
}
}
@Override
public RowData nextRecord(RowData reuse) {
currentReadCount++;
- return reader.nextRecord();
+ return itr.next();
}
@Override
public void close() throws IOException {
- if (reader != null) {
- this.reader.close();
+ if (itr != null) {
+ this.itr.close();
}
- this.reader = null;
+ this.itr = null;
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 7f7989e79e..2e40831d46 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -29,12 +29,13 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FormatUtils;
-import org.apache.hudi.table.format.HoodieParquetReader;
import org.apache.hudi.table.format.InternalSchemaManager;
+import org.apache.hudi.table.format.RecordIterators;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.RowDataProjection;
@@ -94,7 +95,7 @@ public class MergeOnReadInputFormat
/**
* Uniform iterator view for the underneath records.
*/
- private transient RecordIterator iterator;
+ private transient ClosableIterator<RowData> iterator;
// for project push down
/**
@@ -178,17 +179,17 @@ public class MergeOnReadInputFormat
mayShiftInputSplit(split);
}
- protected RecordIterator initIterator(MergeOnReadInputSplit split) throws
IOException {
+ protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit
split) throws IOException {
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size()
> 0)) {
if (split.getInstantRange() != null) {
// base file only with commit time filtering
return new BaseFileOnlyFilteringIterator(
split.getInstantRange(),
this.tableState.getRequiredRowType(),
- getReader(split.getBasePath().get(),
getRequiredPosWithCommitTime(this.requiredPos)));
+ getBaseFileIterator(split.getBasePath().get(),
getRequiredPosWithCommitTime(this.requiredPos)));
} else {
// base file only
- return new
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
+ return getBaseFileIterator(split.getBasePath().get());
}
} else if (!split.getBasePath().isPresent()) {
// log files only
@@ -199,7 +200,7 @@ public class MergeOnReadInputFormat
}
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
return new SkipMergeIterator(
- getRequiredSchemaReader(split.getBasePath().get()),
+ getBaseFileIterator(split.getBasePath().get()),
getLogFileIterator(split));
} else if
(split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
return new MergeIterator(
@@ -214,7 +215,7 @@ public class MergeOnReadInputFormat
this.requiredPos,
this.emitDelete,
this.tableState.getOperationPos(),
- getFullSchemaReader(split.getBasePath().get()));
+ getBaseFileIteratorWithMetadata(split.getBasePath().get()));
} else {
throw new HoodieException("Unable to select an Iterator to read the
Hoodie MOR File Split for "
+ "file path: " + split.getBasePath()
@@ -253,14 +254,14 @@ public class MergeOnReadInputFormat
return true;
} else {
// log file reaches end ?
- return this.iterator.reachedEnd();
+ return !this.iterator.hasNext();
}
}
@Override
public RowData nextRecord(RowData o) {
currentReadCount++;
- return this.iterator.nextRecord();
+ return this.iterator.next();
}
@Override
@@ -295,19 +296,19 @@ public class MergeOnReadInputFormat
}
}
- protected HoodieParquetReader getFullSchemaReader(String path) {
+ protected ClosableIterator<RowData> getBaseFileIteratorWithMetadata(String
path) {
try {
- return getReader(path, IntStream.range(0,
this.tableState.getRowType().getFieldCount()).toArray());
+ return getBaseFileIterator(path, IntStream.range(0,
this.tableState.getRowType().getFieldCount()).toArray());
} catch (IOException e) {
throw new HoodieException("Get reader error for path: " + path);
}
}
- protected HoodieParquetReader getRequiredSchemaReader(String path) throws
IOException {
- return getReader(path, this.requiredPos);
+ protected ClosableIterator<RowData> getBaseFileIterator(String path) throws
IOException {
+ return getBaseFileIterator(path, this.requiredPos);
}
- private HoodieParquetReader getReader(String path, int[] requiredPos) throws
IOException {
+ private ClosableIterator<RowData> getBaseFileIterator(String path, int[]
requiredPos) throws IOException {
// generate partition specs.
LinkedHashMap<String, String> partSpec =
FilePathUtils.extractPartitionKeyValues(
new org.apache.hadoop.fs.Path(path).getParent(),
@@ -329,7 +330,7 @@ public class MergeOnReadInputFormat
}
});
- return HoodieParquetReader.getReader(
+ return RecordIterators.getParquetRecordIterator(
internalSchemaManager,
this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
true,
@@ -529,46 +530,12 @@ public class MergeOnReadInputFormat
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
- protected interface RecordIterator {
- boolean reachedEnd() throws IOException;
-
- RowData nextRecord();
-
- void close() throws IOException;
- }
-
- protected static class BaseFileOnlyIterator implements RecordIterator {
- // base file reader
- private final HoodieParquetReader reader;
-
- public BaseFileOnlyIterator(HoodieParquetReader reader) {
- this.reader = reader;
- }
-
- @Override
- public boolean reachedEnd() throws IOException {
- return this.reader.reachedEnd();
- }
-
- @Override
- public RowData nextRecord() {
- return this.reader.nextRecord();
- }
-
- @Override
- public void close() throws IOException {
- if (this.reader != null) {
- this.reader.close();
- }
- }
- }
-
/**
- * Similar with {@link BaseFileOnlyIterator} but with instant time filtering.
+ * Base record iterator with instant time filtering.
*/
- static class BaseFileOnlyFilteringIterator implements RecordIterator {
- // base file reader
- private final HoodieParquetReader reader;
+ static class BaseFileOnlyFilteringIterator implements
ClosableIterator<RowData> {
+ // base file record iterator
+ private final ClosableIterator<RowData> nested;
private final InstantRange instantRange;
private final RowDataProjection projection;
@@ -577,44 +544,44 @@ public class MergeOnReadInputFormat
BaseFileOnlyFilteringIterator(
Option<InstantRange> instantRange,
RowType requiredRowType,
- HoodieParquetReader reader) {
- this.reader = reader;
+ ClosableIterator<RowData> nested) {
+ this.nested = nested;
this.instantRange = instantRange.orElse(null);
int[] positions = IntStream.range(1, 1 +
requiredRowType.getFieldCount()).toArray();
projection = RowDataProjection.instance(requiredRowType, positions);
}
@Override
- public boolean reachedEnd() throws IOException {
- while (!this.reader.reachedEnd()) {
- currentRecord = this.reader.nextRecord();
+ public boolean hasNext() {
+ while (this.nested.hasNext()) {
+ currentRecord = this.nested.next();
if (instantRange != null) {
boolean isInRange =
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
if (isInRange) {
- return false;
+ return true;
}
} else {
- return false;
+ return true;
}
}
- return true;
+ return false;
}
@Override
- public RowData nextRecord() {
+ public RowData next() {
// can promote: no need to project with null instant range
return projection.project(currentRecord);
}
@Override
- public void close() throws IOException {
- if (this.reader != null) {
- this.reader.close();
+ public void close() {
+ if (this.nested != null) {
+ this.nested.close();
}
}
}
- protected static class LogFileOnlyIterator implements RecordIterator {
+ protected static class LogFileOnlyIterator implements
ClosableIterator<RowData> {
// iterator for log files
private final ClosableIterator<RowData> iterator;
@@ -623,12 +590,12 @@ public class MergeOnReadInputFormat
}
@Override
- public boolean reachedEnd() {
- return !this.iterator.hasNext();
+ public boolean hasNext() {
+ return this.iterator.hasNext();
}
@Override
- public RowData nextRecord() {
+ public RowData next() {
return this.iterator.next();
}
@@ -640,9 +607,9 @@ public class MergeOnReadInputFormat
}
}
- static class SkipMergeIterator implements RecordIterator {
- // base file reader
- private final HoodieParquetReader reader;
+ static class SkipMergeIterator implements ClosableIterator<RowData> {
+ // base file record iterator
+ private final ClosableIterator<RowData> nested;
// iterator for log files
private final ClosableIterator<RowData> iterator;
@@ -653,34 +620,34 @@ public class MergeOnReadInputFormat
private RowData currentRecord;
- SkipMergeIterator(HoodieParquetReader reader, ClosableIterator<RowData>
iterator) {
- this.reader = reader;
+ SkipMergeIterator(ClosableIterator<RowData> nested,
ClosableIterator<RowData> iterator) {
+ this.nested = nested;
this.iterator = iterator;
}
@Override
- public boolean reachedEnd() throws IOException {
- if (!readLogs && !this.reader.reachedEnd()) {
- currentRecord = this.reader.nextRecord();
- return false;
+ public boolean hasNext() {
+ if (!readLogs && this.nested.hasNext()) {
+ currentRecord = this.nested.next();
+ return true;
}
readLogs = true;
if (this.iterator.hasNext()) {
currentRecord = this.iterator.next();
- return false;
+ return true;
}
- return true;
+ return false;
}
@Override
- public RowData nextRecord() {
+ public RowData next() {
return currentRecord;
}
@Override
- public void close() throws IOException {
- if (this.reader != null) {
- this.reader.close();
+ public void close() {
+ if (this.nested != null) {
+ this.nested.close();
}
if (this.iterator != null) {
this.iterator.close();
@@ -688,9 +655,9 @@ public class MergeOnReadInputFormat
}
}
- protected static class MergeIterator implements RecordIterator {
- // base file reader
- private final HoodieParquetReader reader;
+ protected static class MergeIterator implements ClosableIterator<RowData> {
+ // base file record iterator
+ private final ClosableIterator<RowData> nested;
// log keys used for merging
private final Iterator<String> logKeysIterator;
// scanner
@@ -730,12 +697,12 @@ public class MergeOnReadInputFormat
int[] requiredPos,
boolean emitDelete,
int operationPos,
- HoodieParquetReader reader) { // the reader should be with full schema
+ ClosableIterator<RowData> nested) { // the iterator should be with
full schema
this(flinkConf, hadoopConf, split, tableRowType, requiredRowType,
tableSchema,
querySchema,
Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
Option.of(record -> buildAvroRecordBySchema(record, requiredSchema,
requiredPos, new GenericRecordBuilder(requiredSchema))),
- emitDelete, operationPos, reader);
+ emitDelete, operationPos, nested);
}
public MergeIterator(
@@ -750,9 +717,9 @@ public class MergeOnReadInputFormat
Option<Function<IndexedRecord, GenericRecord>> avroProjection,
boolean emitDelete,
int operationPos,
- HoodieParquetReader reader) { // the reader should be with full schema
+ ClosableIterator<RowData> nested) { // the iterator should be with
full schema
this.tableSchema = tableSchema;
- this.reader = reader;
+ this.nested = nested;
this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema,
flinkConf, hadoopConf);
this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
this.logKeysIterator = scanner.getRecords().keySet().iterator();
@@ -766,9 +733,9 @@ public class MergeOnReadInputFormat
}
@Override
- public boolean reachedEnd() throws IOException {
- while (!readLogs && !this.reader.reachedEnd()) {
- currentRecord = this.reader.nextRecord();
+ public boolean hasNext() {
+ while (!readLogs && this.nested.hasNext()) {
+ currentRecord = this.nested.next();
if (instantRange != null) {
boolean isInRange =
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
if (!isInRange) {
@@ -794,14 +761,14 @@ public class MergeOnReadInputFormat
: mergedAvroRecord.get();
this.currentRecord = (RowData)
avroToRowDataConverter.convert(avroRecord);
this.currentRecord.setRowKind(rowKind);
- return false;
+ return true;
}
}
// project the full record in base with required positions
if (projection.isPresent()) {
currentRecord = projection.get().project(currentRecord);
}
- return false;
+ return true;
}
// read the logs
readLogs = true;
@@ -816,42 +783,48 @@ public class MergeOnReadInputFormat
: insertAvroRecord.get();
this.currentRecord = (RowData)
avroToRowDataConverter.convert(avroRecord);
FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(),
this.operationPos);
- return false;
+ return true;
}
}
}
- return true;
+ return false;
}
- private Option<IndexedRecord> getInsertValue(String curKey) throws
IOException {
+ private Option<IndexedRecord> getInsertValue(String curKey) {
final HoodieAvroRecord<?> record = (HoodieAvroRecord)
scanner.getRecords().get(curKey);
if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
return Option.empty();
}
- return record.getData().getInsertValue(tableSchema);
+ try {
+ return record.getData().getInsertValue(tableSchema);
+ } catch (IOException e) {
+ throw new HoodieIOException("Get insert value from payload exception",
e);
+ }
}
@Override
- public RowData nextRecord() {
+ public RowData next() {
return currentRecord;
}
@Override
- public void close() throws IOException {
- if (this.reader != null) {
- this.reader.close();
+ public void close() {
+ if (this.nested != null) {
+ this.nested.close();
}
if (this.scanner != null) {
this.scanner.close();
}
}
- private Option<IndexedRecord> mergeRowWithLog(
- RowData curRow,
- String curKey) throws IOException {
+ private Option<IndexedRecord> mergeRowWithLog(RowData curRow, String
curKey) {
final HoodieAvroRecord<?> record = (HoodieAvroRecord)
scanner.getRecords().get(curKey);
GenericRecord historyAvroRecord = (GenericRecord)
rowDataToAvroConverter.convert(tableSchema, curRow);
- return record.getData().combineAndGetUpdateValue(historyAvroRecord,
tableSchema, payloadProps);
+ try {
+ return record.getData().combineAndGetUpdateValue(historyAvroRecord,
tableSchema, payloadProps);
+ } catch (IOException e) {
+ throw new HoodieIOException("Merge base and delta payloads exception",
e);
+ }
}
}