This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2d41b024c7 [Improve][Fake] Improve memory usage when split size is large (#7821)
2d41b024c7 is described below
commit 2d41b024c79f8511bec6fef9b55f7a65e906fc31
Author: hailin0 <[email protected]>
AuthorDate: Mon Oct 14 00:31:00 2024 +0800
[Improve][Fake] Improve memory usage when split size is large (#7821)
---
.../seatunnel/fake/source/FakeDataGenerator.java | 25 ++++++++++++++++------
.../seatunnel/fake/source/FakeSourceReader.java | 7 +++---
2 files changed, 22 insertions(+), 10 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
index 017b2d2946..be2da99932 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java
@@ -41,6 +41,8 @@ import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorExc
import
org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigDecimal;
@@ -51,6 +53,7 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.function.Consumer;
import java.util.function.Function;
import static org.apache.seatunnel.api.table.type.SqlType.TIME;
@@ -105,26 +108,36 @@ public class FakeDataGenerator {
return seaTunnelRow;
}
+ @VisibleForTesting
+ public List<SeaTunnelRow> generateFakedRows(int rowNum) {
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ generateFakedRows(rowNum, rows::add);
+ return rows;
+ }
+
/**
* @param rowNum The number of pieces of data to be generated by the
current task
- * @return The generated data
+ * @param consumer The generated data is sent to consumer
+ * @return The number of generated data row count
*/
- public List<SeaTunnelRow> generateFakedRows(int rowNum) {
+ public long generateFakedRows(int rowNum, Consumer<SeaTunnelRow> consumer)
{
// Use manual configuration data preferentially
- List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
+ long rowCount = 0;
if (fakeConfig.getFakeRows() != null) {
SeaTunnelDataType<?>[] fieldTypes =
catalogTable.getSeaTunnelRowType().getFieldTypes();
String[] fieldNames =
catalogTable.getSeaTunnelRowType().getFieldNames();
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
customField(rowData, fieldTypes, fieldNames);
- seaTunnelRows.add(convertRow(rowData));
+ consumer.accept(convertRow(rowData));
+ rowCount++;
}
} else {
for (int i = 0; i < rowNum; i++) {
- seaTunnelRows.add(randomRow());
+ consumer.accept(randomRow());
+ rowCount++;
}
}
- return seaTunnelRows;
+ return rowCount;
}
private void customField(
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 063ece63d2..e3309c6be6 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -89,12 +89,11 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
if (null != split) {
FakeDataGenerator fakeDataGenerator =
fakeDataGeneratorMap.get(split.getTableId());
// Randomly generated data are sent directly to the downstream
operator
- List<SeaTunnelRow> seaTunnelRows =
- fakeDataGenerator.generateFakedRows(split.getRowNum());
- seaTunnelRows.forEach(output::collect);
+ long rowCount =
+ fakeDataGenerator.generateFakedRows(split.getRowNum(),
output::collect);
log.info(
"{} rows of data have been generated in split({}) for
table {}. Generation time: {}",
- seaTunnelRows.size(),
+ rowCount,
split.splitId(),
split.getTableId(),
latestTimestamp);