This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2d41b024c7 [Improve][Fake] Improve memory usage when split size is large (#7821)
2d41b024c7 is described below

commit 2d41b024c79f8511bec6fef9b55f7a65e906fc31
Author: hailin0 <[email protected]>
AuthorDate: Mon Oct 14 00:31:00 2024 +0800

    [Improve][Fake] Improve memory usage when split size is large (#7821)
---
 .../seatunnel/fake/source/FakeDataGenerator.java   | 25 ++++++++++++++++------
 .../seatunnel/fake/source/FakeSourceReader.java    |  7 +++---
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 017b2d2946..be2da99932 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -41,6 +41,8 @@ import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorExc
 import org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
@@ -51,6 +53,7 @@ import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static org.apache.seatunnel.api.table.type.SqlType.TIME;
@@ -105,26 +108,36 @@ public class FakeDataGenerator {
         return seaTunnelRow;
     }
 
+    @VisibleForTesting
+    public List<SeaTunnelRow> generateFakedRows(int rowNum) {
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        generateFakedRows(rowNum, rows::add);
+        return rows;
+    }
+
     /**
      * @param rowNum The number of pieces of data to be generated by the current task
-     * @return The generated data
+     * @param consumer The generated data is sent to consumer
+     * @return The number of generated data row count
      */
-    public List<SeaTunnelRow> generateFakedRows(int rowNum) {
+    public long generateFakedRows(int rowNum, Consumer<SeaTunnelRow> consumer) {
         // Use manual configuration data preferentially
-        List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
+        long rowCount = 0;
         if (fakeConfig.getFakeRows() != null) {
             SeaTunnelDataType<?>[] fieldTypes = catalogTable.getSeaTunnelRowType().getFieldTypes();
             String[] fieldNames = catalogTable.getSeaTunnelRowType().getFieldNames();
             for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
                 customField(rowData, fieldTypes, fieldNames);
-                seaTunnelRows.add(convertRow(rowData));
+                consumer.accept(convertRow(rowData));
+                rowCount++;
             }
         } else {
             for (int i = 0; i < rowNum; i++) {
-                seaTunnelRows.add(randomRow());
+                consumer.accept(randomRow());
+                rowCount++;
             }
         }
-        return seaTunnelRows;
+        return rowCount;
     }
 
     private void customField(
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 063ece63d2..e3309c6be6 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -89,12 +89,11 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
             if (null != split) {
                 FakeDataGenerator fakeDataGenerator = fakeDataGeneratorMap.get(split.getTableId());
                 // Randomly generated data are sent directly to the downstream operator
-                List<SeaTunnelRow> seaTunnelRows =
-                        fakeDataGenerator.generateFakedRows(split.getRowNum());
-                seaTunnelRows.forEach(output::collect);
+                long rowCount =
+                        fakeDataGenerator.generateFakedRows(split.getRowNum(), output::collect);
                 log.info(
                         "{} rows of data have been generated in split({}) for table {}. Generation time: {}",
-                        seaTunnelRows.size(),
+                        rowCount,
                         split.splitId(),
                         split.getTableId(),
                         latestTimestamp);

Reply via email to