MrLiuzy commented on code in PR #10272:
URL: https://github.com/apache/seatunnel/pull/10272#discussion_r2661011773


##########
seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineEmbeddedExample.java:
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.engine;
+
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.ArrayUtils;
+
+import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.options.EnvCommonOptions;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.job.JobResult;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.server.CoordinatorService;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment;
+
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.internal.serialization.Data;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Embed the Zeta engine as a tool in the project. Start a local cluster 
instance in Cluster mode
+ * and use it to submit tasks, manage tasks, and query task metrics, etc.
+ */
+public class SeaTunnelEngineEmbeddedExample implements AutoCloseable {
+
+    /**
+     * Demo entry point: starts an embedded engine instance, submits one job, 
+     * waits for it to finish and prints the outcome.
+     *
+     * <p>The job is defined inline as JSON: batch mode, a FakeSource with
+     * {@code row.num} 100 and three fields (name, age, card), no transforms, and
+     * a Console sink reading the source output named "fake". The job id, the job
+     * status and error, and the metrics summary (as JSON) are written to
+     * {@code System.err}.
+     *
+     * <p>The instance is created in a try-with-resources statement, so
+     * {@link #close()} runs when the block exits.
+     *
+     * @param args command-line arguments; not read by this method
+     */
+    public static void main(String[] args) {
+        try (SeaTunnelEngineEmbeddedExample server = new 
SeaTunnelEngineEmbeddedExample(); ) {
+            server.start();
+            // Inline job definition: FakeSource ("fake") -> Console sink, batch mode.
+            String json =
+                    "{\r\n"
+                            + "    \"env\": {\r\n"
+                            + "        \"job.mode\": \"batch\"\r\n"
+                            + "    },\r\n"
+                            + "    \"source\": [\r\n"
+                            + "        {\r\n"
+                            + "            \"plugin_name\": 
\"FakeSource\",\r\n"
+                            + "            \"plugin_output\": \"fake\",\r\n"
+                            + "            \"row.num\": 100,\r\n"
+                            + "            \"schema\": {\r\n"
+                            + "                \"fields\": {\r\n"
+                            + "                    \"name\": \"string\",\r\n"
+                            + "                    \"age\": \"int\",\r\n"
+                            + "                    \"card\": \"int\"\r\n"
+                            + "                }\r\n"
+                            + "            }\r\n"
+                            + "        }\r\n"
+                            + "    ],\r\n"
+                            + "    \"transform\": [\r\n"
+                            + "    ],\r\n"
+                            + "    \"sink\": [\r\n"
+                            + "        {\r\n"
+                            + "            \"plugin_name\": \"Console\",\r\n"
+                            + "            \"plugin_input\": [\"fake\"]\r\n"
+                            + "        }\r\n"
+                            + "    ]\r\n"
+                            + "}";
+            long jobId = server.submitJob("demo", json);
+            System.err.println("jobId: " + jobId);
+
+            // Blocks until the coordinator reports a result for the job.
+            JobResult jobResult = server.waitForJobComplete(jobId);
+            System.err.println("------------jobResult-------------");
+            System.err.println("jobStatus: " + jobResult.getStatus());
+            System.err.println("jobError: " + jobResult.getError());
+
+            Map<String, Object> jobMetricsSummary = 
server.getJobMetricsSummary(jobId);
+            System.err.println("------------jobMetricsSummary-------------");
+            System.err.println(JsonUtils.toJsonString(jobMetricsSummary));
+        }
+    }
+
+    private HazelcastInstanceImpl hazelcastInstance;
+    private SeaTunnelServer seaTunnelServer;
+
+    /**
+     * Starts the embedded engine instance, unless one is already running.
+     *
+     * <p>Returns immediately when {@code seaTunnelServer} is already set. 
+     * Otherwise it loads the configuration through
+     * {@code ConfigProvider.locateAndGetSeaTunnelConfig()}, forces the cluster
+     * role to {@code MASTER_AND_WORKER}, creates a Hazelcast instance named
+     * {@code "Zeta-" + <random UUID>}, and stores both the instance and the
+     * {@code SeaTunnelServer} looked up under
+     * {@code Constant.SEATUNNEL_SERVICE_NAME} in this object's fields.
+     *
+     * <p>The method is {@code synchronized}, as is {@link #close()}.
+     */
+    public synchronized void start() {
+        if (this.seaTunnelServer != null) {
+            return;
+        }
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig
+                .getEngineConfig()
+                .setClusterRole(EngineConfig.ClusterRole.MASTER_AND_WORKER);
+        HazelcastInstanceImpl hazelcastInstance =
+                SeaTunnelServerStarter.createHazelcastInstance(
+                        seaTunnelConfig, "Zeta-" + UUID.randomUUID());
+        this.hazelcastInstance = hazelcastInstance;
+
+        // NOTE(review): this calls createExtensionServices() on the node extension;
+        // confirm it returns the services already registered on the running node
+        // rather than building a second, separate set.
+        Map<String, Object> extensionServices =
+                
hazelcastInstance.node.getNodeExtension().createExtensionServices();
+        SeaTunnelServer seaTunnelServer =
+                (SeaTunnelServer) 
extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME);
+        this.seaTunnelServer = seaTunnelServer;
+    }
+
+    /**
+     * Shuts down the embedded Hazelcast instance, if there is one, and clears
+     * the {@code hazelcastInstance} and {@code seaTunnelServer} fields.
+     *
+     * <p>Does nothing when no instance is held, so calling it more than once
+     * (or without a prior {@link #start()}) is a no-op.
+     */
+    public synchronized void close() {
+        if (this.hazelcastInstance != null) {
+            this.hazelcastInstance.shutdown();
+            this.hazelcastInstance = null;
+            this.seaTunnelServer = null;
+        }
+    }
+
+    /**
+     * Submits a job whose configuration is given as a JSON string.
+     *
+     * <p>The string is parsed with {@code JsonUtils.parseObject} and passed on
+     * to the {@code ObjectNode} overload.
+     *
+     * @param jobName name to use for the job
+     * @param seatunnelJobConfig job configuration as JSON text
+     * @return the job id returned by the delegated overload
+     */
+    public long submitJob(String jobName, String seatunnelJobConfig) {
+        return this.submitJob(jobName, 
JsonUtils.parseObject(seatunnelJobConfig));
+    }
+
+    /**
+     * Submits a job whose configuration is given as a JSON object node.
+     *
+     * <p>The node is converted with {@code JsonUtils.toMap} and passed on to the
+     * {@code Map} overload.
+     *
+     * @param jobName name to use for the job
+     * @param seatunnelJobConfig job configuration as a JSON object
+     * @return the job id returned by the delegated overload
+     */
+    public long submitJob(String jobName, ObjectNode seatunnelJobConfig) {
+        return this.submitJob(jobName, JsonUtils.toMap(seatunnelJobConfig));
+    }
+
+    /**
+     * Builds a job from the given configuration map, submits it to the
+     * coordinator service and blocks until the submission future completes.
+     *
+     * <p>If the {@code env} section sets {@code EnvCommonOptions.JOB_NAME}, that
+     * value replaces the {@code jobName} argument. The job is built through a
+     * {@code RestJobExecutionEnvironment}, serialized with the node's
+     * serialization service, and handed to {@code CoordinatorService.submitJob}.
+     *
+     * <p>This method dereferences {@code hazelcastInstance} and
+     * {@code seaTunnelServer}, which are only assigned in {@link #start()}, so
+     * {@code start()} must have been called first.
+     *
+     * @param jobName fallback job name, used when the configuration has none
+     * @param seatunnelJobConfig job configuration as a nested map
+     * @return the job id taken from the built {@code JobImmutableInformation}
+     */
+    public long submitJob(String jobName, Map<String, Object> 
seatunnelJobConfig) {
+        Config config = ConfigBuilder.of(seatunnelJobConfig);
+        // NOTE(review): presumably getConfig("env") fails when the map has no
+        // "env" section -- confirm whether callers must always supply one.
+        ReadonlyConfig envOptions = 
ReadonlyConfig.fromConfig(config.getConfig("env"));
+        String nameFromConfig = envOptions.get(EnvCommonOptions.JOB_NAME);
+        if (nameFromConfig != null) {
+            jobName = nameFromConfig;
+        }
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName(jobName);
+        RestJobExecutionEnvironment restJobExecutionEnvironment =
+                new RestJobExecutionEnvironment(
+                        seaTunnelServer,
+                        jobConfig,
+                        config,
+                        this.hazelcastInstance.node,
+                        false,
+                        null);
+        JobImmutableInformation jobImmutableInformation = 
restJobExecutionEnvironment.build();
+        long jobId = jobImmutableInformation.getJobId();
+        CoordinatorService coordinatorService = 
seaTunnelServer.getCoordinatorService();
+        Data data =
+                this.hazelcastInstance
+                        .node
+                        .nodeEngine
+                        .getSerializationService()
+                        .toData(jobImmutableInformation);
+        // NOTE(review): the id submitted here is parsed from the JobConfig's job
+        // context, while the id returned below comes from jobImmutableInformation;
+        // presumably they are the same value -- confirm.
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+                coordinatorService.submitJob(
+                        Long.parseLong(jobConfig.getJobContext().getJobId()),
+                        data,
+                        jobImmutableInformation.isStartWithSavePoint());
+        // Block until the coordinator has finished handling the submission.
+        voidPassiveCompletableFuture.join();
+        return jobId;
+    }
+
+    /**
+     * Blocks the calling thread until the coordinator service's completion
+     * future for the given job finishes, then returns its result.
+     *
+     * @param jobId id of the job to wait for
+     * @return the {@code JobResult} produced by the completion future
+     */
+    public JobResult waitForJobComplete(long jobId) {
+        CoordinatorService coordinatorService = 
seaTunnelServer.getCoordinatorService();
+        PassiveCompletableFuture<JobResult> waitForJobCompleteFuture =
+                coordinatorService.waitForJobComplete(jobId);
+        JobResult jobResult = waitForJobCompleteFuture.join();
+        return jobResult;
+    }
+
+    /**
+     * Fetches the metrics of the given job from the coordinator service and
+     * converts them to a map with {@code JobMetricsParser.getJobMetrics}.
+     *
+     * @param jobId id of the job whose metrics are requested
+     * @return the metrics map produced by {@code JobMetricsParser}
+     */
+    public Map<String, Object> getJobMetricsSummary(long jobId) {
+        CoordinatorService coordinatorService = 
seaTunnelServer.getCoordinatorService();
+        JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId);
+        return JobMetricsParser.getJobMetrics(jobMetrics);
+    }
+
+    public static class JobMetricsParser {

Review Comment:
   SeaTunnel is a comprehensive piece of software. However, in my actual 
projects, the requirement was for certain features to transfer data between 
different databases while collecting data-transfer metrics at the same time. 
I researched DataX and SeaTunnel in depth and found that SeaTunnel is 
relatively easy to introduce into a project as a Maven dependency and to use 
as a tool rather than as a complete application, which closely matches my 
requirements. I carefully reviewed the official documentation, drew on the 
implementation of the REST API, wrote this sample code, and confirmed that the 
approach is indeed feasible. I had not found any similar usage reports on the 
Internet, and I also asked large language models such as DeepSeek and Doubao, 
but still did not get satisfactory answers. I think others may well run into 
similar usage scenarios. Since I have found a feasible solution, I hope it can 
serve as a reference for others.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to