This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0ce17121ea [Feature] Support flow control in zeta (#5502)
0ce17121ea is described below

commit 0ce17121ea8ea26be43a55e6e0728b84c80aebda
Author: Jia Fan <[email protected]>
AuthorDate: Sat Sep 16 12:02:16 2023 +0800

    [Feature] Support flow control in zeta (#5502)
---
 docs/en/concept/speed-limit.md                     |  42 +++++
 docs/sidebars.js                                   |   3 +-
 .../seatunnel/api/common/metrics/MetricNames.java  |   5 +-
 .../apache/seatunnel/api/env/EnvCommonOptions.java |  13 ++
 .../apache/seatunnel/api/env/EnvOptionRule.java    |   2 +
 .../seatunnel/api/table/type/SeaTunnelRow.java     | 179 ++++++++++++++++++++-
 .../seatunnel/api/table/type/SeaTunnelRowTest.java |  90 +++++++++++
 .../core/starter/flowcontrol/FlowControlGate.java  |  42 +++++
 .../starter/flowcontrol/FlowControlStrategy.java   |  50 ++++++
 .../starter/flowcontrol/FlowControlGateTest.java   | 112 +++++++++++++
 .../server/dag/physical/PhysicalPlanGenerator.java |   5 +-
 .../server/task/SeaTunnelSourceCollector.java      |  69 +++++++-
 .../engine/server/task/SourceSeaTunnelTask.java    |  48 +++++-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  14 ++
 14 files changed, 667 insertions(+), 7 deletions(-)

diff --git a/docs/en/concept/speed-limit.md b/docs/en/concept/speed-limit.md
new file mode 100644
index 0000000000..0bb451b38b
--- /dev/null
+++ b/docs/en/concept/speed-limit.md
@@ -0,0 +1,42 @@
+# Speed Control
+
+## Introduction
+
+SeaTunnel provides a powerful speed control feature that allows you to 
manage the rate at which data is synchronized.
+This functionality is essential when you need to ensure efficient and 
controlled data transfer between systems.
+The speed control is primarily governed by two key parameters: 
`read_limit.rows_per_second` and `read_limit.bytes_per_second`.
+This document will guide you through the usage of these parameters and how to 
leverage them effectively.
+
+## Support Those Engines
+
+> SeaTunnel Zeta<br/>
+
+## Configuration
+
+To use the speed control feature, you need to configure the 
`read_limit.rows_per_second` or `read_limit.bytes_per_second` parameters in 
your job config.
+
+Example env config in your config file:
+
+```hocon
+env {
+    job.mode=STREAMING
+    job.name=SeaTunnel_Job
+    read_limit.bytes_per_second=7000000
+    read_limit.rows_per_second=400
+}
+source {
+    MySQL-CDC {
+      // ignore...
+    }
+}
+transform {
+}
+sink {
+    Console {
+    }
+}
+```
+
+We have placed `read_limit.bytes_per_second` and `read_limit.rows_per_second` 
in the `env` parameters, completing the speed control configuration.
+You can configure both of these parameters simultaneously or choose to 
configure only one of them. Each configured value is the maximum rate allowed 
for a single thread, not for the job as a whole.
+Therefore, when configuring the respective values, please take into account 
the parallelism of your tasks.
diff --git a/docs/sidebars.js b/docs/sidebars.js
index a8f2527413..764cb34f57 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -89,7 +89,8 @@ const sidebars = {
                 "concept/config",
                 "concept/connector-v2-features",
                 'concept/schema-feature',
-                'concept/JobEnvConfig'
+                'concept/JobEnvConfig',
+                'concept/speed-limit'
             ]
         },
         "Connector-v2-release-state",
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
index a3511d92b4..b1fc60e0f1 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/MetricNames.java
@@ -26,8 +26,11 @@ public final class MetricNames {
     public static final String RECEIVED_BATCHES = "receivedBatches";
 
     public static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
-
+    public static final String SOURCE_RECEIVED_BYTES = "SourceReceivedBytes";
     public static final String SOURCE_RECEIVED_QPS = "SourceReceivedQPS";
+    public static final String SOURCE_RECEIVED_BYTES_PER_SECONDS = 
"SourceReceivedBytesPerSeconds";
     public static final String SINK_WRITE_COUNT = "SinkWriteCount";
+    public static final String SINK_WRITE_BYTES = "SinkWriteBytes";
     public static final String SINK_WRITE_QPS = "SinkWriteQPS";
+    public static final String SINK_WRITE_BYTES_PER_SECONDS = 
"SinkWriteBytesPerSeconds";
 }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index d076cd5367..0c010bfb84 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -51,6 +51,19 @@ public interface EnvCommonOptions {
                     .withDescription(
                             "The interval (in milliseconds) between two 
consecutive checkpoints.");
 
+    Option<Integer> READ_LIMIT_ROW_PER_SECOND =
+            Options.key("read_limit.rows_per_second")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The each parallelism row limit per second for 
read data from source.");
+
+    Option<Integer> READ_LIMIT_BYTES_PER_SECOND =
+            Options.key("read_limit.bytes_per_second")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The each parallelism bytes limit per second for 
read data from source.");
     Option<Long> CHECKPOINT_TIMEOUT =
             Options.key("checkpoint.timeout")
                     .longType()
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index 09310f080c..d4caa710d8 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -31,6 +31,8 @@ public class EnvOptionRule {
                         EnvCommonOptions.JARS,
                         EnvCommonOptions.CHECKPOINT_INTERVAL,
                         EnvCommonOptions.CHECKPOINT_TIMEOUT,
+                        EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND,
+                        EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND,
                         EnvCommonOptions.CUSTOM_PARAMETERS)
                 .build();
     }
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index 1966d3142c..bd05e0808d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.api.table.factory.SupportMultipleTable;
 
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Objects;
 
 /** SeaTunnel row type. */
@@ -33,6 +34,8 @@ public final class SeaTunnelRow implements Serializable {
     /** The array to store the actual internal format values. */
     private final Object[] fields;
 
+    private volatile int size;
+
     public SeaTunnelRow(int arity) {
         this.fields = new Object[arity];
     }
@@ -97,6 +100,180 @@ public final class SeaTunnelRow implements Serializable {
         return this.fields[pos] == null;
     }
 
+    public int getBytesSize(SeaTunnelRowType rowType) {
+        if (size == 0) {
+            int s = 0;
+            for (int i = 0; i < fields.length; i++) {
+                s += getBytesForValue(fields[i], rowType.getFieldType(i));
+            }
+            size = s;
+        }
+        return size;
+    }
+
+    /** Per-value size estimate used by {@link #getBytesSize(SeaTunnelRowType)}. */
+    private int getBytesForValue(Object v, SeaTunnelDataType<?> dataType) {
+        if (v == null) {
+            return 0;
+        }
+        SqlType sqlType = dataType.getSqlType();
+        switch (sqlType) {
+            case STRING:
+                return ((String) v).length();
+            case BOOLEAN:
+            case TINYINT:
+                return 1;
+            case SMALLINT:
+                return 2;
+            case INT:
+            case FLOAT:
+                return 4;
+            case BIGINT:
+            case DOUBLE:
+                return 8;
+            case DECIMAL:
+                return 36;
+            case NULL:
+                return 0;
+            case BYTES:
+                return ((byte[]) v).length;
+            case DATE:
+                return 24;
+            case TIME:
+                return 12;
+            case TIMESTAMP:
+                return 48;
+            case ARRAY:
+                return getBytesForArray(v, ((ArrayType) 
dataType).getElementType());
+            case MAP:
+                int size = 0;
+                MapType<?, ?> mapType = ((MapType<?, ?>) dataType);
+                for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
+                    size +=
+                            getBytesForValue(entry.getKey(), 
mapType.getKeyType())
+                                    + getBytesForValue(entry.getValue(), 
mapType.getValueType());
+                }
+                return size;
+            case ROW:
+                int rowSize = 0;
+                SeaTunnelRowType rowType = ((SeaTunnelRowType) dataType);
+                SeaTunnelDataType<?>[] types = rowType.getFieldTypes();
+                SeaTunnelRow row = (SeaTunnelRow) v;
+                for (int i = 0; i < types.length; i++) {
+                    rowSize += getBytesForValue(row.fields[i], types[i]);
+                }
+                return rowSize;
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + 
sqlType);
+        }
+    }
+
+    private int getBytesForArray(Object v, BasicType<?> dataType) {
+        switch (dataType.getSqlType()) {
+            case STRING:
+                int s = 0;
+                for (String i : ((String[]) v)) {
+                    s += i.length();
+                }
+                return s;
+            case BOOLEAN:
+                return ((Boolean[]) v).length;
+            case TINYINT:
+                return ((Byte[]) v).length;
+            case SMALLINT:
+                return ((Short[]) v).length * 2;
+            case INT:
+                return ((Integer[]) v).length * 4;
+            case FLOAT:
+                return ((Float[]) v).length * 4;
+            case BIGINT:
+                return ((Long[]) v).length * 8;
+            case DOUBLE:
+                return ((Double[]) v).length * 8;
+            case NULL:
+            default:
+                return 0;
+        }
+    }
+
+    public int getBytesSize() {
+        if (size == 0) {
+            int s = 0;
+            for (Object field : fields) {
+                s += getBytesForValue(field);
+            }
+            size = s;
+        }
+        return size;
+    }
+
+    private int getBytesForValue(Object v) {
+        if (v == null) {
+            return 0;
+        }
+        String clazz = v.getClass().getSimpleName();
+        switch (clazz) {
+            case "String":
+                return ((String) v).length();
+            case "Boolean":
+            case "Byte":
+                return 1;
+            case "Short":
+                return 2;
+            case "Integer":
+            case "Float":
+                return 4;
+            case "Long":
+            case "Double":
+                return 8;
+            case "BigDecimal":
+                return 36;
+            case "byte[]":
+                return ((byte[]) v).length;
+            case "LocalDate":
+                return 24;
+            case "LocalTime":
+                return 12;
+            case "LocalDateTime":
+                return 48;
+            case "String[]":
+                int s = 0;
+                for (String i : ((String[]) v)) {
+                    s += i.length();
+                }
+                return s;
+            case "Boolean[]":
+                return ((Boolean[]) v).length;
+            case "Byte[]":
+                return ((Byte[]) v).length;
+            case "Short[]":
+                return ((Short[]) v).length * 2;
+            case "Integer[]":
+                return ((Integer[]) v).length * 4;
+            case "Long[]":
+                return ((Long[]) v).length * 8;
+            case "Float[]":
+                return ((Float[]) v).length * 4;
+            case "Double[]":
+                return ((Double[]) v).length * 8;
+            case "HashMap":
+                int size = 0;
+                for (Map.Entry<?, ?> entry : ((Map<?, ?>) v).entrySet()) {
+                    size += getBytesForValue(entry.getKey()) + 
getBytesForValue(entry.getValue());
+                }
+                return size;
+            case "SeaTunnelRow":
+                int rowSize = 0;
+                SeaTunnelRow row = (SeaTunnelRow) v;
+                for (int i = 0; i < row.fields.length; i++) {
+                    rowSize += getBytesForValue(row.fields[i]);
+                }
+                return rowSize;
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + 
clazz);
+        }
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -106,7 +283,7 @@ public final class SeaTunnelRow implements Serializable {
             return false;
         }
         SeaTunnelRow that = (SeaTunnelRow) o;
-        return tableId == that.tableId
+        return Objects.equals(tableId, that.tableId)
                 && kind == that.kind
                 && Arrays.deepEquals(fields, that.fields);
     }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java
