This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 81710bb40 [core][bug] readers don't close while io exception happening (#1037)
81710bb40 is described below
commit 81710bb400e37defe415cc706a2e278c2e69d981
Author: YeJunHao <[email protected]>
AuthorDate: Wed Apr 26 21:26:08 2023 +0800
[core][bug] readers don't close while io exception happening (#1037)
---
.../apache/paimon/mergetree/MergeTreeReaders.java | 21 +++++--
.../compact/ChangelogMergeTreeRewriter.java | 6 +-
.../apache/paimon/format/orc/OrcReaderFactory.java | 67 ++++++++++++----------
3 files changed, 56 insertions(+), 38 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index d77ffaf56..624aca091 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -28,6 +28,7 @@ import
org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.paimon.mergetree.compact.SortMergeReader;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.IOUtils;
import java.io.IOException;
import java.util.ArrayList;
@@ -69,10 +70,7 @@ public class MergeTreeReaders {
Comparator<InternalRow> userKeyComparator,
MergeFunctionWrapper<KeyValue> mergeFunctionWrapper)
throws IOException {
- List<RecordReader<KeyValue>> readers = new ArrayList<>();
- for (SortedRun run : section) {
- readers.add(readerForRun(run, readerFactory));
- }
+ List<RecordReader<KeyValue>> readers = readerForSection(section,
readerFactory);
if (readers.size() == 1) {
return readers.get(0);
} else {
@@ -91,4 +89,19 @@ public class MergeTreeReaders {
}
return ConcatRecordReader.create(readers);
}
+
+ public static List<RecordReader<KeyValue>> readerForSection(
+ List<SortedRun> runs, KeyValueFileReaderFactory readerFactory)
throws IOException {
+ List<RecordReader<KeyValue>> readers = new ArrayList<>();
+ try {
+ for (SortedRun run : runs) {
+ readers.add(readerForRun(run, readerFactory));
+ }
+ } catch (IOException e) {
+ // if one of the readers creating failed, we need to close them
all.
+ readers.forEach(IOUtils::closeQuietly);
+ throw e;
+ }
+ return readers;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index be3a79358..4e704db5b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -69,10 +69,8 @@ public abstract class ChangelogMergeTreeRewriter extends
MergeTreeCompactRewrite
for (List<SortedRun> section : sections) {
sectionReaders.add(
() -> {
- List<RecordReader<KeyValue>> runReaders = new
ArrayList<>();
- for (SortedRun run : section) {
- runReaders.add(MergeTreeReaders.readerForRun(run,
readerFactory));
- }
+ List<RecordReader<KeyValue>> runReaders =
+ MergeTreeReaders.readerForSection(section,
readerFactory);
return new SortMergeReader<>(
runReaders, keyComparator,
createMergeWrapper(outputLevel));
});
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index b5267fd7b..3b794f039 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -31,6 +31,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader.RecordIterator;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Pool;
@@ -252,41 +253,47 @@ public class OrcReaderFactory implements
FormatReaderFactory {
long splitLength)
throws IOException {
org.apache.orc.Reader orcReader = createReader(conf, fileIO, path);
-
- // get offset and length for the stripes that start in the split
- Pair<Long, Long> offsetAndLength =
- getOffsetAndLengthForSplit(splitStart, splitLength,
orcReader.getStripes());
-
- // create ORC row reader configuration
- org.apache.orc.Reader.Options options =
- new org.apache.orc.Reader.Options()
- .schema(schema)
- .range(offsetAndLength.getLeft(),
offsetAndLength.getRight())
- .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
-
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
-
.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
-
- // configure filters
- if (!conjunctPredicates.isEmpty()) {
- SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
- b = b.startAnd();
- for (OrcFilters.Predicate predicate : conjunctPredicates) {
- predicate.add(b);
+ try {
+ // get offset and length for the stripes that start in the split
+ Pair<Long, Long> offsetAndLength =
+ getOffsetAndLengthForSplit(splitStart, splitLength,
orcReader.getStripes());
+
+ // create ORC row reader configuration
+ org.apache.orc.Reader.Options options =
+ new org.apache.orc.Reader.Options()
+ .schema(schema)
+ .range(offsetAndLength.getLeft(),
offsetAndLength.getRight())
+ .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+ .tolerateMissingSchema(
+
OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+ // configure filters
+ if (!conjunctPredicates.isEmpty()) {
+ SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
+ b = b.startAnd();
+ for (OrcFilters.Predicate predicate : conjunctPredicates) {
+ predicate.add(b);
+ }
+ b = b.end();
+ options.searchArgument(b.build(), new String[] {});
}
- b = b.end();
- options.searchArgument(b.build(), new String[] {});
- }
- // configure selected fields
- options.include(computeProjectionMask(schema, selectedFields));
+ // configure selected fields
+ options.include(computeProjectionMask(schema, selectedFields));
- // create ORC row reader
- RecordReader orcRowsReader = orcReader.rows(options);
+ // create ORC row reader
+ RecordReader orcRowsReader = orcReader.rows(options);
- // assign ids
- schema.getId();
+ // assign ids
+ schema.getId();
- return orcRowsReader;
+ return orcRowsReader;
+ } catch (IOException e) {
+ // exception happened, we need to close the reader
+ IOUtils.closeQuietly(orcReader);
+ throw e;
+ }
}
private static VectorizedRowBatch createBatchWrapper(TypeDescription
schema, int batchSize) {