This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 449525110 [flink] fix the source data fetcher thread leak (#4098)
449525110 is described below
commit 4495251102580ff8771d4b590ec5a4ae19b3d940
Author: WenjunMin <[email protected]>
AuthorDate: Fri Aug 30 15:30:18 2024 +0800
[flink] fix the source data fetcher thread leak (#4098)
---
.../main/java/org/apache/paimon/utils/Pool.java | 6 +++
.../flink/source/FileStoreSourceSplitReader.java | 54 ++++++++++++++++------
.../source/FileStoreSourceSplitReaderTest.java | 29 ++++++++++++
3 files changed, 75 insertions(+), 14 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
b/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
index 888bf8874..1e5c7a3cf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
@@ -20,7 +20,9 @@ package org.apache.paimon.utils;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
/** A pool to cache and recycle heavyweight objects, to reduce object
allocation. */
public class Pool<T> {
@@ -68,6 +70,10 @@ public class Pool<T> {
return pool.take();
}
+ public T pollEntry(Duration timeout) throws InterruptedException {
+ return pool.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+ }
+
/** Tries to get the next cached entry. If the pool is empty, this method
returns null. */
@Nullable
public T tryPollEntry() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 0a4be3a8a..94e3c67b6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.reader.RecordReader.RecordIterator;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.utils.Pool;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -33,24 +34,29 @@ import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
-import org.apache.flink.connector.file.src.util.Pool;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
/** The {@link SplitReader} implementation for the file store source. */
public class FileStoreSourceSplitReader
implements SplitReader<BulkFormat.RecordIterator<RowData>,
FileStoreSourceSplit> {
+ private static final Logger LOG =
LoggerFactory.getLogger(FileStoreSourceSplitReader.class);
+
private final TableRead tableRead;
@Nullable private final RecordLimiter limiter;
@@ -65,6 +71,7 @@ public class FileStoreSourceSplitReader
private RecordIterator<InternalRow> currentFirstBatch;
private boolean paused;
+ private final AtomicBoolean wakeup;
private final FileStoreSourceReaderMetrics metrics;
public FileStoreSourceSplitReader(
@@ -78,19 +85,24 @@ public class FileStoreSourceSplitReader
this.pool.add(new FileStoreRecordIterator());
this.paused = false;
this.metrics = metrics;
+ this.wakeup = new AtomicBoolean(false);
}
@Override
public RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> fetch()
throws IOException {
if (paused) {
- return new RecordsWithPausedSplit<>();
+ return new EmptyRecordsWithSplitIds<>();
}
checkSplitOrStartNext();
- // pool first, pool size is 1, the underlying implementation does not
allow multiple batches
- // to be read at the same time
- FileStoreRecordIterator iterator = pool();
+ // poll from the pool first, pool size is 1, the underlying
implementation does not allow
+ // multiple batches to be read at the same time
+ FileStoreRecordIterator iterator = poll();
+ if (iterator == null) {
+ LOG.info("Skip waiting for object pool due to wakeup");
+ return new EmptyRecordsWithSplitIds<>();
+ }
RecordIterator<InternalRow> nextBatch;
if (currentFirstBatch != null) {
@@ -113,13 +125,21 @@ public class FileStoreSourceSplitReader
return limiter != null && limiter.reachLimit();
}
- private FileStoreRecordIterator pool() throws IOException {
- try {
- return this.pool.pollEntry();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted");
+ @Nullable
+ private FileStoreRecordIterator poll() throws IOException {
+ FileStoreRecordIterator iterator = null;
+ while (iterator == null && !wakeup.get()) {
+ try {
+ iterator = this.pool.pollEntry(Duration.ofSeconds(10));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted");
+ }
}
+ if (wakeup.get()) {
+ wakeup.compareAndSet(true, false);
+ }
+ return iterator;
}
@Override
@@ -156,7 +176,10 @@ public class FileStoreSourceSplitReader
}
@Override
- public void wakeUp() {}
+ public void wakeUp() {
+ wakeup.compareAndSet(false, true);
+ LOG.info("Wake up the split reader.");
+ }
@Override
public void close() throws Exception {
@@ -294,8 +317,11 @@ public class FileStoreSourceSplitReader
}
}
- /** Indicates that the {@link FileStoreSourceSplitReader} is paused. */
- private static class RecordsWithPausedSplit<T> implements
RecordsWithSplitIds<T> {
+ /**
+ * An empty implementation of {@link RecordsWithSplitIds}. It is used to
indicate that the
{@link FileStoreSourceSplitReader} is paused or woken up.
+ */
+ private static class EmptyRecordsWithSplitIds<T> implements
RecordsWithSplitIds<T> {
@Nullable
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
index 67b3c3e60..2afb7fa19 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
@@ -97,6 +97,35 @@ public class FileStoreSourceSplitReaderTest {
innerTestOnce(4);
}
+ @Test
+ public void testSplitReaderWakeupAble() throws Exception {
+ TestChangelogDataReadWrite rw = new
TestChangelogDataReadWrite(tempDir.toString());
+ FileStoreSourceSplitReader reader =
createReader(rw.createReadWithKey(), null);
+
+ List<Tuple2<Long, Long>> input = kvs();
+ List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
+
+ assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 0));
+ reader.fetch();
+
+ Thread thread =
+ new Thread(
+ () -> {
+ try {
+ // block on object pool
+ reader.fetch();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ thread.start();
+ thread.join(20000);
+ assertThat(thread.isAlive()).isTrue();
+ reader.wakeUp();
+ thread.join(15000);
+ assertThat(thread.isAlive()).isFalse();
+ }
+
private FileStoreSourceSplitReader createReader(TableRead tableRead,
@Nullable Long limit) {
return new FileStoreSourceSplitReader(
tableRead,