This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 8348f1a10 [Feature][Zeta] Support shuffle multiple rows by tableId
(#4147)
8348f1a10 is described below
commit 8348f1a10895460cba2d65489be4905c5099289b
Author: hailin0 <[email protected]>
AuthorDate: Wed Feb 22 16:59:03 2023 +0800
[Feature][Zeta] Support shuffle multiple rows by tableId (#4147)
---
pom.xml | 2 +-
.../seatunnel/api/table/type/MultipleRowType.java | 15 +-
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 13 +-
.../source/MySqlIncrementalSourceFactory.java | 2 +-
.../seatunnel/console/sink/ConsoleSinkWriter.java | 4 +-
.../engine/common/config/EngineConfig.java | 3 +-
.../engine/core/dag/actions/AbstractAction.java | 31 +-
.../seatunnel/engine/core/dag/actions/Action.java | 2 +
.../seatunnel/engine/core/dag/actions/Config.java} | 28 +-
.../engine/core/dag/actions/ShuffleAction.java | 29 +-
.../actions/{Action.java => ShuffleConfig.java} | 50 +--
.../dag/actions/ShuffleMultipleRowStrategy.java | 83 +++++
.../core/dag/actions/ShufflePartitionStrategy.java | 88 +++++
.../engine/core/dag/actions/ShuffleStrategy.java | 71 ++++
.../engine/core/dag/actions/SinkAction.java | 22 +-
.../dag/actions/{Action.java => SinkConfig.java} | 34 +-
.../seatunnel/engine/server/dag/DAGUtils.java | 9 +-
.../dag/execution/ExecutionPlanGenerator.java | 383 +++++++++++++--------
.../server/dag/physical/PhysicalPlanGenerator.java | 136 +++++---
.../engine/server/dag/physical/PlanUtils.java | 9 +-
.../server/execution/TaskExecutionContext.java | 4 +
.../seatunnel/engine/server/master/JobMaster.java | 44 ++-
.../engine/server/task/SeaTunnelTask.java | 13 +-
.../server/task/flow/ShuffleSinkFlowLifeCycle.java | 124 ++-----
.../task/flow/ShuffleSourceFlowLifeCycle.java | 13 +-
.../apache/seatunnel/engine/server/TestUtils.java | 10 +
.../server/checkpoint/CheckpointPlanTest.java | 13 +-
.../seatunnel/engine/server/dag/TaskTest.java | 22 +-
28 files changed, 830 insertions(+), 427 deletions(-)
diff --git a/pom.xml b/pom.xml
index 28a61e1e1..e2b1b5b39 100644
--- a/pom.xml
+++ b/pom.xml
@@ -142,7 +142,7 @@
<javax.servlet.jap.version>2.1</javax.servlet.jap.version>
<hadoop.binary.version>2.7</hadoop.binary.version>
<jackson.version>2.12.6</jackson.version>
- <lombok.version>1.18.0</lombok.version>
+ <lombok.version>1.18.24</lombok.version>
<commons-compress.version>1.20</commons-compress.version>
<skip.pmd.check>false</skip.pmd.check>
<maven.deploy.skip>false</maven.deploy.skip>
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
index 5d7fe330f..8602e4205 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
@@ -17,21 +17,28 @@
package org.apache.seatunnel.api.table.type;
-import lombok.RequiredArgsConstructor;
+import lombok.Getter;
-import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
-@RequiredArgsConstructor
public class MultipleRowType implements SeaTunnelDataType<SeaTunnelRow>,
Iterable<Map.Entry<String, SeaTunnelRowType>> {
private final Map<String, SeaTunnelRowType> rowTypeMap;
+ @Getter
+ private String[] tableIds;
public MultipleRowType(String[] tableIds, SeaTunnelRowType[] rowTypes) {
- Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+ Map<String, SeaTunnelRowType> rowTypeMap = new LinkedHashMap<>();
for (int i = 0; i < tableIds.length; i++) {
rowTypeMap.put(tableIds[i], rowTypes[i]);
}
+ this.tableIds = tableIds;
+ this.rowTypeMap = rowTypeMap;
+ }
+
+ public MultipleRowType(Map<String, SeaTunnelRowType> rowTypeMap) {
+ this.tableIds = rowTypeMap.keySet().toArray(new String[0]);
this.rowTypeMap = rowTypeMap;
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index c94d59064..2c0087213 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.cdc.debezium.row;
import static com.google.common.base.Preconditions.checkNotNull;
-import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.MultipleRowType;
@@ -114,34 +113,32 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
Schema valueSchema = record.valueSchema();
Struct sourceStruct =
messageStruct.getStruct(Envelope.FieldName.SOURCE);
- String database = sourceStruct.getString(DATABASE_NAME_KEY);
String tableName =
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
- String tableId = database + ":" + tableName;
- SeaTunnelRowDebeziumDeserializationConverters converters =
multipleRowConverters.getOrDefault(tableId, singleRowConverter);
+ SeaTunnelRowDebeziumDeserializationConverters converters =
multipleRowConverters.getOrDefault(tableName, singleRowConverter);
if (operation == Envelope.Operation.CREATE || operation ==
Envelope.Operation.READ) {
SeaTunnelRow insert = extractAfterRow(converters, record,
messageStruct, valueSchema);
insert.setRowKind(RowKind.INSERT);
- insert.setTableId(tableId);
+ insert.setTableId(tableName);
validator.validate(insert, RowKind.INSERT);
collector.collect(insert);
} else if (operation == Envelope.Operation.DELETE) {
SeaTunnelRow delete = extractBeforeRow(converters, record,
messageStruct, valueSchema);
validator.validate(delete, RowKind.DELETE);
delete.setRowKind(RowKind.DELETE);
- delete.setTableId(tableId);
+ delete.setTableId(tableName);
collector.collect(delete);
} else {
SeaTunnelRow before = extractBeforeRow(converters, record,
messageStruct, valueSchema);
validator.validate(before, RowKind.UPDATE_BEFORE);
before.setRowKind(RowKind.UPDATE_BEFORE);
- before.setTableId(tableId);
+ before.setTableId(tableName);
collector.collect(before);
SeaTunnelRow after = extractAfterRow(converters, record,
messageStruct, valueSchema);
validator.validate(after, RowKind.UPDATE_AFTER);
after.setRowKind(RowKind.UPDATE_AFTER);
- after.setTableId(tableId);
+ after.setTableId(tableName);
collector.collect(after);
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index f97894522..b6ad57e3d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -84,7 +84,7 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory, Suppor
} else {
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
for (CatalogTable catalogTable : context.getCatalogTables()) {
- String tableId =
catalogTable.getTableId().getDatabaseName() + ":" +
catalogTable.getTableId().getTableName();
+ String tableId = catalogTable.getTableId().getTableName();
rowTypeMap.put(tableId,
catalogTable.getTableSchema().toPhysicalRowDataType());
}
dataType = new MultipleRowType(rowTypeMap);
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index 84bad94b9..d3ade7480 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class ConsoleSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
private final SeaTunnelRowType seaTunnelRowType;
- public static final AtomicLong CNT = new AtomicLong(0);
+ public final AtomicLong rowCounter = new AtomicLong(0);
public SinkWriter.Context context;
public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType,
SinkWriter.Context context) {
@@ -54,7 +54,7 @@ public class ConsoleSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
for (int i = 0; i < fieldTypes.length; i++) {
arr[i] = fieldToString(fieldTypes[i], fields[i]);
}
- log.info("subtaskIndex={} rowIndex={}: SeaTunnelRow#tableId={}
SeaTunnelRow#kind={} : {}", context.getIndexOfSubtask(), CNT.incrementAndGet(),
element.getTableId(), element.getRowKind(), StringUtils.join(arr, ", "));
+ log.info("subtaskIndex={} rowIndex={}: SeaTunnelRow#tableId={}
SeaTunnelRow#kind={} : {}", context.getIndexOfSubtask(),
rowCounter.incrementAndGet(), element.getTableId(), element.getRowKind(),
StringUtils.join(arr, ", "));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index a51b8a128..e0898aafd 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -64,8 +64,9 @@ public class EngineConfig {
this.jobMetricsBackupInterval = jobMetricsBackupInterval;
}
- public void setQueueType(QueueType queueType) {
+ public EngineConfig setQueueType(QueueType queueType) {
checkNotNull(queueType);
this.queueType = queueType;
+ return this;
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
index 33c94cea5..8b2cf3670 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/AbstractAction.java
@@ -34,17 +34,31 @@ public abstract class AbstractAction implements Action {
private final Set<URL> jarUrls;
- protected AbstractAction(long id, @NonNull String name, @NonNull
List<Action> upstreams, @NonNull Set<URL> jarUrls) {
- this.id = id;
- this.name = name;
- this.upstreams = upstreams;
- this.jarUrls = jarUrls;
+ private final Config config;
+
+ protected AbstractAction(long id,
+ @NonNull String name,
+ @NonNull Set<URL> jarUrls) {
+ this(id, name, new ArrayList<>(), jarUrls);
}
- protected AbstractAction(long id, @NonNull String name, @NonNull Set<URL>
jarUrls) {
+ protected AbstractAction(long id,
+ @NonNull String name,
+ @NonNull List<Action> upstreams,
+ @NonNull Set<URL> jarUrls) {
+ this(id, name, upstreams, jarUrls, null);
+ }
+
+ protected AbstractAction(long id,
+ @NonNull String name,
+ @NonNull List<Action> upstreams,
+ @NonNull Set<URL> jarUrls,
+ Config config) {
this.id = id;
this.name = name;
+ this.upstreams = upstreams;
this.jarUrls = jarUrls;
+ this.config = config;
}
@NonNull
@@ -88,4 +102,9 @@ public abstract class AbstractAction implements Action {
public Set<URL> getJarUrls() {
return jarUrls;
}
+
+ @Override
+ public Config getConfig() {
+ return config;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
index b06fcb7a7..bf8ebd1df 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
@@ -42,4 +42,6 @@ public interface Action extends Serializable {
long getId();
Set<URL> getJarUrls();
+
+ Config getConfig();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/PartitionConfig.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Config.java
similarity index 54%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/PartitionConfig.java
rename to
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Config.java
index 74f81e7f5..d053a6e7f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/config/PartitionConfig.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Config.java
@@ -15,31 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.dag.physical.config;
+package org.apache.seatunnel.engine.core.dag.actions;
-public class PartitionConfig implements FlowConfig {
+import java.io.Serializable;
- private final int partitionCount;
-
- private final int targetCount;
-
- private final int parallelismIndex;
-
- public PartitionConfig(int partitionCount, int targetCount, int
parallelismIndex) {
- this.partitionCount = partitionCount;
- this.targetCount = targetCount;
- this.parallelismIndex = parallelismIndex;
- }
-
- public int getPartitionCount() {
- return partitionCount;
- }
-
- public int getTargetCount() {
- return targetCount;
- }
-
- public int getParallelismIndex() {
- return parallelismIndex;
- }
+public interface Config extends Serializable {
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
index 8c6654659..18858b42e 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleAction.java
@@ -17,36 +17,21 @@
package org.apache.seatunnel.engine.core.dag.actions;
-import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
-
import lombok.NonNull;
-import java.net.URL;
-import java.util.List;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
public class ShuffleAction extends AbstractAction {
- private final PartitionSeaTunnelTransform partitionTransformation;
-
- public ShuffleAction(long id,
- @NonNull String name,
- @NonNull List<Action> upstreams,
- @NonNull PartitionSeaTunnelTransform
partitionTransformation,
- @NonNull Set<URL> jarUrls) {
- super(id, name, upstreams, jarUrls);
- this.partitionTransformation = partitionTransformation;
- }
-
public ShuffleAction(long id,
@NonNull String name,
- @NonNull PartitionSeaTunnelTransform
partitionTransformation,
- @NonNull Set<URL> jarUrls) {
- super(id, name, jarUrls);
- this.partitionTransformation = partitionTransformation;
+ @NonNull ShuffleConfig shuffleConfig) {
+ super(id, name, new ArrayList<>(), new HashSet<>(), shuffleConfig);
}
- public PartitionSeaTunnelTransform getPartitionTransformation() {
- return partitionTransformation;
+ @Override
+ public ShuffleConfig getConfig() {
+ return (ShuffleConfig) super.getConfig();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
similarity index 56%
copy from
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
copy to
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
index b06fcb7a7..3b145e611 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleConfig.java
@@ -17,29 +17,29 @@
package org.apache.seatunnel.engine.core.dag.actions;
-import lombok.NonNull;
-
-import java.io.Serializable;
-import java.net.URL;
-import java.util.List;
-import java.util.Set;
-
-public interface Action extends Serializable {
- @NonNull
- String getName();
-
- void setName(@NonNull String name);
-
- @NonNull
- List<Action> getUpstream();
-
- void addUpstream(@NonNull Action action);
-
- int getParallelism();
-
- void setParallelism(int parallelism);
-
- long getId();
-
- Set<URL> getJarUrls();
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Tolerate;
+
+import java.util.concurrent.TimeUnit;
+
+@Getter
+@Setter
+@ToString
+@Builder(toBuilder = true)
+public class ShuffleConfig implements Config {
+ public static final int DEFAULT_BATCH_SIZE = 1024;
+ public static final long DEFAULT_BATCH_FLUSH_INTERVAL =
TimeUnit.SECONDS.toMillis(3);
+
+ @Builder.Default
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ @Builder.Default
+ private long batchFlushInterval = DEFAULT_BATCH_FLUSH_INTERVAL;
+ private ShuffleStrategy shuffleStrategy;
+
+ @Tolerate
+ public ShuffleConfig() {
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
new file mode 100644
index 000000000..c84ea04ec
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import lombok.experimental.Tolerate;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+@SuperBuilder(toBuilder = true)
+@Getter
+@Setter
+@ToString
+public class ShuffleMultipleRowStrategy extends ShuffleStrategy {
+ private MultipleRowType inputRowType;
+ private String targetTableId;
+
+ @Tolerate
+ public ShuffleMultipleRowStrategy() {
+ }
+
+ @Override
+ public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance
hazelcast, int pipelineId, int inputIndex) {
+ Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ for (Map.Entry<String, SeaTunnelRowType> entry : inputRowType) {
+ String tableId = entry.getKey();
+ String queueName = generateQueueName(pipelineId, inputIndex,
tableId);
+ IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
+ // clear old data when job restore
+ queue.clear();
+ shuffleMap.put(queueName, queue);
+ }
+ return shuffleMap;
+ }
+
+ @Override
+ public String createShuffleKey(Record<?> record, int pipelineId, int
inputIndex) {
+ String tableId = ((SeaTunnelRow) record.getData()).getTableId();
+ return generateQueueName(pipelineId, inputIndex, tableId);
+ }
+
+ @Override
+ public IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcast, int
pipelineId, int targetIndex) {
+ IQueue<Record<?>>[] queues = new IQueue[getInputPartitions()];
+ for (int inputIndex = 0; inputIndex < getInputPartitions();
inputIndex++) {
+ Objects.requireNonNull(targetTableId);
+ String queueName = generateQueueName(pipelineId, inputIndex,
targetTableId);
+ queues[inputIndex] = getIQueue(hazelcast, queueName);
+ }
+ return queues;
+ }
+
+ private String generateQueueName(int pipelineId, int inputIndex, String
tableId) {
+ return "ShuffleMultipleRow-Queue[" + getJobId() + "-" + pipelineId +
"-" + inputIndex + "-" + tableId + "]";
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
new file mode 100644
index 000000000..e77f59709
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShufflePartitionStrategy.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.seatunnel.api.table.type.Record;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import lombok.experimental.Tolerate;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+@SuperBuilder
+@Getter
+@Setter
+@ToString
+public class ShufflePartitionStrategy extends ShuffleStrategy {
+ private final Map<Integer, String[]> inputQueueMapping = new HashMap<>();
+ private int targetPartitions;
+
+ @Tolerate
+ public ShufflePartitionStrategy() {
+ }
+
+ @Override
+ public Map<String, IQueue<Record<?>>> createShuffles(HazelcastInstance
hazelcast, int pipelineId, int inputIndex) {
+ checkArgument(inputIndex >= 0 && inputIndex < getInputPartitions());
+ Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
+ for (int targetIndex = 0; targetIndex < targetPartitions;
targetIndex++) {
+ String queueName = generateQueueName(pipelineId, inputIndex,
targetIndex);
+ IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
+ // clear old data when job restore
+ queue.clear();
+ shuffleMap.put(queueName, queue);
+ }
+ return shuffleMap;
+ }
+
+ @Override
+ public String createShuffleKey(Record<?> record, int pipelineId, int
inputIndex) {
+ String[] inputQueueNames =
inputQueueMapping.computeIfAbsent(inputIndex, key -> {
+ String[] queueNames = new String[targetPartitions];
+ for (int targetIndex = 0; targetIndex < targetPartitions;
targetIndex++) {
+ queueNames[targetIndex] = generateQueueName(pipelineId, key,
targetIndex);
+ }
+ return queueNames;
+ });
+ return
inputQueueNames[ThreadLocalRandom.current().nextInt(targetPartitions)];
+ }
+
+ @Override
+ public IQueue<Record<?>>[] getShuffles(HazelcastInstance hazelcast, int
pipelineId, int targetIndex) {
+ checkArgument(targetIndex >= 0 && targetIndex < targetPartitions);
+ IQueue<Record<?>>[] shuffles = new IQueue[getInputPartitions()];
+ for (int inputIndex = 0; inputIndex < getInputPartitions();
inputIndex++) {
+ String queueName = generateQueueName(pipelineId, inputIndex,
targetIndex);
+ shuffles[inputIndex] = getIQueue(hazelcast, queueName);
+ }
+ return shuffles;
+ }
+
+ private String generateQueueName(int pipelineId, int inputIndex, int
targetIndex) {
+ return String.format("ShufflePartition-Queue[%s-%s-%s-%s]",
getJobId(), pipelineId, inputIndex, targetIndex);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
new file mode 100644
index 000000000..36cbe117c
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleStrategy.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.dag.actions;
+
+import org.apache.seatunnel.api.table.type.Record;
+
+import com.hazelcast.collection.IQueue;
+import com.hazelcast.config.QueueConfig;
+import com.hazelcast.core.HazelcastInstance;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import lombok.experimental.Tolerate;
+
+import java.io.Serializable;
+import java.util.Map;
+
+@SuperBuilder(toBuilder = true)
+@Getter
+@Setter
+@ToString
+public abstract class ShuffleStrategy implements Serializable {
+ private static final int DEFAULT_QUEUE_SIZE = 2048;
+ private static final int DEFAULT_QUEUE_BACKUP_COUNT = 0;
+ private static final int DEFAULT_QUEUE_ASYNC_BACKUP_COUNT = 0;
+
+ protected long jobId;
+ protected int inputPartitions;
+ @Builder.Default
+ protected int queueMaxSize = DEFAULT_QUEUE_SIZE;
+ @Builder.Default
+ protected int queueBackupCount = DEFAULT_QUEUE_BACKUP_COUNT;
+ @Builder.Default
+ protected int queueAsyncBackupCount = DEFAULT_QUEUE_ASYNC_BACKUP_COUNT;
+ protected int queueEmptyQueueTtl;
+
+ @Tolerate
+ public ShuffleStrategy(){}
+
+ public abstract Map<String, IQueue<Record<?>>>
createShuffles(HazelcastInstance hazelcast, int pipelineId, int inputIndex);
+
+ public abstract String createShuffleKey(Record<?> record, int pipelineId,
int inputIndex);
+
+ public abstract IQueue<Record<?>>[] getShuffles(HazelcastInstance
hazelcast, int pipelineId, int targetIndex);
+
+ protected IQueue<Record<?>> getIQueue(HazelcastInstance hazelcast, String
queueName) {
+ QueueConfig targetQueueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
+ targetQueueConfig.setMaxSize(queueMaxSize);
+ targetQueueConfig.setBackupCount(queueBackupCount);
+ targetQueueConfig.setAsyncBackupCount(queueAsyncBackupCount);
+ targetQueueConfig.setEmptyQueueTtl(queueEmptyQueueTtl);
+ return hazelcast.getQueue(queueName);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
index e9b83ead3..411bfcd73 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkAction.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import lombok.NonNull;
import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -31,22 +32,35 @@ public class SinkAction<IN, StateT, CommitInfoT,
AggregatedCommitInfoT> extends
public SinkAction(long id,
@NonNull String name,
- @NonNull List<Action> upstreams,
@NonNull SeaTunnelSink<IN, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
@NonNull Set<URL> jarUrls) {
- super(id, name, upstreams, jarUrls);
- this.sink = sink;
+ this(id, name, new ArrayList<>(), sink, jarUrls);
}
public SinkAction(long id,
@NonNull String name,
+ @NonNull List<Action> upstreams,
@NonNull SeaTunnelSink<IN, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
@NonNull Set<URL> jarUrls) {
- super(id, name, jarUrls);
+ this(id, name, upstreams, sink, jarUrls, null);
+ }
+
+ public SinkAction(long id,
+ @NonNull String name,
+ @NonNull List<Action> upstreams,
+ @NonNull SeaTunnelSink<IN, StateT, CommitInfoT,
AggregatedCommitInfoT> sink,
+ @NonNull Set<URL> jarUrls,
+ SinkConfig config) {
+ super(id, name, upstreams, jarUrls, config);
this.sink = sink;
}
public SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
getSink() {
return sink;
}
+
+ @Override
+ public SinkConfig getConfig() {
+ return (SinkConfig) super.getConfig();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
similarity index 65%
copy from
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
copy to
seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
index b06fcb7a7..17d925452 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/Action.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/SinkConfig.java
@@ -17,29 +17,13 @@
package org.apache.seatunnel.engine.core.dag.actions;
-import lombok.NonNull;
-
-import java.io.Serializable;
-import java.net.URL;
-import java.util.List;
-import java.util.Set;
-
-public interface Action extends Serializable {
- @NonNull
- String getName();
-
- void setName(@NonNull String name);
-
- @NonNull
- List<Action> getUpstream();
-
- void addUpstream(@NonNull Action action);
-
- int getParallelism();
-
- void setParallelism(int parallelism);
-
- long getId();
-
- Set<URL> getJarUrls();
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * {@link Config} carried by a {@link SinkAction}.
+ */
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+public class SinkConfig implements Config {
+ /**
+ * Id of the table, among the {@code MultipleRowType} table ids of a multiple-row input, whose
+ * rows are routed to this sink by the per-sink multiple-row shuffle; {@code null} when unset.
+ */
+ private String multipleRowTableId;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
index e0552ae20..462ebe2a4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.dag;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
@@ -35,8 +36,12 @@ import java.util.stream.Collectors;
public class DAGUtils {
- public static JobDAGInfo getJobDAGInfo(LogicalDag logicalDag,
JobImmutableInformation jobImmutableInformation, boolean isPhysicalDAGIInfo) {
- List<Pipeline> pipelines = new ExecutionPlanGenerator(logicalDag,
jobImmutableInformation).generate().getPipelines();
+ public static JobDAGInfo getJobDAGInfo(LogicalDag logicalDag,
+ JobImmutableInformation
jobImmutableInformation,
+ CheckpointConfig checkpointConfig,
+ boolean isPhysicalDAGIInfo) {
+ List<Pipeline> pipelines = new ExecutionPlanGenerator(
+ logicalDag, jobImmutableInformation,
checkpointConfig).generate().getPipelines();
if (isPhysicalDAGIInfo) {
// Generate ExecutePlan DAG
Map<Integer, List<Edge>> pipelineWithEdges = new HashMap<>();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index 1260b91fc..c344ab08a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -19,10 +19,17 @@ package org.apache.seatunnel.engine.server.dag.execution;
import static com.google.common.base.Preconditions.checkArgument;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformAction;
@@ -34,189 +41,282 @@ import
org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
+@Slf4j
public class ExecutionPlanGenerator {
- /**
- * The action ID needs to be regenerated because of the source, sink, and
chain.
- */
- private final IdGenerator idGenerator = new IdGenerator();
-
- /**
- * key: the id of the execution vertex.
- * <br> value: the execution vertex.
- */
- private final Map<Long, ExecutionVertex> executionVertexMap = new
HashMap<>();
-
- /**
- * key: the vertex id.
- * <br> value: The input vertices of this vertex.
- *
- * <p>When chaining vertices, it need to query whether the vertex has
multiple input vertices. </p>
- */
- private final Map<Long, List<LogicalVertex>> inputVerticesMap = new
HashMap<>();
-
- /**
- * key: the vertex id.
- * <br> value: The target vertices of this vertex.
- *
- * <p>When chaining vertices, it need to query whether the vertex has
multiple target vertices. </p>
- */
- private final Map<Long, List<LogicalVertex>> targetVerticesMap = new
HashMap<>();
-
- /**
- * key: logical vertex id.
- * <br> value: execution vertex id.
- *
- * <p>The chaining will cause multiple {@link LogicalVertex} mapping to
the same {@link ExecutionVertex}. </p>
- */
- private final Map<Long, Long> logicalToExecutionMap = new HashMap<>();
-
- /**
- * When chaining, the edge will be removed {@link #findChainedVertices} if
it can be chained.
- */
- private final List<LogicalEdge> logicalEdges;
-
- private final List<LogicalVertex> logicalVertices;
-
+ /** Logical DAG translated by {@link #generate()}. */
+ private final LogicalDag logicalPlan;
+ /** Job information handed to the resulting plan; also supplies the shuffle strategy's job id. */
private final JobImmutableInformation jobImmutableInformation;
+ /** Checkpoint settings; the interval sizes the multiple-row shuffle's empty-queue TTL. */
+ private final CheckpointConfig checkpointConfig;
+ /** Allocates ids for recreated actions and for inserted shuffle and transform-chain vertices. */
+ private final IdGenerator idGenerator = new IdGenerator();
+ /**
+ * @param logicalPlan logical DAG to translate; must contain at least one edge
+ * @param jobImmutableInformation job information passed to the generated {@link ExecutionPlan}
+ * @param checkpointConfig checkpoint settings of the job
+ * @throws IllegalArgumentException if {@code logicalPlan} has no edges
+ */
public ExecutionPlanGenerator(@NonNull LogicalDag logicalPlan,
- @NonNull JobImmutableInformation
jobImmutableInformation) {
- checkArgument(logicalPlan.getEdges().size() > 0, "ExecutionPlan
Builder must have LogicalPlan.");
-
- this.logicalEdges = new ArrayList<>(logicalPlan.getEdges());
- this.logicalVertices = new
ArrayList<>(logicalPlan.getLogicalVertexMap().values());
+ @NonNull JobImmutableInformation
jobImmutableInformation,
+ @NonNull CheckpointConfig checkpointConfig) {
+ checkArgument(logicalPlan.getEdges().size() > 0,
+ "ExecutionPlan Builder must have LogicalPlan.");
+ this.logicalPlan = logicalPlan;
this.jobImmutableInformation = jobImmutableInformation;
+ this.checkpointConfig = checkpointConfig;
}
+ /**
+ * Translates the logical DAG into an {@link ExecutionPlan}.
+ *
+ * <p>Phases: (1) recreate every logical vertex/edge as execution vertices/edges, (2) insert a
+ * shuffle behind a single multiple-row source, (3) chain consecutive transforms, (4) split the
+ * edges into pipelines, (5) wrap the pipelines into the plan.
+ *
+ * @return the generated execution plan
+ */
public ExecutionPlan generate() {
- fillVerticesMap();
- getSourceVertices().forEach(sourceVertex -> {
- List<LogicalVertex> vertices = new ArrayList<>();
- vertices.add(sourceVertex);
- for (int index = 0; index < vertices.size(); index++) {
- LogicalVertex vertex = vertices.get(index);
- createExecutionVertex(vertex);
- if (targetVerticesMap.containsKey(vertex.getVertexId())) {
-
vertices.addAll(targetVerticesMap.get(vertex.getVertexId()));
+ // The message announces the logical plan, so actually log it.
+ log.debug("Generate execution plan using logical plan: {}", logicalPlan);
+
+ Set<ExecutionEdge> executionEdges =
generateExecutionEdges(logicalPlan.getEdges());
+ log.debug("Phase 1: generate execution edge list {}", executionEdges);
+
+ executionEdges = generateShuffleEdges(executionEdges);
+ log.debug("Phase 2: generate shuffle edge list {}", executionEdges);
+
+ executionEdges = generateTransformChainEdges(executionEdges);
+ log.debug("Phase 3: generate transform chain edge list {}",
executionEdges);
+
+ List<Pipeline> pipelines = generatePipelines(executionEdges);
+ log.debug("Phase 4: generate pipeline list {}", pipelines);
+
+ ExecutionPlan executionPlan = new ExecutionPlan(pipelines,
jobImmutableInformation);
+ log.debug("Phase 5: generate execution plan: {}", executionPlan);
+
+ return executionPlan;
+ }
+
+ /**
+ * Copies {@code action} under a new id and parallelism.
+ *
+ * <p>The copy keeps the original's name, jar URLs and payload: the shuffle config, the sink
+ * together with its {@code SinkConfig}, the source, or the transform(s).
+ *
+ * @param action the action to copy
+ * @param id the id of the copy
+ * @param parallelism the parallelism set on the copy
+ * @return the new action
+ * @throws UnknownActionException if {@code action} is not a shuffle, sink, source, transform or
+ *     transform-chain action
+ */
+ public static Action recreateAction(Action action, Long id, int parallelism) {
+ Action newAction;
+ if (action instanceof ShuffleAction) {
+ newAction = new ShuffleAction(id, action.getName(),
+ ((ShuffleAction) action).getConfig());
+ } else if (action instanceof SinkAction) {
+ SinkAction<?, ?, ?, ?> sinkAction = (SinkAction<?, ?, ?, ?>) action;
+ // Carry the SinkConfig over: the config-less SinkAction constructors pass a null config,
+ // which would drop the multipleRowTableId that PhysicalPlanGenerator#getShuffleTask
+ // dereferences for multiple-row shuffles.
+ newAction = new SinkAction<>(id, action.getName(), new ArrayList<>(),
+ sinkAction.getSink(), action.getJarUrls(), sinkAction.getConfig());
+ } else if (action instanceof SourceAction) {
+ newAction = new SourceAction<>(id, action.getName(),
+ ((SourceAction<?, ?, ?>) action).getSource(), action.getJarUrls());
+ } else if (action instanceof TransformAction) {
+ newAction = new TransformAction(id, action.getName(),
+ ((TransformAction) action).getTransform(), action.getJarUrls());
+ } else if (action instanceof TransformChainAction) {
+ newAction = new TransformChainAction(id, action.getName(),
+ action.getJarUrls(), ((TransformChainAction<?>) action).getTransforms());
+ } else {
+ throw new UnknownActionException(action);
+ }
+ newAction.setParallelism(parallelism);
+ return newAction;
+ }
+
+ /**
+ * Phase 1: mirrors every logical edge as an execution edge, recreating each logical vertex
+ * exactly once (under a fresh id) as an execution vertex.
+ *
+ * @param logicalEdges edges of the logical DAG
+ * @return the execution edges
+ */
+ private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge> logicalEdges) {
+ Set<ExecutionEdge> executionEdges = new HashSet<>();
+
+ Map<Long, ExecutionVertex> logicalVertexIdToExecutionVertexMap = new HashMap<>();
+
+ // Sort by (input id, target id) so new vertex ids are handed out in an order that does not
+ // depend on the iteration order of the incoming Set (e.g. several targets of one source).
+ List<LogicalEdge> sortedLogicalEdges = new ArrayList<>(logicalEdges);
+ sortedLogicalEdges.sort(Comparator.comparingLong(LogicalEdge::getInputVertexId)
+ .thenComparingLong(LogicalEdge::getTargetVertexId));
+ for (LogicalEdge logicalEdge : sortedLogicalEdges) {
+ LogicalVertex logicalInputVertex = logicalEdge.getInputVertex();
+ ExecutionVertex executionInputVertex = logicalVertexIdToExecutionVertexMap.computeIfAbsent(
+ logicalInputVertex.getVertexId(),
+ vertexId -> {
+ long newId = idGenerator.getNextId();
+ Action newLogicalInputAction = recreateAction(
+ logicalInputVertex.getAction(), newId, logicalInputVertex.getParallelism());
+ return new ExecutionVertex(newId, newLogicalInputAction, logicalInputVertex.getParallelism());
+ });
+
+ LogicalVertex logicalTargetVertex = logicalEdge.getTargetVertex();
+ ExecutionVertex executionTargetVertex = logicalVertexIdToExecutionVertexMap.computeIfAbsent(
+ logicalTargetVertex.getVertexId(),
+ vertexId -> {
+ long newId = idGenerator.getNextId();
+ Action newLogicalTargetAction = recreateAction(
+ logicalTargetVertex.getAction(), newId, logicalTargetVertex.getParallelism());
+ return new ExecutionVertex(newId, newLogicalTargetAction, logicalTargetVertex.getParallelism());
}
+ );
+
+ ExecutionEdge executionEdge = new ExecutionEdge(executionInputVertex, executionTargetVertex);
+ executionEdges.add(executionEdge);
+ }
+ return executionEdges;
+ }
+
+ /**
+ * Phase 2: when exactly one source vertex exists and it produces {@code MULTIPLE_ROW} data,
+ * inserts a single shuffle vertex (parallelism 1) between that source and all of its targets,
+ * which must be sinks. Otherwise the edges are returned unchanged.
+ *
+ * @param executionEdges edges produced by phase 1
+ * @return the (possibly rewritten) execution edges
+ * @throws IllegalArgumentException if a multiple-row source feeds a non-sink action
+ */
+ @SuppressWarnings("MagicNumber")
+ private Set<ExecutionEdge> generateShuffleEdges(Set<ExecutionEdge> executionEdges) {
+ Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
+ Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
+ executionEdges.forEach(edge -> {
+ ExecutionVertex leftVertex = edge.getLeftVertex();
+ ExecutionVertex rightVertex = edge.getRightVertex();
+ if (leftVertex.getAction() instanceof SourceAction) {
+ sourceExecutionVertices.add(leftVertex);
}
+ targetVerticesMap.computeIfAbsent(leftVertex.getVertexId(), id -> new ArrayList<>())
+ .add(rightVertex);
});
- List<ExecutionEdge> executionEdges = createExecutionEdges();
- return new ExecutionPlan(new
PipelineGenerator(executionVertexMap.values(), executionEdges)
- .generatePipelines(), jobImmutableInformation);
- }
+ if (sourceExecutionVertices.size() != 1) {
+ return executionEdges;
+ }
+ ExecutionVertex sourceExecutionVertex = sourceExecutionVertices.iterator().next();
+ SourceAction sourceAction = (SourceAction) sourceExecutionVertex.getAction();
+ SeaTunnelDataType sourceProducedType = sourceAction.getSource().getProducedType();
+ if (!SqlType.MULTIPLE_ROW.equals(sourceProducedType.getSqlType())) {
+ return executionEdges;
+ }
+ MultipleRowType multipleRowType = MultipleRowType.class.cast(sourceProducedType);
+
+ List<ExecutionVertex> sinkVertices = targetVerticesMap.get(sourceExecutionVertex.getVertexId());
+ Optional<ExecutionVertex> hasOtherAction = sinkVertices.stream()
+ .filter(vertex -> !(vertex.getAction() instanceof SinkAction))
+ .findFirst();
+ checkArgument(!hasOtherAction.isPresent(),
+ "A multiple-row source can only be connected to sinks, but found action: %s",
+ hasOtherAction.map(vertex -> vertex.getAction().getName()).orElse(null));
+
+ Set<ExecutionEdge> newExecutionEdges = new HashSet<>();
+ ShuffleStrategy shuffleStrategy = ShuffleMultipleRowStrategy.builder()
+ .jobId(jobImmutableInformation.getJobId())
+ .inputPartitions(sourceAction.getParallelism())
+ .inputRowType(multipleRowType)
+ // Empty-queue TTL is 3x the checkpoint interval; clamp rather than let a very long
+ // interval overflow the int cast into a negative TTL.
+ .queueEmptyQueueTtl((int) Math.min(Integer.MAX_VALUE,
+ checkpointConfig.getCheckpointInterval() * 3L))
+ .build();
+ ShuffleConfig shuffleConfig = ShuffleConfig.builder()
+ .shuffleStrategy(shuffleStrategy)
+ .build();
+
+ long shuffleVertexId = idGenerator.getNextId();
+ String shuffleActionName = String.format("Shuffle [%s -> table[0~%s]]",
+ sourceAction.getName(), multipleRowType.getTableIds().length - 1);
+ ShuffleAction shuffleAction = new ShuffleAction(shuffleVertexId, shuffleActionName, shuffleConfig);
+ // multiple-row shuffle default parallelism is 1
+ shuffleAction.setParallelism(1);
+ ExecutionVertex shuffleVertex = new ExecutionVertex(shuffleVertexId, shuffleAction, shuffleAction.getParallelism());
+ ExecutionEdge sourceToShuffleEdge = new ExecutionEdge(sourceExecutionVertex, shuffleVertex);
+ newExecutionEdges.add(sourceToShuffleEdge);
- public List<ExecutionEdge> createExecutionEdges() {
- return logicalEdges.stream()
- .map(logicalEdge -> new ExecutionEdge(
-
executionVertexMap.get(logicalToExecutionMap.get(logicalEdge.getInputVertexId())),
-
executionVertexMap.get(logicalToExecutionMap.get(logicalEdge.getTargetVertexId())))
- ).collect(Collectors.toList());
+ for (ExecutionVertex sinkVertex : sinkVertices) {
+ ExecutionEdge shuffleToSinkEdge = new ExecutionEdge(shuffleVertex, sinkVertex);
+ newExecutionEdges.add(shuffleToSinkEdge);
+ }
+ return newExecutionEdges;
}
- private void fillVerticesMap() {
- logicalEdges.forEach(logicalEdge -> {
- inputVerticesMap.computeIfAbsent(logicalEdge.getTargetVertexId(),
id -> new ArrayList<>())
- .add(logicalEdge.getInputVertex());
- targetVerticesMap.computeIfAbsent(logicalEdge.getInputVertexId(),
id -> new ArrayList<>())
- .add(logicalEdge.getTargetVertex());
+ /**
+ * Phase 3: collapses runs of chainable {@link TransformAction} vertices into single
+ * {@link TransformChainAction} vertices and re-points the surrounding edges at them.
+ *
+ * <p>Note: edges between vertices of the same chain are removed from {@code executionEdges}
+ * in place while chaining.
+ *
+ * @param executionEdges edges produced by phase 2; mutated as described above
+ * @return the execution edges after chaining
+ */
+ private Set<ExecutionEdge> generateTransformChainEdges(Set<ExecutionEdge>
executionEdges) {
+ // Index the edges by vertex id in both directions and remember the source vertices.
+ Map<Long, List<ExecutionVertex>> inputVerticesMap = new HashMap<>();
+ Map<Long, List<ExecutionVertex>> targetVerticesMap = new HashMap<>();
+ Set<ExecutionVertex> sourceExecutionVertices = new HashSet<>();
+ executionEdges.forEach(edge -> {
+ ExecutionVertex leftVertex = edge.getLeftVertex();
+ ExecutionVertex rightVertex = edge.getRightVertex();
+ if (leftVertex.getAction() instanceof SourceAction) {
+ sourceExecutionVertices.add(leftVertex);
+ }
+ inputVerticesMap.computeIfAbsent(rightVertex.getVertexId(), id ->
new ArrayList<>())
+ .add(leftVertex);
+ targetVerticesMap.computeIfAbsent(leftVertex.getVertexId(), id ->
new ArrayList<>())
+ .add(rightVertex);
});
- }
- private List<LogicalVertex> getSourceVertices() {
- List<LogicalVertex> sourceVertices = new ArrayList<>();
- for (LogicalVertex logicalVertex : logicalVertices) {
- List<LogicalVertex> inputVertices =
inputVerticesMap.get(logicalVertex.getVertexId());
- if (inputVertices == null || inputVertices.size() == 0) {
- sourceVertices.add(logicalVertex);
+ // transformChainVertexMap: chain vertex id -> chain vertex.
+ // chainedTransformVerticesMapping: id of each chained vertex -> id of the chain vertex replacing it.
+ Map<Long, ExecutionVertex> transformChainVertexMap = new HashMap<>();
+ Map<Long, Long> chainedTransformVerticesMapping = new HashMap<>();
+ // Visit every vertex reachable from each source (targets are appended to the work list)
+ // and fold chainable transform runs into chain vertices.
+ for (ExecutionVertex sourceVertex : sourceExecutionVertices) {
+ List<ExecutionVertex> vertices = new ArrayList<>();
+ vertices.add(sourceVertex);
+ for (int index = 0; index < vertices.size(); index++) {
+ ExecutionVertex vertex = vertices.get(index);
+
+ fillChainedTransformExecutionVertex(vertex,
+ chainedTransformVerticesMapping, transformChainVertexMap,
executionEdges,
+ Collections.unmodifiableMap(inputVerticesMap),
+ Collections.unmodifiableMap(targetVerticesMap));
+
+ if (targetVerticesMap.containsKey(vertex.getVertexId())) {
+
vertices.addAll(targetVerticesMap.get(vertex.getVertexId()));
+ }
+ }
+ }
+
+ // Rewrite each remaining edge so that endpoints folded into a chain point at the chain vertex.
+ Set<ExecutionEdge> transformChainEdges = new HashSet<>();
+ for (ExecutionEdge executionEdge : executionEdges) {
+ ExecutionVertex leftVertex = executionEdge.getLeftVertex();
+ ExecutionVertex rightVertex = executionEdge.getRightVertex();
+ boolean needRebuild = false;
+ if
(chainedTransformVerticesMapping.containsKey(leftVertex.getVertexId())) {
+ needRebuild = true;
+ leftVertex =
transformChainVertexMap.get(chainedTransformVerticesMapping.get(leftVertex.getVertexId()));
+ }
+ if
(chainedTransformVerticesMapping.containsKey(rightVertex.getVertexId())) {
+ needRebuild = true;
+ rightVertex =
transformChainVertexMap.get(chainedTransformVerticesMapping.get(rightVertex.getVertexId()));
+ }
+ if (needRebuild) {
+ executionEdge = new ExecutionEdge(leftVertex, rightVertex);
}
+ transformChainEdges.add(executionEdge);
}
- return sourceVertices;
+ return transformChainEdges;
}
- private void createExecutionVertex(LogicalVertex logicalVertex) {
- if (logicalToExecutionMap.containsKey(logicalVertex.getVertexId())) {
+ /**
+ * Collects the chainable transform vertices starting at {@code currentVertex} and, if any
+ * were found, replaces them with one new {@link TransformChainAction} vertex that takes the
+ * parallelism of {@code currentVertex}. Vertices that are already part of a chain are skipped.
+ *
+ * <p>The new vertex is stored in {@code transformChainVertexMap} under its new id, and every
+ * chained vertex id is mapped to that id in {@code chainedTransformVerticesMapping}. Edges
+ * inside the chain are removed from {@code executionEdges} by {@code collectChainedVertices}.
+ */
+ private void fillChainedTransformExecutionVertex(ExecutionVertex
currentVertex,
+ Map<Long, Long>
chainedTransformVerticesMapping,
+ Map<Long,
ExecutionVertex> transformChainVertexMap,
+ Set<ExecutionEdge>
executionEdges,
+ Map<Long,
List<ExecutionVertex>> inputVerticesMap,
+ Map<Long,
List<ExecutionVertex>> targetVerticesMap) {
+ if
(chainedTransformVerticesMapping.containsKey(currentVertex.getVertexId())) {
return;
}
- final ArrayList<LogicalVertex> chainedVertices = new ArrayList<>();
- findChainedVertices(logicalVertex, chainedVertices);
- final long newId = idGenerator.getNextId();
- Action newAction;
- if (chainedVertices.size() < 1) {
- newAction = recreateAction(logicalVertex.getAction(), newId,
logicalVertex.getParallelism());
- } else {
- List<SeaTunnelTransform> transforms = new
ArrayList<>(chainedVertices.size());
- List<String> names = new ArrayList<>(chainedVertices.size());
+
+ List<ExecutionVertex> transformChainedVertices = new ArrayList<>();
+ collectChainedVertices(currentVertex, transformChainedVertices,
executionEdges, inputVerticesMap, targetVerticesMap);
+ if (transformChainedVertices.size() > 0) {
+ long newVertexId = idGenerator.getNextId();
+ List<SeaTunnelTransform> transforms = new
ArrayList<>(transformChainedVertices.size());
+ List<String> names = new
ArrayList<>(transformChainedVertices.size());
Set<URL> jars = new HashSet<>();
- chainedVertices.stream()
- .peek(vertex ->
logicalToExecutionMap.put(vertex.getVertexId(), newId))
- .map(LogicalVertex::getAction)
+
+ // peek() is used for its side effect: map every chained vertex id to the new chain vertex id.
+ transformChainedVertices.stream()
+ .peek(vertex ->
chainedTransformVerticesMapping.put(vertex.getVertexId(), newVertexId))
+ .map(ExecutionVertex::getAction)
.map(action -> (TransformAction) action)
.forEach(action -> {
transforms.add(action.getTransform());
jars.addAll(action.getJarUrls());
names.add(action.getName());
});
- newAction = new TransformChainAction(newId,
+ TransformChainAction transformChainAction = new
TransformChainAction(newVertexId,
String.join("->", names),
jars,
transforms);
-
newAction.setParallelism(logicalVertex.getAction().getParallelism());
- }
- ExecutionVertex executionVertex = new ExecutionVertex(newId,
newAction, logicalVertex.getParallelism());
- executionVertexMap.put(newId, executionVertex);
- logicalToExecutionMap.put(logicalVertex.getVertexId(),
executionVertex.getVertexId());
- }
+
transformChainAction.setParallelism(currentVertex.getAction().getParallelism());
- public static Action recreateAction(Action action, Long id, int
parallelism) {
- Action newAction;
- if (action instanceof ShuffleAction) {
- newAction = new ShuffleAction(id,
- action.getName(),
- ((ShuffleAction) action).getPartitionTransformation(),
- action.getJarUrls());
- } else if (action instanceof SinkAction) {
- newAction = new SinkAction<>(id,
- action.getName(),
- ((SinkAction<?, ?, ?, ?>) action).getSink(),
- action.getJarUrls());
- } else if (action instanceof SourceAction) {
- newAction = new SourceAction<>(id,
- action.getName(),
- ((SourceAction<?, ?, ?>) action).getSource(),
- action.getJarUrls());
- } else if (action instanceof TransformChainAction) {
- newAction = new TransformChainAction(id,
- action.getName(),
- action.getJarUrls(),
- ((TransformChainAction<?>) action).getTransforms());
- } else {
- throw new UnknownActionException(action);
+ ExecutionVertex executionVertex = new ExecutionVertex(newVertexId,
transformChainAction, currentVertex.getParallelism());
+ transformChainVertexMap.put(newVertexId, executionVertex);
+ chainedTransformVerticesMapping.put(currentVertex.getVertexId(),
executionVertex.getVertexId());
}
- newAction.setParallelism(parallelism);
- return newAction;
}
- private void findChainedVertices(LogicalVertex logicalVertex,
List<LogicalVertex> chainedVertices) {
- Action action = logicalVertex.getAction();
+ private void collectChainedVertices(ExecutionVertex currentVertex,
+ List<ExecutionVertex> chainedVertices,
+ Set<ExecutionEdge> executionEdges,
+ Map<Long, List<ExecutionVertex>>
inputVerticesMap,
+ Map<Long, List<ExecutionVertex>>
targetVerticesMap) {
+ Action action = currentVertex.getAction();
// Currently only support Transform action chaining.
if (action instanceof TransformAction) {
if (chainedVertices.size() == 0) {
- chainedVertices.add(logicalVertex);
- } else if
(inputVerticesMap.get(logicalVertex.getVertexId()).size() == 1) {
+ chainedVertices.add(currentVertex);
+ } else if
(inputVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
// It cannot be chained to any input vertex if it has multiple
input vertices.
- logicalEdges.remove(new
LogicalEdge(chainedVertices.get(chainedVertices.size() - 1), logicalVertex));
- chainedVertices.add(logicalVertex);
+ executionEdges.remove(new
ExecutionEdge(chainedVertices.get(chainedVertices.size() - 1), currentVertex));
+ chainedVertices.add(currentVertex);
} else {
return;
}
@@ -225,8 +325,19 @@ public class ExecutionPlanGenerator {
}
// It cannot chain to any target vertex if it has multiple target
vertices.
- if (targetVerticesMap.get(logicalVertex.getVertexId()).size() == 1) {
-
findChainedVertices(targetVerticesMap.get(logicalVertex.getVertexId()).get(0),
chainedVertices);
+ if (targetVerticesMap.get(currentVertex.getVertexId()).size() == 1) {
+
collectChainedVertices(targetVerticesMap.get(currentVertex.getVertexId()).get(0),
+ chainedVertices, executionEdges, inputVerticesMap,
targetVerticesMap);
+ }
+ }
+
+ /**
+ * Phase 4: splits the final edge set into pipelines with {@link PipelineGenerator}; the vertex
+ * set passed along is every endpoint of {@code executionEdges}.
+ *
+ * @param executionEdges edges produced by phase 3
+ * @return the generated pipelines
+ */
+ private List<Pipeline> generatePipelines(Set<ExecutionEdge>
executionEdges) {
+ Set<ExecutionVertex> executionVertices = new HashSet<>();
+ for (ExecutionEdge edge : executionEdges) {
+ executionVertices.add(edge.getLeftVertex());
+ executionVertices.add(edge.getRightVertex());
}
+ return new PipelineGenerator(executionVertices, new
ArrayList<>(executionEdges))
+ .generatePipelines();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index e9bbf2a49..c55a14e7e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -20,11 +20,15 @@ package org.apache.seatunnel.engine.server.dag.physical;
import static
org.apache.seatunnel.engine.common.config.server.QueueType.BLOCKINGQUEUE;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleMultipleRowStrategy;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.internal.IntermediateQueue;
@@ -36,7 +40,6 @@ import
org.apache.seatunnel.engine.server.dag.execution.ExecutionPlan;
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
import org.apache.seatunnel.engine.server.dag.physical.config.FlowConfig;
import
org.apache.seatunnel.engine.server.dag.physical.config.IntermediateQueueConfig;
-import org.apache.seatunnel.engine.server.dag.physical.config.PartitionConfig;
import org.apache.seatunnel.engine.server.dag.physical.config.SinkConfig;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import org.apache.seatunnel.engine.server.dag.physical.flow.Flow;
@@ -65,6 +68,7 @@ import lombok.NonNull;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -270,41 +274,102 @@ public class PhysicalPlanGenerator {
+ /**
+ * Creates the physical shuffle tasks for every distinct {@link ShuffleAction} that is the left
+ * side of one of {@code edges}.
+ *
+ * <p>A {@link ShuffleMultipleRowStrategy} shuffle is split into one shuffle flow per downstream
+ * sink, whose strategy targets the sink's {@code multipleRowTableId}, with one task per
+ * parallelism index of the original shuffle. Any other shuffle gets one task per parallelism
+ * index.
+ *
+ * @throws IllegalStateException if a sink behind a multiple-row shuffle has no table id that
+ *     matches one of the shuffle's input table ids
+ */
private List<PhysicalVertex> getShuffleTask(List<ExecutionEdge> edges,
int pipelineIndex,
int totalPipelineNum) {
- return edges.stream().filter(s -> s.getLeftVertex().getAction()
instanceof ShuffleAction)
+ return edges.stream()
+ .filter(s -> s.getLeftVertex().getAction() instanceof ShuffleAction)
.map(q -> (ShuffleAction) q.getLeftVertex().getAction())
+ // distinct() drops duplicates like a Set would, but keeps the edges' encounter order,
+ // so task ids are assigned in plan order rather than hash order.
+ .distinct()
.map(q -> new PhysicalExecutionFlow(q, getNextWrapper(edges, q)))
.flatMap(flow -> {
- List<PhysicalVertex> t = new ArrayList<>();
- long taskIDPrefix = idGenerator.getNextId();
- long taskGroupIDPrefix = idGenerator.getNextId();
- for (int i = 0; i < flow.getAction().getParallelism(); i++) {
- long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix,
i);
- TaskGroupLocation taskGroupLocation =
- new
TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex,
taskGroupID);
- TaskLocation taskLocation = new
TaskLocation(taskGroupLocation, taskIDPrefix, i);
- setFlowConfig(flow, i);
- SeaTunnelTask seaTunnelTask =
- new
TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i,
flow);
- // checkpoint
- fillCheckpointPlan(seaTunnelTask);
- t.add(new PhysicalVertex(
- i,
- executorService,
- flow.getAction().getParallelism(),
- new TaskGroupDefaultImpl(taskGroupLocation,
flow.getAction().getName() +
- "-ShuffleTask",
- Lists.newArrayList(seaTunnelTask)),
- flakeIdGenerator,
- pipelineIndex,
- totalPipelineNum,
- seaTunnelTask.getJarsUrl(),
- jobImmutableInformation,
- initializationTimestamp,
- nodeEngine,
- runningJobStateIMap,
- runningJobStateTimestampsIMap));
+ List<PhysicalVertex> physicalVertices = new ArrayList<>();
+
+ ShuffleAction shuffleAction = (ShuffleAction) flow.getAction();
+ ShuffleConfig shuffleConfig = shuffleAction.getConfig();
+ ShuffleStrategy shuffleStrategy = shuffleConfig.getShuffleStrategy();
+ if (shuffleStrategy instanceof ShuffleMultipleRowStrategy) {
+ ShuffleMultipleRowStrategy shuffleMultipleRowStrategy = (ShuffleMultipleRowStrategy) shuffleStrategy;
+ MultipleRowType multipleRowType = shuffleMultipleRowStrategy.getInputRowType();
+ for (Flow nextFlow : flow.getNext()) {
+ PhysicalExecutionFlow sinkFlow = (PhysicalExecutionFlow) nextFlow;
+ SinkAction sinkAction = (SinkAction) sinkFlow.getAction();
+ // A sink action built without a SinkConfig has a null config; fail with a clear
+ // message instead of an NPE, and reject table ids the input does not carry (they
+ // would otherwise yield a "table[-1]" shuffle that targets no input table).
+ String sinkTableId = sinkAction.getConfig() == null
+ ? null : sinkAction.getConfig().getMultipleRowTableId();
+ int sinkTableIndex = Arrays.asList(multipleRowType.getTableIds()).indexOf(sinkTableId);
+ if (sinkTableIndex < 0) {
+ throw new IllegalStateException(String.format(
+ "Sink [%s] has no multiple-row table id matching the shuffle input, table id: %s",
+ sinkAction.getName(), sinkTableId));
+ }
+
+ // Strategy, config and name depend only on the sink, not on the parallelism index.
+ ShuffleStrategy shuffleStrategyOfSinkFlow = shuffleMultipleRowStrategy.toBuilder()
+ .targetTableId(sinkTableId)
+ .build();
+ ShuffleConfig shuffleConfigOfSinkFlow = shuffleConfig.toBuilder()
+ .shuffleStrategy(shuffleStrategyOfSinkFlow)
+ .build();
+ String shuffleActionName = String.format("Shuffle [table[%s] -> %s]",
+ sinkTableIndex, sinkAction.getName());
+
+ long taskIDPrefix = idGenerator.getNextId();
+ long taskGroupIDPrefix = idGenerator.getNextId();
+ for (int parallelismIndex = 0; parallelismIndex < flow.getAction().getParallelism(); parallelismIndex++) {
+ long shuffleActionId = idGenerator.getNextId();
+ ShuffleAction shuffleActionOfSinkFlow =
+ new ShuffleAction(shuffleActionId, shuffleActionName, shuffleConfigOfSinkFlow);
+ shuffleActionOfSinkFlow.setParallelism(1);
+ PhysicalExecutionFlow shuffleFlow =
+ new PhysicalExecutionFlow(shuffleActionOfSinkFlow, Collections.singletonList(sinkFlow));
+ setFlowConfig(shuffleFlow, parallelismIndex);
+
+ long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, parallelismIndex);
+ TaskGroupLocation taskGroupLocation = new TaskGroupLocation(
+ jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
+ TaskLocation taskLocation = new TaskLocation(taskGroupLocation, taskIDPrefix, parallelismIndex);
+ SeaTunnelTask seaTunnelTask = new TransformSeaTunnelTask(
+ jobImmutableInformation.getJobId(), taskLocation, parallelismIndex, shuffleFlow);
+
+ // checkpoint
+ fillCheckpointPlan(seaTunnelTask);
+ physicalVertices.add(new PhysicalVertex(
+ parallelismIndex,
+ executorService,
+ shuffleFlow.getAction().getParallelism(),
+ new TaskGroupDefaultImpl(taskGroupLocation, shuffleFlow.getAction().getName() +
+ "-ShuffleTask",
+ Collections.singletonList(seaTunnelTask)),
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ seaTunnelTask.getJarsUrl(),
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap));
+ }
+ }
+ } else {
+ long taskIDPrefix = idGenerator.getNextId();
+ long taskGroupIDPrefix = idGenerator.getNextId();
+ for (int i = 0; i < flow.getAction().getParallelism(); i++) {
+ long taskGroupID = mixIDPrefixAndIndex(taskGroupIDPrefix, i);
+ TaskGroupLocation taskGroupLocation =
+ new TaskGroupLocation(jobImmutableInformation.getJobId(), pipelineIndex, taskGroupID);
+ TaskLocation taskLocation = new TaskLocation(taskGroupLocation, taskIDPrefix, i);
+ setFlowConfig(flow, i);
+ SeaTunnelTask seaTunnelTask =
+ new TransformSeaTunnelTask(jobImmutableInformation.getJobId(), taskLocation, i, flow);
+ // checkpoint
+ fillCheckpointPlan(seaTunnelTask);
+ physicalVertices.add(new PhysicalVertex(
+ i,
+ executorService,
+ flow.getAction().getParallelism(),
+ new TaskGroupDefaultImpl(taskGroupLocation, flow.getAction().getName() +
+ "-ShuffleTask",
+ Lists.newArrayList(seaTunnelTask)),
+ flakeIdGenerator,
+ pipelineIndex,
+ totalPipelineNum,
+ seaTunnelTask.getJarsUrl(),
+ jobImmutableInformation,
+ initializationTimestamp,
+ nodeEngine,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap));
+ }
}
- return t.stream();
+ return physicalVertices.stream();
}).collect(Collectors.toList());
}
@@ -463,13 +528,6 @@ public class PhysicalPlanGenerator {
config.setCommitterTask(committerTaskIDMap.get((SinkAction<?, ?, ?, ?>)
flow.getAction()));
}
flow.setConfig(config);
- } else if (flow.getAction() instanceof ShuffleAction) {
- PartitionConfig config =
- new PartitionConfig(
- ((ShuffleAction)
flow.getAction()).getPartitionTransformation().getPartitionCount(),
- ((ShuffleAction)
flow.getAction()).getPartitionTransformation().getTargetCount(),
- parallelismIndex);
- flow.setConfig(config);
}
} else if (f instanceof IntermediateExecutionFlow) {
((IntermediateExecutionFlow<IntermediateQueueConfig>) f)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
index efced807a..3892b47e0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
@@ -36,16 +37,16 @@ public class PlanUtils {
public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>>
fromLogicalDAG(@NonNull LogicalDag logicalDag,
@NonNull NodeEngine nodeEngine,
-
@NonNull
-
JobImmutableInformation jobImmutableInformation,
+
@NonNull JobImmutableInformation jobImmutableInformation,
long initializationTimestamp,
@NonNull ExecutorService executorService,
@NonNull FlakeIdGenerator flakeIdGenerator,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap,
-
@NonNull QueueType queueType) {
+
@NonNull QueueType queueType,
+
@NonNull CheckpointConfig checkpointConfig) {
return new PhysicalPlanGenerator(
- new ExecutionPlanGenerator(logicalDag,
jobImmutableInformation).generate(),
+ new ExecutionPlanGenerator(logicalDag, jobImmutableInformation,
checkpointConfig).generate(),
nodeEngine,
jobImmutableInformation,
initializationTimestamp,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
index 41bcbbbbc..1c852218e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskExecutionContext.java
@@ -22,6 +22,7 @@ import
org.apache.seatunnel.engine.server.metrics.MetricsContext;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
+import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -60,4 +61,7 @@ public class TaskExecutionContext {
return (T) task;
}
+ public HazelcastInstance getInstance() {
+ return nodeEngine.getHazelcastInstance();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index c9352461f..6d53deffe 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -133,6 +133,8 @@ public class JobMaster {
private Map<Integer, CheckpointPlan> checkpointPlanMap;
+ private CheckpointConfig jobCheckpointConfig;
+
public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@@ -157,6 +159,9 @@ public class JobMaster {
public void init(long initializationTimestamp) {
jobImmutableInformation =
nodeEngine.getSerializationService().toObject(
jobImmutableInformationData);
+ jobCheckpointConfig =
createJobCheckpointConfig(engineConfig.getCheckpointConfig(),
+ jobImmutableInformation.getJobConfig().getEnvOptions());
+
LOGGER.info(String.format("Init JobMaster for Job %s (%s) ",
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId()));
LOGGER.info(String.format("Job %s (%s) needed jar urls %s",
jobImmutableInformation.getJobConfig().getName(),
@@ -174,7 +179,8 @@ public class JobMaster {
flakeIdGenerator,
runningJobStateIMap,
runningJobStateTimestampsIMap,
- engineConfig.getQueueType());
+ engineConfig.getQueueType(),
+ jobCheckpointConfig);
this.physicalPlan = planTuple.f0();
this.physicalPlan.setJobMaster(this);
this.checkpointPlanMap = planTuple.f1();
@@ -182,32 +188,33 @@ public class JobMaster {
}
public void initCheckPointManager() throws CheckpointStorageException {
- CheckpointConfig checkpointConfig =
mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(),
- jobImmutableInformation.getJobConfig().getEnvOptions());
this.checkpointManager = new CheckpointManager(
jobImmutableInformation.getJobId(),
jobImmutableInformation.isStartWithSavePoint(),
nodeEngine,
this,
checkpointPlanMap,
- checkpointConfig);
+ jobCheckpointConfig);
}
// TODO replace it after ReadableConfig Support parse yaml format, then
use only one config to read engine and env config.
- private CheckpointConfig mergeEnvAndEngineConfig(CheckpointConfig engine,
Map<String, Object> env) {
- CheckpointConfig checkpointConfig = new CheckpointConfig();
- if (env.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
- checkpointConfig.setCheckpointInterval((Integer)
env.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+ private CheckpointConfig createJobCheckpointConfig(CheckpointConfig
defaultCheckpointConfig, Map<String, Object> jobEnv) {
+ CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
+
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
+
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
+
jobCheckpointConfig.setMaxConcurrentCheckpoints(defaultCheckpointConfig.getMaxConcurrentCheckpoints());
+
jobCheckpointConfig.setTolerableFailureCheckpoints(defaultCheckpointConfig.getTolerableFailureCheckpoints());
+
+ CheckpointStorageConfig jobCheckpointStorageConfig = new
CheckpointStorageConfig();
+
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
+
jobCheckpointStorageConfig.setStoragePluginConfig(defaultCheckpointConfig.getStorage().getStoragePluginConfig());
+
jobCheckpointStorageConfig.setMaxRetainedCheckpoints(defaultCheckpointConfig.getStorage().getMaxRetainedCheckpoints());
+ jobCheckpointConfig.setStorage(jobCheckpointStorageConfig);
+
+ if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+ jobCheckpointConfig.setCheckpointInterval((Integer)
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
}
- checkpointConfig.setCheckpointTimeout(engine.getCheckpointTimeout());
-
checkpointConfig.setTolerableFailureCheckpoints(engine.getTolerableFailureCheckpoints());
-
checkpointConfig.setMaxConcurrentCheckpoints(engine.getMaxConcurrentCheckpoints());
- CheckpointStorageConfig storageConfig = new CheckpointStorageConfig();
-
storageConfig.setMaxRetainedCheckpoints(engine.getStorage().getMaxRetainedCheckpoints());
- storageConfig.setStorage(engine.getStorage().getStorage());
-
storageConfig.setStoragePluginConfig(engine.getStorage().getStoragePluginConfig());
- checkpointConfig.setStorage(storageConfig);
- return checkpointConfig;
+ return jobCheckpointConfig;
}
public void initStateFuture() {
@@ -257,7 +264,8 @@ public class JobMaster {
public JobDAGInfo getJobDAGInfo() {
if (jobDAGInfo == null) {
- jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag,
jobImmutableInformation, isPhysicalDAGIInfo);
+ jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag,
+ jobImmutableInformation, engineConfig.getCheckpointConfig(),
isPhysicalDAGIInfo);
}
return jobDAGInfo;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index e23dbc871..11691432d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -34,6 +34,7 @@ import
org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.ShuffleAction;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
@@ -64,6 +65,7 @@ import
org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithInterm
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
+import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import lombok.NonNull;
@@ -213,12 +215,17 @@ public abstract class SeaTunnelTask extends AbstractTask {
new
TransformFlowLifeCycle<SeaTunnelRow>((TransformChainAction) f.getAction(), this,
new SeaTunnelTransformCollector(flowLifeCycles),
completableFuture);
} else if (f.getAction() instanceof ShuffleAction) {
- // TODO use index and taskID to create ringbuffer list
+ ShuffleAction shuffleAction = (ShuffleAction) f.getAction();
+ ShuffleConfig shuffleConfig = shuffleAction.getConfig();
+ HazelcastInstance hazelcastInstance =
getExecutionContext().getInstance();
if (flow.getNext().isEmpty()) {
- lifeCycle = new ShuffleSinkFlowLifeCycle(this,
completableFuture);
+ lifeCycle = new ShuffleSinkFlowLifeCycle(
+ this, indexID, shuffleConfig, hazelcastInstance,
completableFuture);
} else {
- lifeCycle = new ShuffleSourceFlowLifeCycle(this,
completableFuture);
+ lifeCycle = new ShuffleSourceFlowLifeCycle(
+ this, indexID, shuffleConfig, hazelcastInstance,
completableFuture);
}
+ outputs = flowLifeCycles;
} else {
throw new UnknownActionException(f.getAction());
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
index ce2eff995..546ee782c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSinkFlowLifeCycle.java
@@ -17,20 +17,15 @@
package org.apache.seatunnel.engine.server.task.flow;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleStrategy;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.collection.IQueue;
-import com.hazelcast.config.QueueConfig;
import com.hazelcast.core.HazelcastInstance;
-import lombok.AllArgsConstructor;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.RandomStringUtils;
import java.io.IOException;
import java.util.HashMap;
@@ -44,20 +39,29 @@ import java.util.concurrent.TimeUnit;
@SuppressWarnings("MagicNumber")
@Slf4j
public class ShuffleSinkFlowLifeCycle extends AbstractFlowLifeCycle implements
OneInputFlowLifeCycle<Record<?>> {
- private Map<String, IQueue<Record<?>>> shuffles;
- private long shuffleBufferTimesMillis = 1000;
- private int shuffleBufferSize = 1024;
- private int shuffleBatchSize = 1024;
- private Map<String, Queue<Record<?>>> shuffleBuffer;
- private ShuffleStrategy shuffleStrategy;
+ private final int pipelineId;
+ private final int taskIndex;
+ private final Map<String, IQueue<Record<?>>> shuffles;
+ private final int shuffleBatchSize;
+ private final long shuffleBatchFlushInterval;
+ private final Map<String, Queue<Record<?>>> shuffleBuffer;
+ private final ShuffleStrategy shuffleStrategy;
+ private int shuffleBufferSize;
private long lastModify;
public ShuffleSinkFlowLifeCycle(SeaTunnelTask runningTask,
+ int taskIndex,
+ ShuffleConfig shuffleConfig,
+ HazelcastInstance hazelcastInstance,
CompletableFuture<Void> completableFuture)
{
super(runningTask, completableFuture);
- // todo initialize shuffleStrategy
- this.shuffleStrategy = null;
- this.shuffles = shuffleStrategy.createShuffles();
+ this.pipelineId =
runningTask.getTaskLocation().getTaskGroupLocation().getPipelineId();
+ this.taskIndex = taskIndex;
+ this.shuffleStrategy = shuffleConfig.getShuffleStrategy();
+ this.shuffles = shuffleStrategy.createShuffles(hazelcastInstance,
pipelineId, taskIndex);
+ this.shuffleBatchSize = shuffleConfig.getBatchSize();
+ this.shuffleBatchFlushInterval = shuffleConfig.getBatchFlushInterval();
+ this.shuffleBuffer = new HashMap<>();
}
@Override
@@ -92,8 +96,9 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle implements O
@Override
public void close() throws IOException {
super.close();
- for (Map.Entry<String, IQueue<Record<?>>> shuffleBatch :
shuffles.entrySet()) {
- shuffleBatch.getValue().destroy();
+ for (Map.Entry<String, IQueue<Record<?>>> shuffleItem :
shuffles.entrySet()) {
+ log.info("destroy shuffle queue[{}]", shuffleItem.getKey());
+ shuffleItem.getValue().destroy();
}
}
@@ -105,7 +110,7 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle implements O
public void run() {
if (!prepareClose
&& shuffleBufferSize > 0
- && System.currentTimeMillis() - lastModify >
shuffleBufferTimesMillis) {
+ && System.currentTimeMillis() - lastModify >
shuffleBatchFlushInterval) {
try {
shuffleFlush();
@@ -117,24 +122,24 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle implements O
// submit next task
if (!prepareClose) {
Runnable nextScheduleFlushTask = this;
- scheduledExecutorService.schedule(nextScheduleFlushTask,
shuffleBufferTimesMillis, TimeUnit.MILLISECONDS);
+ scheduledExecutorService.schedule(nextScheduleFlushTask,
shuffleBatchFlushInterval, TimeUnit.MILLISECONDS);
} else {
completedFuture.complete(true);
}
}
};
- scheduledExecutorService.schedule(scheduleFlushTask,
shuffleBufferTimesMillis, TimeUnit.MILLISECONDS);
+ scheduledExecutorService.schedule(scheduleFlushTask,
shuffleBatchFlushInterval, TimeUnit.MILLISECONDS);
return completedFuture;
}
private synchronized void shuffleItem(Record<?> record) {
- String shuffleKey = shuffleStrategy.extractShuffleKey(record);
- shuffleBuffer.compute(shuffleKey, (key, records) -> new LinkedList<>())
+ String shuffleKey = shuffleStrategy.createShuffleKey(record,
pipelineId, taskIndex);
+ shuffleBuffer.computeIfAbsent(shuffleKey, key -> new LinkedList<>())
.add(record);
shuffleBufferSize++;
if (shuffleBufferSize >= shuffleBatchSize
- || (shuffleBufferSize > 1 && System.currentTimeMillis() -
lastModify > shuffleBufferTimesMillis)) {
+ || (shuffleBufferSize > 1 && System.currentTimeMillis() -
lastModify > shuffleBatchFlushInterval)) {
shuffleFlush();
}
@@ -143,7 +148,7 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle implements O
private synchronized void shuffleFlush() {
for (Map.Entry<String, Queue<Record<?>>> shuffleBatch :
shuffleBuffer.entrySet()) {
- IQueue<Record<?>> shuffleQueue =
shuffleQueue(shuffleBatch.getKey());
+ IQueue<Record<?>> shuffleQueue =
shuffles.get(shuffleBatch.getKey());
Queue<Record<?>> shuffleQueueBatch = shuffleBatch.getValue();
if (!shuffleQueue.addAll(shuffleBatch.getValue())) {
for (; ;) {
@@ -162,73 +167,4 @@ public class ShuffleSinkFlowLifeCycle extends
AbstractFlowLifeCycle implements O
}
shuffleBufferSize = 0;
}
-
- private IQueue<Record<?>> shuffleQueue(String shuffleKey) {
- return shuffles.get(shuffleKey);
- }
-
- interface ShuffleStrategy {
- Map<String, IQueue<Record<?>>> createShuffles();
-
- String extractShuffleKey(Record<?> record);
- }
-
- @RequiredArgsConstructor
- @AllArgsConstructor
- public static class PartitionShuffle implements ShuffleStrategy {
- private final HazelcastInstance hazelcast;
- private final String jobId;
- private final int partitionNumber;
- private QueueConfig queueConfig;
-
- @Override
- public Map<String, IQueue<Record<?>>> createShuffles() {
- Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
- for (int i = 0; i < partitionNumber; i++) {
- String queueName = String.format("PartitionShuffle[%s-%s]",
jobId, i);
- QueueConfig queueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
- queueConfig.setMaxSize(queueConfig.getMaxSize())
- .setBackupCount(queueConfig.getBackupCount())
- .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
- .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
- shuffleMap.put(String.valueOf(i),
hazelcast.getQueue(queueName));
- }
- return shuffleMap;
- }
-
- @Override
- public String extractShuffleKey(Record<?> record) {
- return RandomStringUtils.random(partitionNumber);
- }
- }
-
- @RequiredArgsConstructor
- @AllArgsConstructor
- public static class MultipleRowShuffle implements ShuffleStrategy {
- private final HazelcastInstance hazelcast;
- private final String jobId;
- private final int parallelismIndex;
- private final MultipleRowType multipleRowType;
- private QueueConfig queueConfig;
-
- @Override
- public Map<String, IQueue<Record<?>>> createShuffles() {
- Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
- for (Map.Entry<String, SeaTunnelRowType> entry : multipleRowType) {
- String queueName =
String.format("MultipleRowShuffle[%s-%s-%s]", jobId, parallelismIndex,
entry.getKey());
- QueueConfig queueConfig =
hazelcast.getConfig().getQueueConfig(queueName);
- queueConfig.setMaxSize(queueConfig.getMaxSize())
- .setBackupCount(queueConfig.getBackupCount())
- .setAsyncBackupCount(queueConfig.getAsyncBackupCount())
- .setEmptyQueueTtl(queueConfig.getEmptyQueueTtl());
- shuffleMap.put(entry.getKey(), hazelcast.getQueue(queueName));
- }
- return shuffleMap;
- }
-
- @Override
- public String extractShuffleKey(Record<?> record) {
- return ((SeaTunnelRow) record.getData()).getTableId();
- }
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
index c95f421df..43c450f32 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/ShuffleSourceFlowLifeCycle.java
@@ -19,10 +19,12 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.core.dag.actions.ShuffleConfig;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import com.hazelcast.collection.IQueue;
+import com.hazelcast.core.HazelcastInstance;
import java.io.IOException;
import java.util.HashMap;
@@ -33,17 +35,22 @@ import java.util.concurrent.CompletableFuture;
@SuppressWarnings("MagicNumber")
public class ShuffleSourceFlowLifeCycle<T> extends AbstractFlowLifeCycle
implements OneOutputFlowLifeCycle<Record<?>> {
- private int shuffleBatchSize = 1024;
- private IQueue<Record<?>>[] shuffles;
+ private final int shuffleBatchSize;
+ private final IQueue<Record<?>>[] shuffles;
private List<Record<?>> unsentBuffer;
private final Map<Integer, Barrier> alignedBarriers = new HashMap<>();
private long currentCheckpointId = Long.MAX_VALUE;
private int alignedBarriersCounter = 0;
public ShuffleSourceFlowLifeCycle(SeaTunnelTask runningTask,
+ int taskIndex,
+ ShuffleConfig shuffleConfig,
+ HazelcastInstance hazelcastInstance,
CompletableFuture<Void>
completableFuture) {
super(runningTask, completableFuture);
- // todo initialize shuffles
+ int pipelineId = runningTask.getTaskLocation().getPipelineId();
+ this.shuffles =
shuffleConfig.getShuffleStrategy().getShuffles(hazelcastInstance, pipelineId,
taskIndex);
+ this.shuffleBatchSize = shuffleConfig.getBatchSize();
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
index c9f802489..7ffa535f0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/TestUtils.java
@@ -33,11 +33,16 @@ import
org.apache.seatunnel.engine.core.dag.logical.LogicalEdge;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.ImmutablePair;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -51,6 +56,11 @@ public class TestUtils {
IdGenerator idGenerator = new IdGenerator();
FakeSource fakeSource = new FakeSource();
fakeSource.setJobContext(jobContext);
+ Config fakeSourceConfig = ConfigFactory.parseMap(
+ Collections.singletonMap("schema",
+ Collections.singletonMap("fields",
+ ImmutableMap.of("id", "int", "name", "string"))));
+ fakeSource.prepare(fakeSourceConfig);
Action fake = new SourceAction<>(idGenerator.getNextId(), "fake",
fakeSource,
Sets.newHashSet(new URL("file:///fake.jar")));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index 77da5b8f5..5c3b25697 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -35,6 +36,10 @@ import
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
import com.hazelcast.map.IMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -69,7 +74,8 @@ public class CheckpointPlanTest extends
AbstractSeaTunnelServerTest {
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
runningJobState,
runningJobStateTimestamp,
- QueueType.BLOCKINGQUEUE).f1();
+ QueueType.BLOCKINGQUEUE,
+ new CheckpointConfig()).f1();
Assertions.assertNotNull(checkpointPlans);
Assertions.assertEquals(2, checkpointPlans.size());
// enum(1) + reader(2) + writer(2)
@@ -90,6 +96,11 @@ public class CheckpointPlanTest extends
AbstractSeaTunnelServerTest {
JobContext jobContext = new JobContext();
jobContext.setJobMode(JobMode.BATCH);
FakeSource fakeSource = new FakeSource();
+ Config fakeSourceConfig = ConfigFactory.parseMap(
+ Collections.singletonMap("schema",
+ Collections.singletonMap("fields",
+ ImmutableMap.of("id", "int", "name", "string"))));
+ fakeSource.prepare(fakeSourceConfig);
fakeSource.setJobContext(jobContext);
Action fake = new SourceAction<>(idGenerator.getNextId(), "fake",
fakeSource, Collections.emptySet());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index 7bec42e2b..bc0d5bedd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -38,6 +39,10 @@ import org.apache.seatunnel.engine.server.TestUtils;
import org.apache.seatunnel.engine.server.dag.physical.PhysicalPlan;
import org.apache.seatunnel.engine.server.dag.physical.PlanUtils;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.hazelcast.map.IMap;
import org.junit.jupiter.api.Assertions;
@@ -74,11 +79,11 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
IdGenerator idGenerator = new IdGenerator();
- Action fake = new SourceAction<>(idGenerator.getNextId(), "fake", new
FakeSource(),
+ Action fake = new SourceAction<>(idGenerator.getNextId(), "fake",
createFakeSource(),
Sets.newHashSet(new URL("file:///fake.jar")));
LogicalVertex fakeVertex = new LogicalVertex(fake.getId(), fake, 2);
- Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake", new
FakeSource(),
+ Action fake2 = new SourceAction<>(idGenerator.getNextId(), "fake",
createFakeSource(),
Sets.newHashSet(new URL("file:///fake.jar")));
LogicalVertex fake2Vertex = new LogicalVertex(fake2.getId(), fake2, 2);
@@ -110,10 +115,21 @@ public class TaskTest extends AbstractSeaTunnelServerTest
{
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
runningJobState,
runningJobStateTimestamp,
- QueueType.BLOCKINGQUEUE).f0();
+ QueueType.BLOCKINGQUEUE,
+ new CheckpointConfig()).f0();
Assertions.assertEquals(physicalPlan.getPipelineList().size(), 1);
Assertions.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(),
1);
Assertions.assertEquals(physicalPlan.getPipelineList().get(0).getPhysicalVertexList().size(),
2);
}
+
+ private static FakeSource createFakeSource() {
+ FakeSource fakeSource = new FakeSource();
+ Config fakeSourceConfig = ConfigFactory.parseMap(
+ Collections.singletonMap("schema",
+ Collections.singletonMap("fields",
+ ImmutableMap.of("id", "int", "name", "string"))));
+ fakeSource.prepare(fakeSourceConfig);
+ return fakeSource;
+ }
}