This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5f9a2733d5 [core] Fix one record cannot fit into a single page in
ParallelExecution (#5353)
5f9a2733d5 is described below
commit 5f9a2733d5227af913d157c0f0268b4b53f6e5e7
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 26 20:33:27 2025 +0800
[core] Fix one record cannot fit into a single page in ParallelExecution
(#5353)
---
.../java/org/apache/paimon/utils/ParallelExecution.java | 10 ++++++++++
.../org/apache/paimon/utils/ParallelExecutionTest.java | 14 ++++++++++++++
2 files changed, 24 insertions(+)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
index 9939b70c25..f3aabb8bac 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.SimpleCollectingOutputView;
import org.apache.paimon.data.serializer.Serializer;
import org.apache.paimon.memory.ArraySegmentPool;
import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReader;
import javax.annotation.Nullable;
@@ -52,6 +53,7 @@ import java.util.function.Supplier;
public class ParallelExecution<T, E> implements Closeable {
private final Serializer<T> serializer;
+ private final int pageSize;
private final BlockingQueue<MemorySegment> idlePages;
private final BlockingQueue<ParallelBatch<T, E>> results;
private final ExecutorService executorService;
@@ -66,6 +68,7 @@ public class ParallelExecution<T, E> implements Closeable {
int parallelism,
List<Supplier<Pair<RecordReader<T>, E>>> readers) {
this.serializer = serializer;
+ this.pageSize = pageSize;
int totalPages = parallelism * 2;
this.idlePages = new ArrayBlockingQueue<>(totalPages);
for (int i = 0; i < totalPages; i++) {
@@ -127,6 +130,13 @@ public class ParallelExecution<T, E> implements Closeable {
count++;
break;
} catch (EOFException e) {
+ if (count == 0) {
+ throw new RuntimeException(
+ String.format(
+ "Current page size %s is too
small, one record cannot fit into a single page. "
+ + "Please increase the
'page-size' table option.",
+ new
MemorySize(pageSize).toHumanReadableString()));
+ }
sendToResults(outputView, count, pair.getRight());
outputView = null;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
index 4ec4755d62..4e3feab578 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;
+import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -128,6 +129,19 @@ public class ParallelExecutionTest {
assertThatThrownBy(() ->
collect(execution)).hasMessageContaining(message);
}
+ @Test
+ public void testTooBigRecord() {
+ Supplier<Pair<RecordReader<Integer>, Integer>> supplier =
+ () -> Pair.of(create(new
LinkedList<>(singletonList(singletonList(1)))), 1);
+
+ ParallelExecution<Integer, Integer> execution =
+ new ParallelExecution<>(new IntSerializer(), 2, 2,
singletonList(supplier));
+ assertThatThrownBy(() -> collect(execution))
+ .hasMessageContaining(
+ "Current page size 2 bytes is too small, one record
cannot fit into a single page."
+ + " Please increase the 'page-size' table
option.");
+ }
+
private RecordReader<Integer> create(Queue<List<Integer>> queue) {
return new RecordReader<Integer>() {
@Nullable