This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cf45d1dd33 [BUG][Zeta]job name Display error #6470 (#6471)
cf45d1dd33 is described below
commit cf45d1dd331c0637bb102bac2ef554626184c2cb
Author: loveyang1990 <[email protected]>
AuthorDate: Mon Mar 11 17:12:54 2024 +0800
[BUG][Zeta]job name Display error #6470 (#6471)
---
.../seatunnel/engine/e2e/JobExecutionIT.java | 20 +++++++++
.../src/test/resources/valid_job_name.conf | 49 ++++++++++++++++++++++
.../client/job/ClientJobExecutionEnvironment.java | 3 +-
.../core/parse/MultipleTableJobConfigParser.java | 3 +-
4 files changed, 73 insertions(+), 2 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index f67dd22385..0d1a647ded 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -153,6 +153,26 @@ public class JobExecutionIT {
}
}
+ @Test
+ public void testValidJobNameInJobConfig() throws ExecutionException,
InterruptedException {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath = TestUtils.getResource("valid_job_name.conf");
+ JobConfig jobConfig = new JobConfig();
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+ try (SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig))
{
+ ClientJobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig,
SEATUNNEL_CONFIG);
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+ CompletableFuture<JobStatus> completableFuture =
+
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+ await().atMost(600000, TimeUnit.MILLISECONDS)
+ .untilAsserted(() ->
Assertions.assertTrue(completableFuture.isDone()));
+ String value = engineClient.getJobClient().listJobStatus(false);
+
Assertions.assertTrue(value.contains("\"jobName\":\"valid_job_name\""));
+ }
+ }
+
@Test
public void testGetUnKnownJobID() {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/valid_job_name.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/valid_job_name.conf
new file mode 100644
index 0000000000..11e25cb0d7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/valid_job_name.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ job.mode = "BATCH"
+ job.name = "valid_job_name"
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ parallelism = 4
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ source_table_name="fake"
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
index 7d9561edd0..8e0f0c689b 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java
@@ -153,12 +153,13 @@ public class ClientJobExecutionEnvironment extends
AbstractJobEnvironment {
}
public ClientJobProxy execute() throws ExecutionException,
InterruptedException {
+ LogicalDag logicalDag = getLogicalDag();
JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
Long.parseLong(jobConfig.getJobContext().getJobId()),
jobConfig.getName(),
isStartWithSavePoint,
-
seaTunnelHazelcastClient.getSerializationService().toData(getLogicalDag()),
+
seaTunnelHazelcastClient.getSerializationService().toData(logicalDag),
jobConfig,
new ArrayList<>(jarUrls),
new ArrayList<>(connectorJarIdentifiers));
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 137da2e0a1..0a32b0cf00 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -251,7 +251,8 @@ public class MultipleTableJobConfigParser {
private void fillJobConfig() {
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
if (StringUtils.isEmpty(jobConfig.getName())
- || jobConfig.getName().equals(Constants.LOGO)) {
+ || jobConfig.getName().equals(Constants.LOGO)
+ ||
jobConfig.getName().equals(EnvCommonOptions.JOB_NAME.defaultValue())) {
jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
}
envOptions