This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 697768864 [bug] fix connection leak while lookup join paimon table
#924 (#1361)
697768864 is described below
commit 69776886499d508a46bf42ba626221db2827ab93
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 13 15:48:10 2023 +0800
[bug] fix connection leak while lookup join paimon table #924 (#1361)
---
.../flink/lookup/FileStoreLookupFunction.java | 19 ++-
.../paimon/flink/lookup/TableStreamingReader.java | 15 +--
.../flink/lookup/FileStoreLookupFunctionTest.java | 133 +++++++++++++++++++++
3 files changed, 152 insertions(+), 15 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index 9fde0f139..c9e4e47c9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateFilter;
+import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
@@ -51,7 +52,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
@@ -117,6 +117,11 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
public void open(FunctionContext context) throws Exception {
String tmpDirectory = getTmpDirectory(context);
+ open(tmpDirectory);
+ }
+
+ // Package-private so that tests can open the function on an explicit temporary directory.
+ void open(String tmpDirectory) throws Exception {
this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
Options options = Options.fromMap(table.options());
@@ -189,12 +194,17 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
}
private void refresh() throws Exception {
while (true) {
- Iterator<InternalRow> batch = streamingReader.nextBatch();
- if (!batch.hasNext()) {
- return;
+ // Each getRecordReader() call plans a new scan: keep going until a batch comes back
+ // empty so a single refresh loads everything pending, and close every reader once it
+ // has been consumed so the input streams it opened are released.
+ try (RecordReaderIterator<InternalRow> batch =
+ new RecordReaderIterator<>(streamingReader.getRecordReader())) {
+ if (!batch.hasNext()) {
+ return;
+ }
+ this.lookupTable.refresh(batch);
}
- this.lookupTable.refresh(batch);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
index 580efe9b9..b66a89c96 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/TableStreamingReader.java
@@ -23,7 +23,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateFilter;
-import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
@@ -33,7 +33,6 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.TypeUtils;
-import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import javax.annotation.Nullable;
@@ -41,7 +40,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.function.IntUnaryOperator;
import java.util.stream.IntStream;
@@ -94,7 +92,13 @@ public class TableStreamingReader {
}
}
- public Iterator<InternalRow> nextBatch() throws Exception {
+ /**
+ * Plans the next scan and returns a reader over the rows it produces.
+ *
+ * <p>The caller owns the returned reader and must close it (e.g. with try-with-resources)
+ * once it has been consumed; otherwise the input streams opened for its splits stay open.
+ */
+ public RecordReader<InternalRow> getRecordReader() throws Exception {
try {
return read(scan.plan());
} catch (EndOfScanException e) {
@@ -103,18 +107,18 @@ public class TableStreamingReader {
}
}
- private Iterator<InternalRow> read(TableScan.Plan plan) throws IOException {
+ private RecordReader<InternalRow> read(TableScan.Plan plan) throws IOException {
TableRead read = readBuilder.newRead();
List<ConcatRecordReader.ReaderSupplier<InternalRow>> readers = new ArrayList<>();
for (Split split : plan.splits()) {
readers.add(() -> read.createReader(split));
}
- Iterator<InternalRow> iterator =
- new RecordReaderIterator<>(ConcatRecordReader.create(readers));
+ // Hand back the reader itself (not an iterator) so the caller is able to close it.
+ RecordReader<InternalRow> reader = ConcatRecordReader.create(readers);
if (recordFilter != null) {
- return Iterators.filter(iterator, recordFilter::test);
+ reader = reader.filter(recordFilter::test);
}
- return iterator;
+ return reader;
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
new file mode 100644
index 000000000..259a8da95
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/** Tests for {@link FileStoreLookupFunction}. */
+public class FileStoreLookupFunctionTest {
+
+ private static final Random RANDOM = new Random();
+
+ private static final RowType ROW_TYPE =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"pt", "k", "v"});
+
+ private final String commitUser = UUID.randomUUID().toString();
+ private final TraceableFileIO fileIO = new TraceableFileIO();
+
+ @TempDir private Path tempDir;
+
+ private FileStoreTable fileStoreTable;
+ private FileStoreLookupFunction fileStoreLookupFunction;
+
+ @BeforeEach
+ public void before() throws Exception {
+ createFileStoreTable();
+ fileStoreLookupFunction =
+ new FileStoreLookupFunction(fileStoreTable, new int[] {0, 1}, new int[] {1}, null);
+ fileStoreLookupFunction.open(tempDir.toString());
+ }
+
+ @AfterEach
+ public void after() throws Exception {
+ // Close the function opened in before() so its resources do not outlive the test.
+ if (fileStoreLookupFunction != null) {
+ fileStoreLookupFunction.close();
+ }
+ }
+
+ @Test
+ public void testLookupScanLeak() throws Exception {
+ writeAndCommit(1);
+ fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
+ assertNoOpenInputStreams();
+
+ writeAndCommit(10);
+ fileStoreLookupFunction.lookup(new FlinkRowData(GenericRow.of(1, 1, 10L)));
+ assertNoOpenInputStreams();
+ }
+
+ /** Asserts that no input stream opened under the table directory is still open. */
+ private void assertNoOpenInputStreams() {
+ Assertions.assertEquals(
+ 0,
+ TraceableFileIO.openInputStreams(s -> s.toString().contains(tempDir.toString()))
+ .size());
+ }
+
+ private static Schema schema() {
+ Options conf = new Options();
+ conf.set(CoreOptions.BUCKET, 2);
+ conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(4096 * 3));
+ conf.set(CoreOptions.PAGE_SIZE, new MemorySize(4096));
+
+ return new Schema(
+ ROW_TYPE.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "k"),
+ conf.toMap(),
+ "");
+ }
+
+ /**
+ * Writes {@code number} random rows, preparing one commit per row, and commits all of them
+ * together. The writer is closed afterwards so the test does not leave its own handles open.
+ */
+ private void writeAndCommit(int number) throws Exception {
+ try (StreamTableWrite writer = fileStoreTable.newStreamWriteBuilder().newWrite()) {
+ List<CommitMessage> messages = new ArrayList<>();
+ for (int i = 0; i < number; i++) {
+ writer.write(randomRow());
+ messages.addAll(writer.prepareCommit(true, i));
+ }
+ fileStoreTable.newCommit(commitUser).commit(messages);
+ }
+ }
+
+ private InternalRow randomRow() {
+ return GenericRow.of(RANDOM.nextInt(100), RANDOM.nextInt(100), RANDOM.nextLong());
+ }
+
+ private void createFileStoreTable() throws Exception {
+ org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(tempDir.toString());
+ SchemaManager schemaManager = new SchemaManager(fileIO, path);
+ TableSchema tableSchema = schemaManager.createTable(schema());
+ fileStoreTable = FileStoreTableFactory.create(fileIO, path, tableSchema);
+ }
+}