This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 449525110 [flink] fix the source data fetcher thread leak (#4098)
449525110 is described below

commit 4495251102580ff8771d4b590ec5a4ae19b3d940
Author: WenjunMin <[email protected]>
AuthorDate: Fri Aug 30 15:30:18 2024 +0800

    [flink] fix the source data fetcher thread leak (#4098)
---
 .../main/java/org/apache/paimon/utils/Pool.java    |  6 +++
 .../flink/source/FileStoreSourceSplitReader.java   | 54 ++++++++++++++++------
 .../source/FileStoreSourceSplitReaderTest.java     | 29 ++++++++++++
 3 files changed, 75 insertions(+), 14 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java b/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
index 888bf8874..1e5c7a3cf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Pool.java
@@ -20,7 +20,9 @@ package org.apache.paimon.utils;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /** A pool to cache and recycle heavyweight objects, to reduce object allocation. */
 public class Pool<T> {
@@ -68,6 +70,10 @@ public class Pool<T> {
         return pool.take();
     }
 
+    public T pollEntry(Duration timeout) throws InterruptedException {
+        return pool.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
     /** Tries to get the next cached entry. If the pool is empty, this method returns null. */
     @Nullable
     public T tryPollEntry() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 0a4be3a8a..94e3c67b6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.reader.RecordReader.RecordIterator;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.utils.Pool;
 
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -33,24 +34,29 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.MutableRecordAndPosition;
-import org.apache.flink.connector.file.src.util.Pool;
 import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /** The {@link SplitReader} implementation for the file store source. */
 public class FileStoreSourceSplitReader
         implements SplitReader<BulkFormat.RecordIterator<RowData>, FileStoreSourceSplit> {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FileStoreSourceSplitReader.class);
+
     private final TableRead tableRead;
 
     @Nullable private final RecordLimiter limiter;
@@ -65,6 +71,7 @@ public class FileStoreSourceSplitReader
     private RecordIterator<InternalRow> currentFirstBatch;
 
     private boolean paused;
+    private final AtomicBoolean wakeup;
     private final FileStoreSourceReaderMetrics metrics;
 
     public FileStoreSourceSplitReader(
@@ -78,19 +85,24 @@ public class FileStoreSourceSplitReader
         this.pool.add(new FileStoreRecordIterator());
         this.paused = false;
         this.metrics = metrics;
+        this.wakeup = new AtomicBoolean(false);
     }
 
     @Override
     public RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> fetch() throws IOException {
         if (paused) {
-            return new RecordsWithPausedSplit<>();
+            return new EmptyRecordsWithSplitIds<>();
         }
 
         checkSplitOrStartNext();
 
-        // pool first, pool size is 1, the underlying implementation does not allow multiple batches
-        // to be read at the same time
-        FileStoreRecordIterator iterator = pool();
+        // poll from the pool first, pool size is 1, the underlying implementation does not allow
+        // multiple batches to be read at the same time
+        FileStoreRecordIterator iterator = poll();
+        if (iterator == null) {
+            LOG.info("Skip waiting for object pool due to wakeup");
+            return new EmptyRecordsWithSplitIds<>();
+        }
 
         RecordIterator<InternalRow> nextBatch;
         if (currentFirstBatch != null) {
@@ -113,13 +125,21 @@ public class FileStoreSourceSplitReader
         return limiter != null && limiter.reachLimit();
     }
 
-    private FileStoreRecordIterator pool() throws IOException {
-        try {
-            return this.pool.pollEntry();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IOException("Interrupted");
+    @Nullable
+    private FileStoreRecordIterator poll() throws IOException {
+        FileStoreRecordIterator iterator = null;
+        while (iterator == null && !wakeup.get()) {
+            try {
+                iterator = this.pool.pollEntry(Duration.ofSeconds(10));
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted");
+            }
         }
+        if (wakeup.get()) {
+            wakeup.compareAndSet(true, false);
+        }
+        return iterator;
     }
 
     @Override
@@ -156,7 +176,10 @@ public class FileStoreSourceSplitReader
     }
 
     @Override
-    public void wakeUp() {}
+    public void wakeUp() {
+        wakeup.compareAndSet(false, true);
+        LOG.info("Wake up the split reader.");
+    }
 
     @Override
     public void close() throws Exception {
@@ -294,8 +317,11 @@ public class FileStoreSourceSplitReader
         }
     }
 
-    /** Indicates that the {@link FileStoreSourceSplitReader} is paused. */
-    private static class RecordsWithPausedSplit<T> implements RecordsWithSplitIds<T> {
+    /**
+     * An empty implementation of {@link RecordsWithSplitIds}. It is used to indicate that the
+     * {@link FileStoreSourceSplitReader} is paused or wakeup.
+     */
+    private static class EmptyRecordsWithSplitIds<T> implements RecordsWithSplitIds<T> {
 
         @Nullable
         @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
index 67b3c3e60..2afb7fa19 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
@@ -97,6 +97,35 @@ public class FileStoreSourceSplitReaderTest {
         innerTestOnce(4);
     }
 
+    @Test
+    public void testSplitReaderWakeupAble() throws Exception {
+        TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
+
+        List<Tuple2<Long, Long>> input = kvs();
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
+
+        assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 0));
+        reader.fetch();
+
+        Thread thread =
+                new Thread(
+                        () -> {
+                            try {
+                                // block on object pool
+                                reader.fetch();
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+        thread.start();
+        thread.join(20000);
+        assertThat(thread.isAlive()).isTrue();
+        reader.wakeUp();
+        thread.join(15000);
+        assertThat(thread.isAlive()).isFalse();
+    }
+
     private FileStoreSourceSplitReader createReader(TableRead tableRead, @Nullable Long limit) {
         return new FileStoreSourceSplitReader(
                 tableRead,

Reply via email to