This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 36fa5f102 [core] close files in CompactRewriter to avoid file leaks. (#3409)
36fa5f102 is described below

commit 36fa5f1022e1980be80e63d7c4de265acf6f6581
Author: liming.1018 <[email protected]>
AuthorDate: Thu May 30 14:27:11 2024 +0800

    [core] close files in CompactRewriter to avoid file leaks. (#3409)
---
 .../compact/ChangelogMergeTreeRewriter.java        |  23 +-
 .../compact/MergeTreeCompactRewriter.java          |  32 +-
 .../mergetree/ChangelogMergeTreeRewriterTest.java  | 336 +++++++++++++++++++++
 3 files changed, 377 insertions(+), 14 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
index a03d53329..50a7ec3be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java
@@ -30,7 +30,9 @@ import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.mergetree.MergeSorter;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.ExceptionUtils;
 import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.IOUtils;
 
 import javax.annotation.Nullable;
 
@@ -123,6 +125,7 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
         CloseableIterator<ChangelogResult> iterator = null;
         RollingFileWriter<KeyValue, DataFileMeta> compactFileWriter = null;
         RollingFileWriter<KeyValue, DataFileMeta> changelogFileWriter = null;
+        Exception collectedExceptions = null;
 
         try {
             iterator =
@@ -151,18 +154,22 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite
                     }
                 }
             }
