This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new c0599f55bc [Improve][Zeta] Refactor jar package service module (#5763)
c0599f55bc is described below

commit c0599f55bc9133d8bba19f69680c8a2a99a17074
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 13:36:00 2023 +0800

    [Improve][Zeta] Refactor jar package service module (#5763)
---
 .../engine/client/job/ConnectorPackageClient.java  | 80 +++++++++-------------
 .../common/config/server/ServerConfigOptions.java  | 10 +--
 .../seatunnel/engine/core/job/ConnectorJar.java    |  2 +-
 .../engine/core/job/ConnectorJarIdentifier.java    | 12 ++--
 .../engine/server/CoordinatorService.java          |  2 +-
 .../seatunnel/engine/server/SeaTunnelServer.java   |  2 +-
 .../engine/server/TaskExecutionService.java        |  2 +-
 .../seatunnel/engine/server/master/JobMaster.java  | 14 ++--
 .../jar}/AbstractConnectorJarStorageStrategy.java  | 33 ++-------
 .../jar}/ConnectorJarStorageStrategy.java          | 29 ++------
 .../jar}/ConnectorPackageService.java              | 29 ++++----
 .../jar}/IsolatedConnectorJarStorageStrategy.java  | 24 ++-----
 .../jar}/ServerConnectorPackageClient.java         |  2 +-
 .../jar}/SharedConnectorJarCleanupTask.java        |  7 +-
 .../jar}/SharedConnectorJarStorageStrategy.java    | 41 +++--------
 .../jar}/StorageStrategyFactory.java               |  2 +-
 .../DeleteConnectorJarInExecutionNode.java         |  2 +-
 .../SendConnectorJarToMemberNodeOperation.java     |  2 +-
 .../engine/server/ConnectorPackageServiceTest.java |  2 +-
 19 files changed, 97 insertions(+), 200 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java
