This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 697768864 [bug] fix connection leak while lookup join paimon table #924 (#1361)
697768864 is described below

commit 69776886499d508a46bf42ba626221db2827ab93
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 13 15:48:10 2023 +0800

    [bug] fix connection leak while lookup join paimon table #924 (#1361)
---
 .../flink/lookup/FileStoreLookupFunction.java      |  19 ++-
 .../paimon/flink/lookup/TableStreamingReader.java  |  15 +--
 .../flink/lookup/FileStoreLookupFunctionTest.java  | 133 +++++++++++++++++++++
 3 files changed, 152 insertions(+), 15 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 9fde0f139..c9e4e47c9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
+import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
@@ -51,7 +52,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
@@ -117,6 +117,11 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
 
     public void open(FunctionContext context) throws Exception {
         String tmpDirectory = getTmpDirectory(context);
+        open(tmpDirectory);
+    }
+
+    // we tag this method friendly for testing
+    void open(String tmpDirectory) throws Exception {
         this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
 
         Options options = Options.fromMap(table.options());
@@ -189,12 +194,14 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
     }
 
     private void refresh() throws Exception {
-        while (true) {
-            Iterator<InternalRow> batch = streamingReader.nextBatch();
-            if (!batch.hasNext()) {
-                return;
+        try (RecordReaderIterator<InternalRow> batch =
+                new RecordReaderIterator<>(streamingReader.getRecordReader())) 
{
+            while (true) {
+                if (!batch.hasNext()) {
+                    return;
+                }
+                this.lookupTable.refresh(batch);
             }
-            this.lookupTable.refresh(batch);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
index 580efe9b9..b66a89c96 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateFilter;
-import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -33,7 +33,6 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.TypeUtils;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
 import javax.annotation.Nullable;
@@ -41,7 +40,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.function.IntUnaryOperator;
 import java.util.stream.IntStream;
@@ -94,7 +92,7 @@ public class TableStreamingReader {
         }
     }
 
-    public Iterator<InternalRow> nextBatch() throws Exception {
+    public RecordReader<InternalRow> getRecordReader() throws Exception {
         try {
             return read(scan.plan());
         } catch (EndOfScanException e) {
@@ -103,18 +101,17 @@ public class TableStreamingReader {
         }
     }
 
-    private Iterator<InternalRow> read(TableScan.Plan plan) throws IOException 
{
+    private RecordReader<InternalRow> read(TableScan.Plan plan) throws 
IOException {
         TableRead read = readBuilder.newRead();
 
         List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new 
ArrayList<>();
         for (Split split : plan.splits()) {
             readers.add(() -> read.createReader(split));
         }
-        Iterator<InternalRow> iterator =
-                new RecordReaderIterator<>(ConcatRecordReader.create(readers));
+        RecordReader<InternalRow> reader = ConcatRecordReader.create(readers);
         if (recordFilter != null) {
-            return Iterators.filter(iterator, recordFilter::test);
+            reader = reader.filter(recordFilter::test);
         }
-        return iterator;
+        return reader;
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
new file mode 100644
index 000000000..259a8da95
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/** Tests for {@link FileStoreLookupFunction}. */
+public class FileStoreLookupFunctionTest {
+
+    private static final Random RANDOM = new Random();
+
+    private final String commitUser = UUID.randomUUID().toString();
+    private final TraceableFileIO fileIO = new TraceableFileIO();
+    private FileStoreLookupFunction fileStoreLookupFunction;
+    private FileStoreTable fileStoreTable;
+    @TempDir private Path tempDir;
+
+    @Test
+    public void testLookupScanLeak() throws Exception {
+        commit(writeCommit(1));
+        fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 
10L)));
+        Assertions.assertEquals(
+                TraceableFileIO.openInputStreams(s -> 
s.toString().contains(tempDir.toString()))
+                        .size(),
+                0);
+
+        commit(writeCommit(10));
+        fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 
10L)));
+        Assertions.assertEquals(
+                TraceableFileIO.openInputStreams(s -> 
s.toString().contains(tempDir.toString()))
+                        .size(),
+                0);
+    }
+
+    @BeforeEach
+    public void before() throws Exception {
+        createFileStoreTable();
+        fileStoreLookupFunction =
+                new FileStoreLookupFunction(fileStoreTable, new int[] {0, 1}, 
new int[] {1}, null);
+        fileStoreLookupFunction.open(tempDir.toString());
+    }
+
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
+                    new String[] {"pt", "k", "v"});
+
+    private static Schema schema() {
+        Options conf = new Options();
+        conf.set(CoreOptions.BUCKET, 2);
+        conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+        conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+
+        return new Schema(
+                ROW_TYPE.getFields(),
+                Collections.singletonList("pt"),
+                Arrays.asList("pt", "k"),
+                conf.toMap(),
+                "");
+    }
+
+    private void commit(List<CommitMessage> messages) {
+        fileStoreTable.newCommit(commitUser).commit(messages);
+    }
+
+    private List<CommitMessage> writeCommit(int number) throws Exception {
+        List<CommitMessage> messages = new ArrayList<>();
+        StreamTableWrite writer = 
fileStoreTable.newStreamWriteBuilder().newWrite();
+        for (int i = 0; i < number; i++) {
+            writer.write(randomRow());
+            messages.addAll(writer.prepareCommit(true, i));
+        }
+        return messages;
+    }
+
+    private InternalRow randomRow() {
+        return GenericRow.of(RANDOM.nextInt(100), RANDOM.nextInt(100), 
RANDOM.nextLong());
+    }
+
+    public void createFileStoreTable() throws Exception {
+        org.apache.paimon.fs.Path path = new 
org.apache.paimon.fs.Path(tempDir.toString());
+        SchemaManager schemaManager = new SchemaManager(fileIO, path);
+        TableSchema tableSchema = schemaManager.createTable(schema());
+        fileStoreTable =
+                FileStoreTableFactory.create(
+                        fileIO, new 
org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
+    }
+}

Reply via email to