new file mode 100644
index 0000000000..eaad7f9576
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/type/SeaTunnelRowTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SeaTunnelRowTest {
+
+    @Test
+    void testForRowSize() {
+        Map<String, Object> map = new HashMap<>();
+        map.put(
+                "key1",
+                new SeaTunnelRow(
+                        new Object[] {
+                            1, "test", 1L, new BigDecimal("3333.333"),
+                        }));
+        map.put(
+                "key2",
+                new SeaTunnelRow(
+                        new Object[] {
+                            1, "test", 1L, new BigDecimal("3333.333"),
+                        }));
+        SeaTunnelRow row =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            "test",
+                            1L,
+                            map,
+                            new BigDecimal("3333.333"),
+                            new String[] {"test2", "test", "3333.333"}
+                        });
+
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"f0", "f1", "f2", "f3", "f4", "f5"},
+                        new SeaTunnelDataType<?>[] {
+                            BasicType.INT_TYPE,
+                            BasicType.STRING_TYPE,
+                            BasicType.LONG_TYPE,
+                            new MapType<>(
+                                    BasicType.STRING_TYPE,
+                                    new SeaTunnelRowType(
+                                            new String[] {"f0", "f1", "f2", 
"f3"},
+                                            new SeaTunnelDataType<?>[] {
+                                                BasicType.INT_TYPE,
+                                                BasicType.STRING_TYPE,
+                                                BasicType.LONG_TYPE,
+                                                new DecimalType(10, 3)
+                                            })),
+                            new DecimalType(10, 3),
+                            ArrayType.STRING_ARRAY_TYPE
+                        });
+
+        Assertions.assertEquals(181, row.getBytesSize(rowType));
+
+        SeaTunnelRow row2 =
+                new SeaTunnelRow(
+                        new Object[] {
+                            1,
+                            "test",
+                            1L,
+                            map,
+                            new BigDecimal("3333.333"),
+                            new String[] {"test2", "test", "3333.333"}
+                        });
+        Assertions.assertEquals(181, row2.getBytesSize());
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
new file mode 100644
index 0000000000..10b2301329
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flowcontrol;
+
+import 
org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+public class FlowControlGate {
+
+    private final RateLimiter bytesRateLimiter;
+    private final RateLimiter countRateLimiter;
+
+    private FlowControlGate(FlowControlStrategy flowControlStrategy) {
+        this.bytesRateLimiter = 
RateLimiter.create(flowControlStrategy.getBytesPerSecond());
+        this.countRateLimiter = 
RateLimiter.create(flowControlStrategy.getCountPreSecond());
+    }
+
+    public void audit(SeaTunnelRow row) {
+        bytesRateLimiter.acquire(row.getBytesSize());
+        countRateLimiter.acquire();
+    }
+
+    public static FlowControlGate create(FlowControlStrategy 
flowControlStrategy) {
+        return new FlowControlGate(flowControlStrategy);
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
new file mode 100644
index 0000000000..2547d7061e
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flowcontrol;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class FlowControlStrategy {
+
+    int bytesPerSecond;
+    int countPreSecond;
+
+    public FlowControlStrategy(int bytesPerSecond, int countPreSecond) {
+        if (bytesPerSecond <= 0 || countPreSecond <= 0) {
+            throw new IllegalArgumentException(
+                    "bytesPerSecond and countPreSecond must be positive");
+        }
+        this.bytesPerSecond = bytesPerSecond;
+        this.countPreSecond = countPreSecond;
+    }
+
+    public static FlowControlStrategy of(int bytesPerSecond, int 
countPreSecond) {
+        return new FlowControlStrategy(bytesPerSecond, countPreSecond);
+    }
+
+    public static FlowControlStrategy ofBytes(int bytesPerSecond) {
+        return new FlowControlStrategy(bytesPerSecond, Integer.MAX_VALUE);
+    }
+
+    public static FlowControlStrategy ofCount(int countPreSecond) {
+        return new FlowControlStrategy(Integer.MAX_VALUE, countPreSecond);
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java
 
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java
new file mode 100644
index 0000000000..1bcb695bf5
--- /dev/null
+++ 
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGateTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flowcontrol;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class FlowControlGateTest {
+
+    private static final int rowSize = 181;
+
+    @Test
+    public void testWithBytes() {
+        Clock clock = Clock.systemDefaultZone();
+        FlowControlGate flowControlGate = 
FlowControlGate.create(FlowControlStrategy.ofBytes(100));
+        List<SeaTunnelRow> rows = getRows(10);
+        long start = clock.millis();
+        for (SeaTunnelRow row : rows) {
+            flowControlGate.audit(row);
+        }
+        long end = clock.millis();
+        long useTime = rowSize * 10 / 100 * 1000;
+
+        Assertions.assertTrue(end - start > useTime * 0.8 && end - start < 
useTime * 1.2);
+    }
+
+    @Test
+    public void testWithCount() {
+        Clock clock = Clock.systemDefaultZone();
+        FlowControlGate flowControlGate = 
FlowControlGate.create(FlowControlStrategy.ofCount(2));
+        List<SeaTunnelRow> rows = getRows(10);
+        long start = clock.millis();
+        for (SeaTunnelRow row : rows) {
+            flowControlGate.audit(row);
+        }
+        long end = clock.millis();
+        long useTime = 10 / 2 * 1000;
+
+        Assertions.assertTrue(end - start > useTime * 0.8 && end - start < 
useTime * 1.2);
+    }
+
+    @Test
+    public void testWithBytesAndCount() {
+        Clock clock = Clock.systemDefaultZone();
+        FlowControlGate flowControlGate = 
FlowControlGate.create(FlowControlStrategy.of(100, 2));
+        List<SeaTunnelRow> rows = getRows(10);
+        long start = clock.millis();
+        for (SeaTunnelRow row : rows) {
+            flowControlGate.audit(row);
+        }
+        long end = clock.millis();
+        long useTime = rowSize * 10 / 100 * 1000;
+
+        Assertions.assertTrue(end - start > useTime * 0.8 && end - start < 
useTime * 1.2);
+    }
+
+    /** Returns {@code size} rows, each with an estimated byte size of 181. */
+    private List<SeaTunnelRow> getRows(int size) {
+        Map<String, Object> map = new HashMap<>();
+        map.put(
+                "key1",
+                new SeaTunnelRow(
+                        new Object[] {
+                            1, "test", 1L, new BigDecimal("3333.333"),
+                        }));
+        map.put(
+                "key2",
+                new SeaTunnelRow(
+                        new Object[] {
+                            1, "test", 1L, new BigDecimal("3333.333"),
+                        }));
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            rows.add(
+                    new SeaTunnelRow(
+                            new Object[] {
+                                1,
+                                "test",
+                                1L,
+                                map,
+                                new BigDecimal("3333.333"),
+                                new String[] {"test2", "test", "3333.333"}
+                            }));
+        }
+        return rows;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index c6173cf064..5d0f571d49 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -555,7 +555,10 @@ public class PhysicalPlanGenerator {
                                                                         
(PhysicalExecutionFlow<
                                                                                
         SourceAction,
                                                                                
         SourceConfig>)
-                                                                               
 f);
+                                                                               
 f,
+                                                                        
jobImmutableInformation
+                                                                               
 .getJobConfig()
+                                                                               
 .getEnvOptions());
                                                             } else {
                                                                 return new 
TransformSeaTunnelTask(
                                                                         
jobImmutableInformation
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 0514d83c86..6e44089e8f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -22,15 +22,28 @@ import org.apache.seatunnel.api.common.metrics.Meter;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
+import 
org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventDispatcher;
+import org.apache.seatunnel.api.table.event.handler.DataTypeChangeEventHandler;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
 
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
 
@@ -48,22 +61,65 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
     private final Counter sourceReceivedCount;
 
     private final Meter sourceReceivedQPS;
+    private final Counter sourceReceivedBytes;
+
+    private final Meter sourceReceivedBytesPerSeconds;
 
     private volatile boolean emptyThisPollNext;
+    private final DataTypeChangeEventHandler dataTypeChangeEventHandler =
+            new DataTypeChangeEventDispatcher();
+    private Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+    private SeaTunnelDataType rowType;
+    private FlowControlGate flowControlGate;
 
     public SeaTunnelSourceCollector(
             Object checkpointLock,
             List<OneInputFlowLifeCycle<Record<?>>> outputs,
-            MetricsContext metricsContext) {
+            MetricsContext metricsContext,
+            FlowControlStrategy flowControlStrategy, // may be null: no gate is created in that case
+            SeaTunnelDataType rowType) {
         this.checkpointLock = checkpointLock;
         this.outputs = outputs;
+        this.rowType = rowType; // consulted by collect() to pick how a row's byte size is computed
+        if (rowType instanceof MultipleRowType) { // index each entry's row type by its key; collect() looks it up via getTableId()
+            ((MultipleRowType) rowType)
+                    .iterator()
+                    .forEachRemaining(
+                            type -> {
+                                this.rowTypeMap.put(type.getKey(), 
type.getValue());
+                            });
+        }
         sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
         sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
+        sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES); // incremented by each row's byte size in collect()
+        sourceReceivedBytesPerSeconds = 
metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS);
+        if (flowControlStrategy != null) { // otherwise flowControlGate stays null and collect() skips the audit call
+            flowControlGate = FlowControlGate.create(flowControlStrategy);
+        }
     }
 
     @Override
     public void collect(T row) {
         try {
+            if (row instanceof SeaTunnelRow) {
+                int size;
+                if (rowType instanceof SeaTunnelRowType) {
+                    size = ((SeaTunnelRow) 
row).getBytesSize((SeaTunnelRowType) rowType);
+                } else if (rowType instanceof MultipleRowType) {
+                    size =
+                            ((SeaTunnelRow) row)
+                                    .getBytesSize(
+                                            rowTypeMap.get(((SeaTunnelRow) 
row).getTableId()));
+                } else {
+                    throw new SeaTunnelEngineException(
+                            "Unsupported row type: " + 
rowType.getClass().getName());
+                }
+                sourceReceivedBytes.inc(size);
+                sourceReceivedBytesPerSeconds.markEvent(size);
+                if (flowControlGate != null) {
+                    flowControlGate.audit((SeaTunnelRow) row);
+                }
+            }
             sendRecordToNext(new Record<>(row));
             emptyThisPollNext = false;
             sourceReceivedCount.inc();
@@ -76,6 +132,17 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
     @Override
     public void collect(SchemaChangeEvent event) {
         try {
+            if (rowType instanceof SeaTunnelRowType) {
+                rowType = dataTypeChangeEventHandler.reset((SeaTunnelRowType) 
rowType).apply(event);
+            } else if (rowType instanceof MultipleRowType) {
+                String tableId = event.tablePath().toString();
+                rowTypeMap.put(
+                        tableId,
+                        
dataTypeChangeEventHandler.reset(rowTypeMap.get(tableId)).apply(event));
+            } else {
+                throw new SeaTunnelEngineException(
+                        "Unsupported row type: " + 
rowType.getClass().getName());
+            }
             sendRecordToNext(new Record<>(event));
         } catch (IOException e) {
             throw new RuntimeException(e);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 8650dc7f2a..80a0dff04e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -18,8 +18,10 @@
 package org.apache.seatunnel.engine.server.task;
 
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
 import 
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
@@ -34,6 +36,7 @@ import lombok.Getter;
 import lombok.NonNull;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends 
SeaTunnelTask {
@@ -44,15 +47,18 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
 
     private transient Object checkpointLock;
     @Getter private transient Serializer<SplitT> splitSerializer;
+    private final Map<String, Object> envOption;
     private final PhysicalExecutionFlow<SourceAction, SourceConfig> sourceFlow;
 
     public SourceSeaTunnelTask(
             long jobID,
             TaskLocation taskID,
             int indexID,
-            PhysicalExecutionFlow<SourceAction, SourceConfig> executionFlow) {
+            PhysicalExecutionFlow<SourceAction, SourceConfig> executionFlow,
+            Map<String, Object> envOption) { // env options; read later for the read-limit keys
         super(jobID, taskID, indexID, executionFlow);
         this.sourceFlow = executionFlow;
+        this.envOption = envOption; // stored as-is (no copy); getFlowControlStrategy() reads it
     }
 
     @Override
@@ -69,7 +75,11 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
         } else {
             this.collector =
                     new SeaTunnelSourceCollector<>(
-                            checkpointLock, outputs, this.getMetricsContext());
+                            checkpointLock,
+                            outputs,
+                            this.getMetricsContext(),
+                            getFlowControlStrategy(),
+                            
sourceFlow.getAction().getSource().getProducedType());
             ((SourceFlowLifeCycle<T, SplitT>) 
startFlowLifeCycle).setCollector(collector);
         }
     }
@@ -111,4 +121,38 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
                 (SourceFlowLifeCycle<T, SplitT>) startFlowLifeCycle;
         sourceFlow.triggerBarrier(barrier);
     }
+
+    private FlowControlStrategy getFlowControlStrategy() { // builds the strategy from the read-limit env options; returns null when neither is set
+        FlowControlStrategy strategy;
+        if 
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
+                && 
envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
+            strategy = // both limits present: bytes limit is passed first, row limit second
+                    FlowControlStrategy.of(
+                            Integer.parseInt( // NOTE(review): throws NumberFormatException if the value's string form is not a valid int — confirm options are validated upstream
+                                    envOption
+                                            
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
+                                            .toString()),
+                            Integer.parseInt(
+                                    envOption
+                                            
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
+                                            .toString()));
+        } else if 
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())) {
+            strategy = // only the bytes-per-second option is present
+                    FlowControlStrategy.ofBytes(
+                            Integer.parseInt(
+                                    envOption
+                                            
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
+                                            .toString()));
+        } else if 
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
+            strategy = // only the row-per-second option is present
+                    FlowControlStrategy.ofCount(
+                            Integer.parseInt(
+                                    envOption
+                                            
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
+                                            .toString()));
+        } else {
+            strategy = null; // neither option set; SeaTunnelSourceCollector creates no gate for a null strategy
+        }
+        return strategy;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index c51e3483c0..202e0c2e8b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
@@ -53,6 +54,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
 import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
@@ -87,6 +90,10 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     private Meter sinkWriteQPS;
 
+    private Counter sinkWriteBytes;
+
+    private Meter sinkWriteBytesPerSeconds;
+
     private final boolean containAggCommitter;
 
     public SinkFlowLifeCycle(
@@ -107,6 +114,8 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         this.metricsContext = metricsContext;
         sinkWriteCount = metricsContext.counter(SINK_WRITE_COUNT);
         sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS);
+        sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES);
+        sinkWriteBytesPerSeconds = 
metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS);
     }
 
     @Override
@@ -227,6 +236,11 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                 writer.write((T) record.getData());
                 sinkWriteCount.inc();
                 sinkWriteQPS.markEvent();
+                if (record.getData() instanceof SeaTunnelRow) {
+                    long size = ((SeaTunnelRow) 
record.getData()).getBytesSize();
+                    sinkWriteBytes.inc(size);
+                    sinkWriteBytesPerSeconds.markEvent(size);
+                }
             }
         } catch (Exception e) {
             throw new RuntimeException(e);


Reply via email to