index 84555efb82..50fe2e111b 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java
@@ -24,12 +24,7 @@ import 
org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.core.job.ConnectorJarType;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelUploadConnectorJarCodec;
 
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
-
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -43,8 +38,6 @@ import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
 
 public class ConnectorPackageClient {
 
-    private static final ILogger LOGGER = 
Logger.getLogger(ConnectorPackageClient.class);
-
     private final SeaTunnelHazelcastClient hazelcastClient;
 
     public ConnectorPackageClient(SeaTunnelHazelcastClient hazelcastClient) {
@@ -57,10 +50,16 @@ public class ConnectorPackageClient {
         Set<ConnectorJarIdentifier> connectorJarIdentifiers = new HashSet<>();
         // Upload commonPluginJar
         for (URL commonPluginJar : commonPluginJars) {
-            // handle the local file path
-            // origin path : 
/${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar ->
-            // handled path : 
${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
-            Path path = Paths.get(commonPluginJar.getPath().substring(1));
+            Path path;
+            if (commonPluginJar.getPath().startsWith("/")) {
+                // handle the local file path
+                // origin path : 
/${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+                // ->
+                // handled path : 
${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+                path = Paths.get(commonPluginJar.getPath().substring(1));
+            } else {
+                path = Paths.get(commonPluginJar.getPath());
+            }
             ConnectorJarIdentifier connectorJarIdentifier = 
uploadCommonPluginJar(jobId, path);
             connectorJarIdentifiers.add(connectorJarIdentifier);
         }
@@ -78,18 +77,16 @@ public class ConnectorPackageClient {
         ConnectorJar connectorJar =
                 ConnectorJar.createConnectorJar(
                         digest, ConnectorJarType.COMMON_PLUGIN_JAR, data, 
fileName);
-        ConnectorJarIdentifier connectorJarIdentifier =
-                hazelcastClient
-                        .getSerializationService()
-                        .toObject(
-                                
hazelcastClient.requestOnMasterAndDecodeResponse(
-                                        
SeaTunnelUploadConnectorJarCodec.encodeRequest(
-                                                jobId,
-                                                hazelcastClient
-                                                        
.getSerializationService()
-                                                        .toData(connectorJar)),
-                                        
SeaTunnelUploadConnectorJarCodec::decodeResponse));
-        return connectorJarIdentifier;
+        return hazelcastClient
+                .getSerializationService()
+                .toObject(
+                        hazelcastClient.requestOnMasterAndDecodeResponse(
+                                SeaTunnelUploadConnectorJarCodec.encodeRequest(
+                                        jobId,
+                                        hazelcastClient
+                                                .getSerializationService()
+                                                .toData(connectorJar)),
+                                
SeaTunnelUploadConnectorJarCodec::decodeResponse));
     }
 
     public ConnectorJarIdentifier uploadConnectorPluginJar(long jobId, URL 
connectorPluginJarURL) {
@@ -105,37 +102,24 @@ public class ConnectorPackageClient {
         ConnectorJar connectorJar =
                 ConnectorJar.createConnectorJar(
                         digest, ConnectorJarType.CONNECTOR_PLUGIN_JAR, data, 
fileName);
-        ConnectorJarIdentifier connectorJarIdentifier =
-                hazelcastClient
-                        .getSerializationService()
-                        .toObject(
-                                
hazelcastClient.requestOnMasterAndDecodeResponse(
-                                        
SeaTunnelUploadConnectorJarCodec.encodeRequest(
-                                                jobId,
-                                                hazelcastClient
-                                                        
.getSerializationService()
-                                                        .toData(connectorJar)),
-                                        
SeaTunnelUploadConnectorJarCodec::decodeResponse));
-        return connectorJarIdentifier;
+        return hazelcastClient
+                .getSerializationService()
+                .toObject(
+                        hazelcastClient.requestOnMasterAndDecodeResponse(
+                                SeaTunnelUploadConnectorJarCodec.encodeRequest(
+                                        jobId,
+                                        hazelcastClient
+                                                .getSerializationService()
+                                                .toData(connectorJar)),
+                                
SeaTunnelUploadConnectorJarCodec::decodeResponse));
     }
 
     private static byte[] readFileData(Path filePath) {
         // Read file data and convert it to a byte array.
         try {
-            InputStream inputStream = Files.newInputStream(filePath);
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-            byte[] buffer = new byte[1024];
-            int bytesRead;
-            while ((bytesRead = inputStream.read(buffer)) != -1) {
-                outputStream.write(buffer, 0, bytesRead);
-            }
-            return outputStream.toByteArray();
+            return Files.readAllBytes(filePath);
         } catch (IOException e) {
-            LOGGER.warning(
-                    String.format(
-                            "Failed to read the connector jar package file : { 
%s } , the file to be read may not exist",
-                            filePath.toString()));
-            throw new RuntimeException();
+            throw new RuntimeException(e);
         }
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 51e1187301..5c104c9331 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -150,26 +150,26 @@ public class ServerConfigOptions {
 
     public static final Option<ConnectorJarStorageMode> 
CONNECTOR_JAR_STORAGE_MODE =
             Options.key("connector-jar-storage-mode")
-                    .type(new TypeReference<ConnectorJarStorageMode>() {})
+                    .enumType(ConnectorJarStorageMode.class)
                     .defaultValue(ConnectorJarStorageMode.SHARED)
                     .withDescription(
                             "The storage mode of the connector jar package, 
including SHARED, ISOLATED. Default is SHARED");
 
     public static final Option<String> CONNECTOR_JAR_STORAGE_PATH =
             Options.key("connector-jar-storage-path")
-                    .type(new TypeReference<String>() {})
+                    .stringType()
                     .defaultValue("")
                     .withDescription("The user defined connector jar storage 
path.");
 
     public static final Option<Integer> CONNECTOR_JAR_CLEANUP_TASK_INTERVAL =
             Options.key("connector-jar-cleanup-task-interval")
-                    .type(new TypeReference<Integer>() {})
+                    .intType()
                     .defaultValue(3600)
                     .withDescription("The user defined connector jar cleanup 
task interval.");
 
     public static final Option<Integer> CONNECTOR_JAR_EXPIRY_TIME =
             Options.key("connector-jar-expiry-time")
-                    .type(new TypeReference<Integer>() {})
+                    .intType()
                     .defaultValue(600)
                     .withDescription("The user defined connector jar expiry 
time.");
 
@@ -181,7 +181,7 @@ public class ServerConfigOptions {
 
     public static final Option<Map<String, String>> 
CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG =
             Options.key("plugin-config")
-                    .type(new TypeReference<Map<String, String>>() {})
+                    .mapType()
                     .noDefaultValue()
                     .withDescription("The connector jar HA storage instance 
configuration.");
 
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJar.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJar.java
index d5bc7902ba..3e4c6b011b 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJar.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJar.java
@@ -36,7 +36,7 @@ public abstract class ConnectorJar implements 
IdentifiedDataSerializable {
 
     protected ConnectorJar(ConnectorJarType type, byte[] data, String 
fileName) {
         checkNotNull(data);
-        if (data == null || data.length == 0) {
+        if (data.length == 0) {
             throw new IllegalArgumentException("The Jar package file for the 
connector is empty!");
         }
         checkNotNull(type);
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJarIdentifier.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJarIdentifier.java
index e0625929f0..13ffcabf88 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJarIdentifier.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJarIdentifier.java
@@ -54,13 +54,11 @@ public class ConnectorJarIdentifier implements Serializable 
{
     }
 
     public static ConnectorJarIdentifier of(ConnectorJar connectorJar, String 
storagePath) {
-        ConnectorJarIdentifier connectorJarIdentifier =
-                ConnectorJarIdentifier.of(
-                        connectorJar.getConnectorJarID(),
-                        connectorJar.getType(),
-                        connectorJar.getFileName(),
-                        storagePath);
-        return connectorJarIdentifier;
+        return ConnectorJarIdentifier.of(
+                connectorJar.getConnectorJarID(),
+                connectorJar.getType(),
+                connectorJar.getFileName(),
+                storagePath);
     }
 
     public static ConnectorJarIdentifier of(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 95c85e8076..d92f40ce39 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -41,7 +41,6 @@ import 
org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
 import org.apache.seatunnel.engine.server.master.JobHistoryService;
 import org.apache.seatunnel.engine.server.master.JobMaster;
 import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
@@ -49,6 +48,7 @@ import 
org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import 
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
 import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index b9542672bd..ebb0edea85 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -23,7 +23,7 @@ import 
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
+import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
 import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
 import org.apache.seatunnel.engine.server.service.slot.SlotService;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 98caa3f226..0645ae25ce 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -42,8 +42,8 @@ import 
org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.execution.TaskTracker;
 import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
+import 
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.ServerConnectorPackageClient;
 import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
 import 
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 6a0c562c09..6c527732d6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -310,12 +310,14 @@ public class JobMaster {
                             ExceptionUtils.getMessage(e)));
         } finally {
             jobMasterCompleteFuture.join();
-            List<ConnectorJarIdentifier> pluginJarIdentifiers =
-                    jobImmutableInformation.getPluginJarIdentifiers();
-            seaTunnelServer
-                    .getConnectorPackageService()
-                    .cleanUpWhenJobFinished(
-                            jobImmutableInformation.getJobId(), 
pluginJarIdentifiers);
+            if (engineConfig.getConnectorJarStorageConfig().getEnable()) {
+                List<ConnectorJarIdentifier> pluginJarIdentifiers =
+                        jobImmutableInformation.getPluginJarIdentifiers();
+                seaTunnelServer
+                        .getConnectorPackageService()
+                        .cleanUpWhenJobFinished(
+                                jobImmutableInformation.getJobId(), 
pluginJarIdentifiers);
+            }
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/AbstractConnectorJarStorageStrategy.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/AbstractConnectorJarStorageStrategy.java
similarity index 83%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/AbstractConnectorJarStorageStrategy.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/AbstractConnectorJarStorageStrategy.java
index 1d14bae3e1..f9dc21f03b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/AbstractConnectorJarStorageStrategy.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/AbstractConnectorJarStorageStrategy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
 
 import org.apache.seatunnel.engine.common.config.SeaTunnelProperties;
 import 
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
@@ -33,10 +33,7 @@ import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -52,7 +49,7 @@ public abstract class AbstractConnectorJarStorageStrategy 
implements ConnectorJa
 
     protected static final String COMMON_PLUGIN_JAR_STORAGE_PATH = "/plugins";
 
-    protected static final String CONNECTOR_PLUGIN_JAR_STORAGE_PATH = 
"/connectors/seatunnel";
+    protected static final String CONNECTOR_PLUGIN_JAR_STORAGE_PATH = 
"/connectors";
 
     protected String storageDir;
 
@@ -81,7 +78,7 @@ public abstract class AbstractConnectorJarStorageStrategy 
implements ConnectorJa
             LOGGER.warning(
                     String.format(
                             "The creation of directories : %s for the 
connector jar storage path has failed.",
-                            file.getParentFile().toPath().toString()));
+                            file.getParentFile().toPath()));
         }
         return file;
     }
@@ -97,8 +94,7 @@ public abstract class AbstractConnectorJarStorageStrategy 
implements ConnectorJa
         boolean success = false;
         try {
             if (!storageFile.exists()) {
-                FileOutputStream fos = new FileOutputStream(storageFile);
-                fos.write(connectorJar.getData());
+                Files.write(storageFile.toPath(), connectorJar.getData());
             } else {
                 LOGGER.warning(
                         String.format(
@@ -160,25 +156,4 @@ public abstract class AbstractConnectorJarStorageStrategy 
implements ConnectorJa
                     }
                 });
     }
-
-    @Override
-    public byte[] readConnectorJarByteDataInternal(File connectorJarFile) {
-        try {
-            // Read file data and convert it to a byte array.
-            FileInputStream inputStream = new 
FileInputStream(connectorJarFile);
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-            byte[] buffer = new byte[1024];
-            int bytesRead;
-            while ((bytesRead = inputStream.read(buffer)) != -1) {
-                outputStream.write(buffer, 0, bytesRead);
-            }
-            return outputStream.toByteArray();
-        } catch (IOException e) {
-            LOGGER.warning(
-                    String.format(
-                            "Failed to read the connector jar package file : { 
%s } , the file to be read may not exist",
-                            connectorJarFile));
-            return new byte[0];
-        }
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorJarStorageStrategy.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorJarStorageStrategy.java
similarity index 83%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorJarStorageStrategy.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorJarStorageStrategy.java
index 69a11e53ab..d5d6c6d21f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorJarStorageStrategy.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorJarStorageStrategy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
 
 import org.apache.seatunnel.engine.core.job.ConnectorJar;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
@@ -60,7 +60,7 @@ public interface ConnectorJarStorageStrategy extends 
Serializable {
      *
      * @param connectorJar connector jar
      * @param storageLocation the storage location of the connector jar in the 
local file system
-     * @return
+     * @return the storage path of connector jar file
      */
     Optional<Path> storageConnectorJarFileInternal(ConnectorJar connectorJar, 
File storageLocation);
 
@@ -69,7 +69,7 @@ public interface ConnectorJarStorageStrategy extends 
Serializable {
      *
      * @param jobId ID of the job for the connector jar
      * @param connectorJar connector jar
-     * @return
+     * @return true if the same connector Jar package exists in the engine, 
otherwise false
      */
     boolean checkConnectorJarExisted(long jobId, ConnectorJar connectorJar);
 
@@ -78,7 +78,7 @@ public interface ConnectorJarStorageStrategy extends 
Serializable {
      *
      * @param jobId ID of the job for the connector jar
      * @param connectorJar connector jar
-     * @return
+     * @return the unique identifier of the connector jar
      */
     ConnectorJarIdentifier getConnectorJarIdentifier(long jobId, ConnectorJar 
connectorJar);
 
@@ -96,11 +96,6 @@ public interface ConnectorJarStorageStrategy extends 
Serializable {
      */
     void deleteConnectorJarInExecutionNode(ConnectorJarIdentifier 
connectorJarIdentifier);
 
-    /**
-     * Delete the connector jar package by connectorJarIdentifier
-     *
-     * @param connectorJarIdentifier the unique identifier of the connector 
jar.
-     */
     /**
      * Delete the connector jar package in the local file system by 
connectorJarIdentifier.
      *
@@ -108,22 +103,6 @@ public interface ConnectorJarStorageStrategy extends 
Serializable {
      */
     void deleteConnectorJarInternal(File storageLocation);
 
-    /**
-     * Read connector Jar package from file to byte array.
-     *
-     * @param connectorJarFile the connector jar file
-     * @return the byte array of the connector jar file
-     */
-    byte[] readConnectorJarByteDataInternal(File connectorJarFile);
-
-    /**
-     * Read connector Jar package from file to byte array.
-     *
-     * @param connectorJarFile the connector jar file
-     * @return the byte array of the connector jar file
-     */
-    byte[] readConnectorJarByteData(File connectorJarFile);
-
     /**
      * Carry out the cleaning work after the task is finished.
      *
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorPackageService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
similarity index 88%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorPackageService.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
index 1ddedbfeb1..4e1ec89593 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorPackageService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
 
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import 
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
@@ -27,7 +27,6 @@ import 
org.apache.seatunnel.engine.server.task.operation.SendConnectorJarToMembe
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.hazelcast.cluster.Address;
-import com.hazelcast.cluster.Member;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -35,7 +34,6 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.Collection;
 import java.util.List;
 
 @Slf4j
@@ -67,10 +65,9 @@ public class ConnectorPackageService {
     }
 
     public ConnectorJarIdentifier storageConnectorJarFile(long jobId, Data 
connectorJarData) {
-        // deserialize connector jar package data
         ConnectorJar connectorJar = 
nodeEngine.getSerializationService().toObject(connectorJarData);
-        /**
-         * If the server holds the same Jar package file, there is no need for 
additional storaged.
+        /*
+         * If the server holds the same Jar package file, there is no need for 
additional storage.
          * When the Connector Jar storage strategy is 
SharedConnectorJarStorageStrategy, the
          * reference count in the connectorJarRefCounters needs to be 
increased. When the Connector
          * Jar storage strategy is IsolatedConnectorJarStorageStrategy, we 
don't need to do any
@@ -92,15 +89,17 @@ public class ConnectorPackageService {
         }
         ConnectorJarIdentifier connectorJarIdentifier =
                 connectorJarStorageStrategy.storageConnectorJarFile(jobId, 
connectorJar);
-        Address masterNodeAddress = 
nodeEngine.getClusterService().getMasterAddress();
-        Collection<Member> memberList = 
nodeEngine.getClusterService().getMembers();
-        memberList.forEach(
-                member -> {
-                    Address address = member.getAddress();
-                    if (!address.equals(masterNodeAddress)) {
-                        sendConnectorJarToMemberNode(connectorJarIdentifier, 
connectorJar, address);
-                    }
-                });
+        nodeEngine
+                .getClusterService()
+                .getMembers()
+                .forEach(
+                        member -> {
+                            Address address = member.getAddress();
+                            if (!address.equals(nodeEngine.getThisAddress())) {
+                                sendConnectorJarToMemberNode(
+                                        connectorJarIdentifier, connectorJar, 
address);
+                            }
+                        });
         return connectorJarIdentifier;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/IsolatedConnectorJarStorageStrategy.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/IsolatedConnectorJarStorageStrategy.java
similarity index 81%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/IsolatedConnectorJarStorageStrategy.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/IsolatedConnectorJarStorageStrategy.java
index dc1f2b0ecf..b963827b5f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/IsolatedConnectorJarStorageStrategy.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/IsolatedConnectorJarStorageStrategy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
 
 import 
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
 import org.apache.seatunnel.engine.core.job.CommonPluginJar;
@@ -45,29 +45,20 @@ public class IsolatedConnectorJarStorageStrategy extends 
AbstractConnectorJarSto
             return ConnectorJarIdentifier.of(connectorJar, 
storageFile.toString());
         }
         Optional<Path> optional = 
storageConnectorJarFileInternal(connectorJar, storageFile);
-        ConnectorJarIdentifier connectorJarIdentifier =
-                optional.isPresent()
-                        ? ConnectorJarIdentifier.of(connectorJar, 
optional.get().toString())
-                        : ConnectorJarIdentifier.of(connectorJar, "");
-        return connectorJarIdentifier;
+        return optional.map(path -> ConnectorJarIdentifier.of(connectorJar, 
path.toString()))
+                .orElseGet(() -> ConnectorJarIdentifier.of(connectorJar, ""));
     }
 
     @Override
     public boolean checkConnectorJarExisted(long jobId, ConnectorJar 
connectorJar) {
         File storageFile = getStorageLocation(jobId, connectorJar);
-        if (storageFile.exists()) {
-            return true;
-        }
-        return false;
+        return storageFile.exists();
     }
 
     @Override
     public void cleanUpWhenJobFinished(
             long jobId, List<ConnectorJarIdentifier> 
connectorJarIdentifierList) {
-        connectorJarIdentifierList.forEach(
-                connectorJarIdentifier -> {
-                    deleteConnectorJar(connectorJarIdentifier);
-                });
+        connectorJarIdentifierList.forEach(this::deleteConnectorJar);
     }
 
     @Override
@@ -96,9 +87,4 @@ public class IsolatedConnectorJarStorageStrategy extends 
AbstractConnectorJarSto
                     connectorJar.getFileName());
         }
     }
-
-    @Override
-    public byte[] readConnectorJarByteData(File connectorJarFile) {
-        return readConnectorJarByteDataInternal(connectorJarFile);
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/ServerConnectorPackageClient.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ServerConnectorPackageClient.java
similarity index 99%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/ServerConnectorPackageClient.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ServerConnectorPackageClient.java
index 1fdba681b3..ed782991f6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/ServerConnectorPackageClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ServerConnectorPackageClient.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task;
+package org.apache.seatunnel.engine.server.service.jar;
 
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarCleanupTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarCleanupTask.java
similarity index 92%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarCleanupTask.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarCleanupTask.java
index 26c6c4b806..64770f4b56 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarCleanupTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarCleanupTask.java
@@ -15,12 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
 
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.core.job.RefCount;
 
-import com.hazelcast.logging.ILogger;
 import com.hazelcast.map.IMap;
 
 import java.util.Iterator;
@@ -35,17 +34,13 @@ Cleanup task for shared connector jar package.
  */
 public class SharedConnectorJarCleanupTask extends TimerTask {
 
-    private final ILogger LOGGER;
-
     private final Consumer<ConnectorJarIdentifier> cleanupCallback;
 
     private final IMap<ConnectorJarIdentifier, RefCount> 
connectorJarRefCounters;
 
     public SharedConnectorJarCleanupTask(
-            ILogger LOGGER,
             Consumer<ConnectorJarIdentifier> cleanupCallback,
             IMap<ConnectorJarIdentifier, RefCount> connectorJarRefCounters) {
-        this.LOGGER = checkNotNull(LOGGER);
         this.cleanupCallback = checkNotNull(cleanupCallback);
         this.connectorJarRefCounters = checkNotNull(connectorJarRefCounters);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarStorageStrategy.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarStorageStrategy.java
similarity index 83%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarStorageStrategy.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarStorageStrategy.java
index e66fdd56e8..c2d88fcbb2 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarStorageStrategy.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarStorageStrategy.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
 
 import org.apache.seatunnel.engine.common.Constant;
 import 
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Timer;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
 
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
@@ -56,12 +55,12 @@ public class SharedConnectorJarStorageStrategy extends 
AbstractConnectorJarStora
         this.readWriteLock = new ReentrantReadWriteLock();
         this.connectorJarRefCounters =
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS);
-        // Initializing the clean up task
+        // Initializing the cleanup task
         this.cleanupTimer = new Timer(true);
         this.cleanupInterval = 
connectorJarStorageConfig.getCleanupTaskInterval() * 1000;
         this.cleanupTimer.schedule(
                 new SharedConnectorJarCleanupTask(
-                        LOGGER, this::deleteConnectorJar, 
connectorJarRefCounters),
+                        this::deleteConnectorJar, connectorJarRefCounters),
                 cleanupInterval,
                 cleanupInterval);
     }
@@ -95,10 +94,7 @@ public class SharedConnectorJarStorageStrategy extends 
AbstractConnectorJarStora
                 ConnectorJarIdentifier.of(
                         connectorJar, getStorageLocationPath(jobId, 
connectorJar));
         RefCount refCount = 
connectorJarRefCounters.get(connectorJarIdentifier);
-        if (refCount != null) {
-            return true;
-        }
-        return false;
+        return refCount != null;
     }
 
     public void increaseRefCountForConnectorJar(ConnectorJarIdentifier 
connectorJarIdentifier) {
@@ -145,35 +141,18 @@ public class SharedConnectorJarStorageStrategy extends 
AbstractConnectorJarStora
     @Override
     public void cleanUpWhenJobFinished(
             long jobId, List<ConnectorJarIdentifier> 
connectorJarIdentifierList) {
-        connectorJarIdentifierList.forEach(
-                connectorJarIdentifier -> {
-                    decreaseConnectorJarRefCount(connectorJarIdentifier);
-                });
+        connectorJarIdentifierList.forEach(this::decreaseConnectorJarRefCount);
     }
 
     public void decreaseConnectorJarRefCount(ConnectorJarIdentifier 
connectorJarIdentifier) {
         connectorJarRefCounters.compute(
                 connectorJarIdentifier,
-                new BiFunction<ConnectorJarIdentifier, RefCount, RefCount>() {
-                    @Override
-                    public RefCount apply(
-                            ConnectorJarIdentifier connectorJarIdentifier, 
RefCount refCount) {
-                        if (refCount != null) {
-                            Long references = refCount.getReferences();
-                            refCount.setReferences(--references);
-                        }
-                        return refCount;
+                (connectorJarIdentifier1, refCount) -> {
+                    if (refCount != null) {
+                        Long references = refCount.getReferences();
+                        refCount.setReferences(--references);
                     }
+                    return refCount;
                 });
     }
-
-    @Override
-    public byte[] readConnectorJarByteData(File connectorJarFile) {
-        readWriteLock.readLock().lock();
-        try {
-            return readConnectorJarByteDataInternal(connectorJarFile);
-        } finally {
-            readWriteLock.readLock().unlock();
-        }
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/StorageStrategyFactory.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
similarity index 97%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/StorageStrategyFactory.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
index 9aed0a564a..f0fe4d9b84 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/StorageStrategyFactory.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
 
 import 
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
 import 
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeleteConnectorJarInExecutionNode.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeleteConnectorJarInExecutionNode.java
index 87e572c366..52b211f15c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeleteConnectorJarInExecutionNode.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeleteConnectorJarInExecutionNode.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.server.task.operation;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.ServerConnectorPackageClient;
+import 
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/SendConnectorJarToMemberNodeOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/SendConnectorJarToMemberNodeOperation.java
index 836f09cffe..94d767cb07 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/SendConnectorJarToMemberNodeOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/SendConnectorJarToMemberNodeOperation.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.engine.core.job.ConnectorJar;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.ServerConnectorPackageClient;
+import 
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient;
 
 import com.hazelcast.nio.ObjectDataInput;
 import com.hazelcast.nio.ObjectDataOutput;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
index ba4da88a1d..a5c0569d60 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
@@ -45,7 +45,7 @@ import 
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.core.job.PipelineStatus;
 import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
-import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
+import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;


Reply via email to