This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.4
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit b70d87337cce60ebd1776c2de703ce133a3edd0a
Author: YeJunHao <[email protected]>
AuthorDate: Wed Apr 26 21:26:08 2023 +0800

    [core][bug] Readers are not closed when an IOException is thrown (#1037)
---
 .../apache/paimon/mergetree/MergeTreeReaders.java  | 21 +++++--
 .../compact/ChangelogMergeTreeRewriter.java        |  6 +-
 .../apache/paimon/format/orc/OrcReaderFactory.java | 67 ++++++++++++----------
 3 files changed, 56 insertions(+), 38 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index d77ffaf56..624aca091 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -28,6 +28,7 @@ import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper;
 import org.apache.paimon.mergetree.compact.SortMergeReader;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.IOUtils;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -69,10 +70,7 @@ public class MergeTreeReaders {
             Comparator<InternalRow> userKeyComparator,
             MergeFunctionWrapper<KeyValue> mergeFunctionWrapper)
             throws IOException {
-        List<RecordReader<KeyValue>> readers = new ArrayList<>();
-        for (SortedRun run : section) {
-            readers.add(readerForRun(run, readerFactory));
-        }
+        List<RecordReader<KeyValue>> readers = readerForSection(section, 
readerFactory);
         if (readers.size() == 1) {
             return readers.get(0);
         } else {
@@ -91,4 +89,19 @@ public class MergeTreeReaders {
         }
         return ConcatRecordReader.create(readers);
     }
+
+    public static List<RecordReader<KeyValue>> readerForSection(
+            List<SortedRun> runs, KeyValueFileReaderFactory readerFactory) throws IOException {
+        List<RecordReader<KeyValue>> readers = new ArrayList<>();
+        try {
+            for (SortedRun run : runs) {
+                readers.add(readerForRun(run, readerFactory));
+            }
+        } catch (IOException e) {
+            // On IOException, close every reader opened so far before rethrowing.
+            readers.forEach(IOUtils::closeQuietly);
+            throw e;
+        }
+        return readers;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index be3a79358..4e704db5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -69,10 +69,8 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
         for (List<SortedRun> section : sections) {
             sectionReaders.add(
                     () -> {
-                        List<RecordReader<KeyValue>> runReaders = new 
ArrayList<>();
-                        for (SortedRun run : section) {
-                            runReaders.add(MergeTreeReaders.readerForRun(run, 
readerFactory));
-                        }
+                        List<RecordReader<KeyValue>> runReaders =
+                                MergeTreeReaders.readerForSection(section, 
readerFactory);
                         return new SortMergeReader<>(
                                 runReaders, keyComparator, 
createMergeWrapper(outputLevel));
                     });
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index b5267fd7b..3b794f039 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -31,6 +31,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Pool;
 
@@ -252,41 +253,47 @@ public class OrcReaderFactory implements FormatReaderFactory {
             long splitLength)
             throws IOException {
         org.apache.orc.Reader orcReader = createReader(conf, fileIO, path);
-
-        // get offset and length for the stripes that start in the split
-        Pair<Long, Long> offsetAndLength =
-                getOffsetAndLengthForSplit(splitStart, splitLength, 
orcReader.getStripes());
-
-        // create ORC row reader configuration
-        org.apache.orc.Reader.Options options =
-                new org.apache.orc.Reader.Options()
-                        .schema(schema)
-                        .range(offsetAndLength.getLeft(), 
offsetAndLength.getRight())
-                        .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
-                        
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
-                        
.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
-
-        // configure filters
-        if (!conjunctPredicates.isEmpty()) {
-            SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
-            b = b.startAnd();
-            for (OrcFilters.Predicate predicate : conjunctPredicates) {
-                predicate.add(b);
+        try {
+            // get offset and length for the stripes that start in the split
+            Pair<Long, Long> offsetAndLength =
+                    getOffsetAndLengthForSplit(splitStart, splitLength, 
orcReader.getStripes());
+
+            // create ORC row reader configuration
+            org.apache.orc.Reader.Options options =
+                    new org.apache.orc.Reader.Options()
+                            .schema(schema)
+                            .range(offsetAndLength.getLeft(), 
offsetAndLength.getRight())
+                            .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+                            
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
+                            .tolerateMissingSchema(
+                                    
OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+
+            // configure filters
+            if (!conjunctPredicates.isEmpty()) {
+                SearchArgument.Builder b = SearchArgumentFactory.newBuilder();
+                b = b.startAnd();
+                for (OrcFilters.Predicate predicate : conjunctPredicates) {
+                    predicate.add(b);
+                }
+                b = b.end();
+                options.searchArgument(b.build(), new String[] {});
             }
-            b = b.end();
-            options.searchArgument(b.build(), new String[] {});
-        }
 
-        // configure selected fields
-        options.include(computeProjectionMask(schema, selectedFields));
+            // configure selected fields
+            options.include(computeProjectionMask(schema, selectedFields));
 
-        // create ORC row reader
-        RecordReader orcRowsReader = orcReader.rows(options);
+            // create ORC row reader
+            RecordReader orcRowsReader = orcReader.rows(options);
 
-        // assign ids
-        schema.getId();
+            // assign ids
+            schema.getId();
 
-        return orcRowsReader;
+            return orcRowsReader;
+        } catch (IOException e) {
+            // exception happened, we need to close the reader
+            IOUtils.closeQuietly(orcReader);
+            throw e;
+        }
     }
 
     private static VectorizedRowBatch createBatchWrapper(TypeDescription 
schema, int batchSize) {

Reply via email to