TyrantLucifer commented on code in PR #9576:
URL: https://github.com/apache/seatunnel/pull/9576#discussion_r2207282549
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-20-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -0,0 +1,248 @@
+package org.apache.seatunnel.core.starter.flink.execution;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.options.EnvCommonOptions;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
+import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
+import org.apache.seatunnel.core.starter.execution.TaskExecution;
+import org.apache.seatunnel.core.starter.flink.FlinkStarter;
+import org.apache.seatunnel.translation.flink.metric.FlinkJobMetricsSummary;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** Flink 1.20 专用的执行类 */
+public class FlinkExecution implements TaskExecution {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkExecution.class);
+
+    private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
+    private final PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment>
+            sourcePluginExecuteProcessor;
+    private final PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment>
+            transformPluginExecuteProcessor;
+    private final PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment>
+            sinkPluginExecuteProcessor;
+    private final List<URL> jarPaths;
+
+    public FlinkExecution(Config config) {
+        try {
+            jarPaths =
+                    new ArrayList<>(
+                            Collections.singletonList(
+                                    new File(
+                                                    Common.appStarterDir()
+                                                            .resolve(FlinkStarter.APP_JAR_NAME)
+                                                            .toString())
+                                            .toURI()
+                                            .toURL()));
+        } catch (MalformedURLException e) {
+            throw new SeaTunnelException("load flink starter error.", e);
+        }
+        Config envConfig = config.getConfig("env");
+        registerPlugin(envConfig);
+        JobContext jobContext = new JobContext();
+        jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
+        jobContext.setEnableCheckpoint(RuntimeEnvironment.getEnableCheckpoint(config));
+
+        this.sourcePluginExecuteProcessor =
+                new SourceExecuteProcessor(
+                        jarPaths, envConfig, config.getConfigList(Constants.SOURCE), jobContext);
+        this.transformPluginExecuteProcessor =
+                new TransformExecuteProcessor(
+                        jarPaths,
+                        envConfig,
+                        TypesafeConfigUtils.getConfigList(
+                                config, Constants.TRANSFORM, Collections.emptyList()),
+                        jobContext);
+        this.sinkPluginExecuteProcessor =
+                new SinkExecuteProcessor(
+                        jarPaths, envConfig, config.getConfigList(Constants.SINK), jobContext);
+
+        this.flinkRuntimeEnvironment =
+                FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));
+
+        this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
+        this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
+        this.sinkPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
+
+        LOGGER.info("Initialized Flink 1.20 execution environment");
+    }
+
+    @Override
+    public void execute() throws TaskExecuteException {
+        LOGGER.info("Executing Flink 1.20 job");
+
+        List<DataStreamTableInfo> dataStreams = new ArrayList<>();
+        dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
+        dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
+        sinkPluginExecuteProcessor.execute(dataStreams);
+        LOGGER.info(
+                "Flink Execution Plan: {}",
+                flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+        LOGGER.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
+        if (!flinkRuntimeEnvironment.isStreaming()) {
+            flinkRuntimeEnvironment
+                    .getStreamExecutionEnvironment()
+                    .setRuntimeMode(RuntimeExecutionMode.BATCH);
+            LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
+        }
+        try {
+            final long jobStartTime = System.currentTimeMillis();
+            JobExecutionResult jobResult =
+                    flinkRuntimeEnvironment
+                            .getStreamExecutionEnvironment()
+                            .execute(flinkRuntimeEnvironment.getJobName());
+            final long jobEndTime = System.currentTimeMillis();
+
+            // 这里是Flink 1.20特有的处理逻辑
+            final FlinkJobMetricsSummary jobMetricsSummary =
+                    createFlink20JobMetricsSummary(jobResult, jobStartTime, jobEndTime);
+
+            LOGGER.info("Job finished, execution result: \n{}", jobMetricsSummary);
+        } catch (Exception e) {
+            throw new TaskExecuteException("Execute Flink job error", e);
+        }
+    }
+
+    /** 创建Flink 1.20专用的FlinkJobMetricsSummary */

Review Comment:
   Please use English instead of Chinese here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
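Editor's note: in line with the review comment, one possible English rewording of the three flagged Chinese comments is shown below. The phrasing is only a suggestion, not wording taken from the PR author:

    /** Execution class dedicated to Flink 1.20 */
    public class FlinkExecution implements TaskExecution {
        // ...

            // Handling logic specific to Flink 1.20
            final FlinkJobMetricsSummary jobMetricsSummary =
                    createFlink20JobMetricsSummary(jobResult, jobStartTime, jobEndTime);

        // ...

        /** Creates the FlinkJobMetricsSummary dedicated to Flink 1.20 */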
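Editor's note: the hunk shown above ends at the Javadoc of createFlink20JobMetricsSummary, so its body is not visible here. A minimal sketch of how such a helper could look, assuming FlinkJobMetricsSummary exposes a builder with jobExecutionResult/jobStartTime/jobEndTime setters as in the existing seatunnel-flink-*-starter modules; the builder calls are an assumption, not code from this PR:

    /** Creates the FlinkJobMetricsSummary used to report Flink 1.20 job metrics. */
    private FlinkJobMetricsSummary createFlink20JobMetricsSummary(
            JobExecutionResult jobResult, long jobStartTime, long jobEndTime) {
        // Assumption: FlinkJobMetricsSummary provides a builder like the other Flink starters;
        // adjust these calls if the actual API differs.
        return FlinkJobMetricsSummary.builder()
                .jobExecutionResult(jobResult)
                .jobStartTime(jobStartTime)
                .jobEndTime(jobEndTime)
                .build();
    }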