This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.4 in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit b70d87337cce60ebd1776c2de703ce133a3edd0a Author: YeJunHao <[email protected]> AuthorDate: Wed Apr 26 21:26:08 2023 +0800 [core][bug] readers don't close while io exception happening (#1037) --- .../apache/paimon/mergetree/MergeTreeReaders.java | 21 +++++-- .../compact/ChangelogMergeTreeRewriter.java | 6 +- .../apache/paimon/format/orc/OrcReaderFactory.java | 67 ++++++++++++---------- 3 files changed, 56 insertions(+), 38 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java index d77ffaf56..624aca091 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java @@ -28,6 +28,7 @@ import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper; import org.apache.paimon.mergetree.compact.SortMergeReader; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.IOUtils; import java.io.IOException; import java.util.ArrayList; @@ -69,10 +70,7 @@ public class MergeTreeReaders { Comparator<InternalRow> userKeyComparator, MergeFunctionWrapper<KeyValue> mergeFunctionWrapper) throws IOException { - List<RecordReader<KeyValue>> readers = new ArrayList<>(); - for (SortedRun run : section) { - readers.add(readerForRun(run, readerFactory)); - } + List<RecordReader<KeyValue>> readers = readerForSection(section, readerFactory); if (readers.size() == 1) { return readers.get(0); } else { @@ -91,4 +89,19 @@ public class MergeTreeReaders { } return ConcatRecordReader.create(readers); } + + public static List<RecordReader<KeyValue>> readerForSection( + List<SortedRun> runs, KeyValueFileReaderFactory readerFactory) throws IOException { + List<RecordReader<KeyValue>> readers = new ArrayList<>(); + try { + for (SortedRun run : runs) { + readers.add(readerForRun(run, readerFactory)); + } + } catch (IOException e) { + // if one of the readers creating failed, we need to close them all. + readers.forEach(IOUtils::closeQuietly); + throw e; + } + return readers; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index be3a79358..4e704db5b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -69,10 +69,8 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite for (List<SortedRun> section : sections) { sectionReaders.add( () -> { - List<RecordReader<KeyValue>> runReaders = new ArrayList<>(); - for (SortedRun run : section) { - runReaders.add(MergeTreeReaders.readerForRun(run, readerFactory)); - } + List<RecordReader<KeyValue>> runReaders = + MergeTreeReaders.readerForSection(section, readerFactory); return new SortMergeReader<>( runReaders, keyComparator, createMergeWrapper(outputLevel)); }); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index b5267fd7b..3b794f039 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -31,6 +31,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader.RecordIterator; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Pool; @@ -252,41 +253,47 @@ public class OrcReaderFactory implements FormatReaderFactory { long splitLength) throws IOException { org.apache.orc.Reader orcReader = createReader(conf, fileIO, path); - - // get offset and length for the stripes that start in the split - Pair<Long, Long> offsetAndLength = - getOffsetAndLengthForSplit(splitStart, splitLength, orcReader.getStripes()); - - // create ORC row reader configuration - org.apache.orc.Reader.Options options = - new org.apache.orc.Reader.Options() - .schema(schema) - .range(offsetAndLength.getLeft(), offsetAndLength.getRight()) - .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) - .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) - .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); - - // configure filters - if (!conjunctPredicates.isEmpty()) { - SearchArgument.Builder b = SearchArgumentFactory.newBuilder(); - b = b.startAnd(); - for (OrcFilters.Predicate predicate : conjunctPredicates) { - predicate.add(b); + try { + // get offset and length for the stripes that start in the split + Pair<Long, Long> offsetAndLength = + getOffsetAndLengthForSplit(splitStart, splitLength, orcReader.getStripes()); + + // create ORC row reader configuration + org.apache.orc.Reader.Options options = + new org.apache.orc.Reader.Options() + .schema(schema) + .range(offsetAndLength.getLeft(), offsetAndLength.getRight()) + .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) + .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) + .tolerateMissingSchema( + OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); + + // configure filters + if (!conjunctPredicates.isEmpty()) { + SearchArgument.Builder b = SearchArgumentFactory.newBuilder(); + b = b.startAnd(); + for (OrcFilters.Predicate predicate : conjunctPredicates) { + predicate.add(b); + } + b = b.end(); + options.searchArgument(b.build(), new String[] {}); } - b = b.end(); - options.searchArgument(b.build(), new String[] {}); - } - // configure selected fields - options.include(computeProjectionMask(schema, selectedFields)); + // configure selected fields + options.include(computeProjectionMask(schema, selectedFields)); - // create ORC row reader - RecordReader orcRowsReader = orcReader.rows(options); + // create ORC row reader + RecordReader orcRowsReader = orcReader.rows(options); - // assign ids - schema.getId(); + // assign ids + schema.getId(); - return orcRowsReader; + return orcRowsReader; + } catch (IOException e) { + // exception happened, we need to close the reader + IOUtils.closeQuietly(orcReader); + throw e; + } } private static VectorizedRowBatch createBatchWrapper(TypeDescription schema, int batchSize) {