+        } catch (Exception e) {
+            collectedExceptions = e;
         } finally {
-            if (iterator != null) {
-                iterator.close();
-            }
-            if (compactFileWriter != null) {
-                compactFileWriter.close();
-            }
-            if (changelogFileWriter != null) {
-                changelogFileWriter.close();
+            try {
+                IOUtils.closeAll(iterator, compactFileWriter, 
changelogFileWriter);
+            } catch (Exception e) {
+                collectedExceptions = ExceptionUtils.firstOrSuppressed(e, 
collectedExceptions);
             }
         }
 
+        if (null != collectedExceptions) {
+            compactFileWriter.abort();
+            changelogFileWriter.abort();
+            throw collectedExceptions;
+        }
+
         List<DataFileMeta> before = extractFilesFromSections(sections);
         List<DataFileMeta> after =
                 compactFileWriter != null
diff --git 
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
index 89e82fe3f..df299fd84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java
@@ -32,7 +32,9 @@ import org.apache.paimon.mergetree.MergeTreeReaders;
 import org.apache.paimon.mergetree.SortedRun;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.utils.ExceptionUtils;
 import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.IOUtils;
 
 import javax.annotation.Nullable;
 
@@ -75,13 +77,31 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
             int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) throws Exception {
         RollingFileWriter<KeyValue, DataFileMeta> writer =
                 writerFactory.createRollingMergeTreeFileWriter(outputLevel, 
FileSource.COMPACT);
-        RecordReader<KeyValue> reader =
-                readerForMergeTree(sections, new 
ReducerMergeFunctionWrapper(mfFactory.create()));
-        if (dropDelete) {
-            reader = new DropDeleteReader(reader);
+        RecordReader<KeyValue> reader = null;
+        Exception collectedExceptions = null;
+        try {
+            reader =
+                    readerForMergeTree(
+                            sections, new 
ReducerMergeFunctionWrapper(mfFactory.create()));
+            if (dropDelete) {
+                reader = new DropDeleteReader(reader);
+            }
+            writer.write(new RecordReaderIterator<>(reader));
+        } catch (Exception e) {
+            collectedExceptions = e;
+        } finally {
+            try {
+                IOUtils.closeAll(reader, writer);
+            } catch (Exception e) {
+                collectedExceptions = ExceptionUtils.firstOrSuppressed(e, 
collectedExceptions);
+            }
         }
-        writer.write(new RecordReaderIterator<>(reader));
-        writer.close();
+
+        if (null != collectedExceptions) {
+            writer.abort();
+            throw collectedExceptions;
+        }
+
         List<DataFileMeta> before = extractFilesFromSections(sections);
         notifyRewriteCompactBefore(before);
         return new CompactResult(before, writer.result());
diff --git 
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
new file mode 100644
index 000000000..44e2a292b
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ChangelogMergeTreeRewriterTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.format.FlushingFileFormat;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FileReaderFactory;
+import org.apache.paimon.io.KeyValueFileReaderFactory;
+import org.apache.paimon.io.KeyValueFileWriterFactory;
+import org.apache.paimon.io.RollingFileWriter;
+import org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter;
+import org.apache.paimon.mergetree.compact.ChangelogResult;
+import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.FullChangelogMergeFunctionWrapper;
+import org.apache.paimon.mergetree.compact.MergeFunctionWrapper;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.KeyValueFieldsExtractor;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
+import static 
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/** Tests for {@link ChangelogMergeTreeRewriter}. */
+public class ChangelogMergeTreeRewriterTest {
+    @TempDir java.nio.file.Path tempDir;
+    private Path path;
+    private Comparator<InternalRow> comparator;
+    private SchemaManager schemaManager;
+    private TableSchema tableSchema;
+    private RowType keyType;
+    private RowType valueType;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        path = new Path(tempDir.toString());
+        comparator = Comparator.comparingInt(o -> o.getInt(0));
+        schemaManager = new SchemaManager(LocalFileIO.create(), path);
+        RowType recordType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "key", DataTypes.INT()),
+                        DataTypes.FIELD(1, "value", DataTypes.INT()));
+        tableSchema =
+                schemaManager.createTable(
+                        new Schema(
+                                recordType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("key"),
+                                Collections.emptyMap(),
+                                ""));
+        keyType = recordType.project(Collections.singletonList("key"));
+        valueType = recordType.project(Collections.singletonList("value"));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testRewriteFailAndCleanupFiles(boolean rewriteChangelog) 
throws Exception {
+        List<List<SortedRun>> sections = createTestSections(2);
+        Path testPath = new Path(path, UUID.randomUUID().toString());
+        try (ChangelogMergeTreeRewriter rewriter =
+                new TestRewriter(
+                        createReaderFactory(schemaManager, tableSchema, 
keyType, valueType),
+                        createWriterFactory(testPath, keyType, valueType),
+                        comparator,
+                        new MergeSorter(
+                                new CoreOptions(new Options()),
+                                tableSchema.logicalPrimaryKeysType(),
+                                tableSchema.logicalRowType(),
+                                null),
+                        rewriteChangelog,
+                        true)) {
+            try {
+                rewriter.rewrite(5, true, sections);
+                fail();
+            } catch (IOException ignore) {
+                // ignore
+            }
+
+            List<java.nio.file.Path> files =
+                    Files.walk(Paths.get(testPath.toString()))
+                            .filter(Files::isRegularFile)
+                            .filter(
+                                    p ->
+                                            p.getFileName()
+                                                            .toString()
+                                                            .startsWith(
+                                                                    
DataFilePathFactory
+                                                                            
.DATA_FILE_PREFIX)
+                                                    || p.getFileName()
+                                                            .toString()
+                                                            .startsWith(
+                                                                    
DataFilePathFactory
+                                                                            
.CHANGELOG_FILE_PREFIX))
+                            .collect(Collectors.toList());
+            Assertions.assertEquals(0, files.size());
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testRewriteSuccess(boolean rewriteChangelog) throws Exception {
+        List<List<SortedRun>> sections = createTestSections(2);
+        Path testPath = new Path(path, UUID.randomUUID().toString());
+        try (ChangelogMergeTreeRewriter rewriter =
+                new TestRewriter(
+                        createReaderFactory(schemaManager, tableSchema, 
keyType, valueType),
+                        createWriterFactory(testPath, keyType, valueType),
+                        comparator,
+                        new MergeSorter(
+                                new CoreOptions(new Options()),
+                                tableSchema.logicalPrimaryKeysType(),
+                                tableSchema.logicalRowType(),
+                                null),
+                        rewriteChangelog,
+                        false)) {
+
+            rewriter.rewrite(5, true, sections);
+            List<java.nio.file.Path> files =
+                    Files.walk(Paths.get(testPath.toString()))
+                            .filter(Files::isRegularFile)
+                            .filter(
+                                    p ->
+                                            p.getFileName()
+                                                            .toString()
+                                                            .startsWith(
+                                                                    
DataFilePathFactory
+                                                                            
.DATA_FILE_PREFIX)
+                                                    || p.getFileName()
+                                                            .toString()
+                                                            .startsWith(
+                                                                    
DataFilePathFactory
+                                                                            
.CHANGELOG_FILE_PREFIX))
+                            .collect(Collectors.toList());
+            if (rewriteChangelog) {
+                Assertions.assertEquals(2, files.size()); // changelog + data 
file
+            } else {
+                Assertions.assertEquals(1, files.size()); // data file
+            }
+        }
+    }
+
+    private KeyValueFileWriterFactory createWriterFactory(
+            Path path, RowType keyType, RowType valueType) {
+        String formatIdentifier = "avro";
+        return KeyValueFileWriterFactory.builder(
+                        LocalFileIO.create(),
+                        0,
+                        keyType,
+                        valueType,
+                        new FlushingFileFormat(formatIdentifier),
+                        Collections.singletonMap(formatIdentifier, 
createNonPartFactory(path)),
+                        TARGET_FILE_SIZE.defaultValue().getBytes())
+                .build(BinaryRow.EMPTY_ROW, 0, new CoreOptions(new Options()));
+    }
+
+    private KeyValueFileReaderFactory createReaderFactory(
+            SchemaManager schemaManager, TableSchema schema, RowType keyType, 
RowType valueType) {
+        return KeyValueFileReaderFactory.builder(
+                        LocalFileIO.create(),
+                        schemaManager,
+                        schema,
+                        keyType,
+                        valueType,
+                        ignore -> new FlushingFileFormat("avro"),
+                        createNonPartFactory(path),
+                        new KeyValueFieldsExtractor() {
+                            @Override
+                            public List<DataField> keyFields(TableSchema 
schema) {
+                                return keyType.getFields();
+                            }
+
+                            @Override
+                            public List<DataField> valueFields(TableSchema 
schema) {
+                                return valueType.getFields();
+                            }
+                        },
+                        new CoreOptions(new HashMap<>()))
+                .build(BinaryRow.EMPTY_ROW, 0, DeletionVector.emptyFactory());
+    }
+
+    private List<List<SortedRun>> createTestSections(int numSections) throws 
IOException {
+        List<List<SortedRun>> sections = new ArrayList<>();
+        createFile(Collections.singletonMap(1, 1), keyType, valueType);
+        for (int i = 0; i < numSections; i++) {
+            sections.add(
+                    Collections.singletonList(
+                            SortedRun.fromSorted(
+                                    createFile(
+                                            Collections.singletonMap(1, 
numSections),
+                                            keyType,
+                                            valueType))));
+        }
+        return sections;
+    }
+
+    private List<DataFileMeta> createFile(
+            Map<Integer, Integer> kvs, RowType keyType, RowType valueType) 
throws IOException {
+        KeyValueFileWriterFactory writerFactory = createWriterFactory(path, 
keyType, valueType);
+        RollingFileWriter<KeyValue, DataFileMeta> writer =
+                writerFactory.createRollingChangelogFileWriter(0);
+
+        try {
+            for (Map.Entry<Integer, Integer> kv : kvs.entrySet()) {
+                GenericRow key = new GenericRow(1);
+                key.setField(0, kv.getKey());
+
+                GenericRow value = new GenericRow(1);
+                value.setField(0, kv.getValue());
+
+                writer.write(new KeyValue().replace(key, RowKind.INSERT, 
value));
+            }
+        } finally {
+            writer.close();
+        }
+        return writer.result();
+    }
+
+    private static class TestRewriter extends ChangelogMergeTreeRewriter {
+        private static final int MAX_LEVEL = 5;
+        private final boolean rewriteChangelog;
+        private final boolean closeWithException;
+
+        public TestRewriter(
+                FileReaderFactory<KeyValue> readerFactory,
+                KeyValueFileWriterFactory writerFactory,
+                Comparator<InternalRow> keyComparator,
+                MergeSorter mergeSorter,
+                boolean rewriteChangelog,
+                boolean closeWithException) {
+            super(
+                    MAX_LEVEL,
+                    CoreOptions.MergeEngine.DEDUPLICATE,
+                    readerFactory,
+                    writerFactory,
+                    keyComparator,
+                    null,
+                    DeduplicateMergeFunction.factory(),
+                    mergeSorter,
+                    true,
+                    true);
+            this.rewriteChangelog = rewriteChangelog;
+            this.closeWithException = closeWithException;
+        }
+
+        @Override
+        protected boolean rewriteChangelog(
+                int outputLevel, boolean dropDelete, List<List<SortedRun>> 
sections) {
+            return rewriteChangelog;
+        }
+
+        @Override
+        protected UpgradeStrategy upgradeStrategy(int outputLevel, 
DataFileMeta file) {
+            return UpgradeStrategy.CHANGELOG_WITH_REWRITE;
+        }
+
+        @Override
+        protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int 
outputLevel) {
+            return new FullChangelogMergeFunctionWrapper(
+                    mfFactory.create(), MAX_LEVEL, null, false);
+        }
+
+        @Override
+        protected <T> RecordReader<T> readerForMergeTree(
+                List<List<SortedRun>> sections, MergeFunctionWrapper<T> 
mergeFunctionWrapper)
+                throws IOException {
+            RecordReader<T> reader = super.readerForMergeTree(sections, 
mergeFunctionWrapper);
+            return new RecordReader<T>() {
+                @Nullable
+                @Override
+                public RecordIterator<T> readBatch() throws IOException {
+                    return reader.readBatch();
+                }
+
+                @Override
+                public void close() throws IOException {
+                    reader.close();
+                    if (closeWithException) {
+                        throw new IOException("Test exception during 
closing.");
+                    }
+                }
+            };
+        }
+    }
+}

Reply via email to