This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c0599f55bc [Improve][Zeta] Refactor jar package service module (#5763)
c0599f55bc is described below
commit c0599f55bc9133d8bba19f69680c8a2a99a17074
Author: Jia Fan <[email protected]>
AuthorDate: Tue Nov 7 13:36:00 2023 +0800
[Improve][Zeta] Refactor jar package service module (#5763)
---
.../engine/client/job/ConnectorPackageClient.java | 80 +++++++++-------------
.../common/config/server/ServerConfigOptions.java | 10 +--
.../seatunnel/engine/core/job/ConnectorJar.java | 2 +-
.../engine/core/job/ConnectorJarIdentifier.java | 12 ++--
.../engine/server/CoordinatorService.java | 2 +-
.../seatunnel/engine/server/SeaTunnelServer.java | 2 +-
.../engine/server/TaskExecutionService.java | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 14 ++--
.../jar}/AbstractConnectorJarStorageStrategy.java | 33 ++-------
.../jar}/ConnectorJarStorageStrategy.java | 29 ++------
.../jar}/ConnectorPackageService.java | 29 ++++----
.../jar}/IsolatedConnectorJarStorageStrategy.java | 24 ++-----
.../jar}/ServerConnectorPackageClient.java | 2 +-
.../jar}/SharedConnectorJarCleanupTask.java | 7 +-
.../jar}/SharedConnectorJarStorageStrategy.java | 41 +++--------
.../jar}/StorageStrategyFactory.java | 2 +-
.../DeleteConnectorJarInExecutionNode.java | 2 +-
.../SendConnectorJarToMemberNodeOperation.java | 2 +-
.../engine/server/ConnectorPackageServiceTest.java | 2 +-
19 files changed, 97 insertions(+), 200 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java
index 84555efb82..50fe2e111b 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java
@@ -24,12 +24,7 @@ import
org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.ConnectorJarType;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelUploadConnectorJarCodec;
-import com.hazelcast.logging.ILogger;
-import com.hazelcast.logging.Logger;
-
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -43,8 +38,6 @@ import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.ch
public class ConnectorPackageClient {
- private static final ILogger LOGGER =
Logger.getLogger(ConnectorPackageClient.class);
-
private final SeaTunnelHazelcastClient hazelcastClient;
public ConnectorPackageClient(SeaTunnelHazelcastClient hazelcastClient) {
@@ -57,10 +50,16 @@ public class ConnectorPackageClient {
Set<ConnectorJarIdentifier> connectorJarIdentifiers = new HashSet<>();
// Upload commonPluginJar
for (URL commonPluginJar : commonPluginJars) {
- // handle the local file path
- // origin path :
/${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar ->
- // handled path :
${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
- Path path = Paths.get(commonPluginJar.getPath().substring(1));
+ Path path;
+ if (commonPluginJar.getPath().startsWith("/")) {
+ // handle the local file path
+ // origin path :
/${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+ // ->
+ // handled path :
${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+ path = Paths.get(commonPluginJar.getPath().substring(1));
+ } else {
+ path = Paths.get(commonPluginJar.getPath());
+ }
ConnectorJarIdentifier connectorJarIdentifier =
uploadCommonPluginJar(jobId, path);
connectorJarIdentifiers.add(connectorJarIdentifier);
}
@@ -78,18 +77,16 @@ public class ConnectorPackageClient {
ConnectorJar connectorJar =
ConnectorJar.createConnectorJar(
digest, ConnectorJarType.COMMON_PLUGIN_JAR, data,
fileName);
- ConnectorJarIdentifier connectorJarIdentifier =
- hazelcastClient
- .getSerializationService()
- .toObject(
-
hazelcastClient.requestOnMasterAndDecodeResponse(
-
SeaTunnelUploadConnectorJarCodec.encodeRequest(
- jobId,
- hazelcastClient
-
.getSerializationService()
- .toData(connectorJar)),
-
SeaTunnelUploadConnectorJarCodec::decodeResponse));
- return connectorJarIdentifier;
+ return hazelcastClient
+ .getSerializationService()
+ .toObject(
+ hazelcastClient.requestOnMasterAndDecodeResponse(
+ SeaTunnelUploadConnectorJarCodec.encodeRequest(
+ jobId,
+ hazelcastClient
+ .getSerializationService()
+ .toData(connectorJar)),
+
SeaTunnelUploadConnectorJarCodec::decodeResponse));
}
public ConnectorJarIdentifier uploadConnectorPluginJar(long jobId, URL
connectorPluginJarURL) {
@@ -105,37 +102,24 @@ public class ConnectorPackageClient {
ConnectorJar connectorJar =
ConnectorJar.createConnectorJar(
digest, ConnectorJarType.CONNECTOR_PLUGIN_JAR, data,
fileName);
- ConnectorJarIdentifier connectorJarIdentifier =
- hazelcastClient
- .getSerializationService()
- .toObject(
-
hazelcastClient.requestOnMasterAndDecodeResponse(
-
SeaTunnelUploadConnectorJarCodec.encodeRequest(
- jobId,
- hazelcastClient
-
.getSerializationService()
- .toData(connectorJar)),
-
SeaTunnelUploadConnectorJarCodec::decodeResponse));
- return connectorJarIdentifier;
+ return hazelcastClient
+ .getSerializationService()
+ .toObject(
+ hazelcastClient.requestOnMasterAndDecodeResponse(
+ SeaTunnelUploadConnectorJarCodec.encodeRequest(
+ jobId,
+ hazelcastClient
+ .getSerializationService()
+ .toData(connectorJar)),
+
SeaTunnelUploadConnectorJarCodec::decodeResponse));
}
private static byte[] readFileData(Path filePath) {
// Read file data and convert it to a byte array.
try {
- InputStream inputStream = Files.newInputStream(filePath);
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
- int bytesRead;
- while ((bytesRead = inputStream.read(buffer)) != -1) {
- outputStream.write(buffer, 0, bytesRead);
- }
- return outputStream.toByteArray();
+ return Files.readAllBytes(filePath);
} catch (IOException e) {
- LOGGER.warning(
- String.format(
- "Failed to read the connector jar package file : {
%s } , the file to be read may not exist",
- filePath.toString()));
- throw new RuntimeException();
+ throw new RuntimeException(e);
}
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 51e1187301..5c104c9331 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -150,26 +150,26 @@ public class ServerConfigOptions {
public static final Option<ConnectorJarStorageMode>
CONNECTOR_JAR_STORAGE_MODE =
Options.key("connector-jar-storage-mode")
- .type(new TypeReference<ConnectorJarStorageMode>() {})
+ .enumType(ConnectorJarStorageMode.class)
.defaultValue(ConnectorJarStorageMode.SHARED)
.withDescription(
"The storage mode of the connector jar package,
including SHARED, ISOLATED. Default is SHARED");
public static final Option<String> CONNECTOR_JAR_STORAGE_PATH =
Options.key("connector-jar-storage-path")
- .type(new TypeReference<String>() {})
+ .stringType()
.defaultValue("")
.withDescription("The user defined connector jar storage
path.");
public static final Option<Integer> CONNECTOR_JAR_CLEANUP_TASK_INTERVAL =
Options.key("connector-jar-cleanup-task-interval")
- .type(new TypeReference<Integer>() {})
+ .intType()
.defaultValue(3600)
.withDescription("The user defined connector jar cleanup
task interval.");
public static final Option<Integer> CONNECTOR_JAR_EXPIRY_TIME =
Options.key("connector-jar-expiry-time")
- .type(new TypeReference<Integer>() {})
+ .intType()
.defaultValue(600)
.withDescription("The user defined connector jar expiry
time.");
@@ -181,7 +181,7 @@ public class ServerConfigOptions {
public static final Option<Map<String, String>>
CONNECTOR_JAR_HA_STORAGE_PLUGIN_CONFIG =
Options.key("plugin-config")
- .type(new TypeReference<Map<String, String>>() {})
+ .mapType()
.noDefaultValue()
.withDescription("The connector jar HA storage instance
configuration.");
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJar.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJar.java
index d5bc7902ba..3e4c6b011b 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJar.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJar.java
@@ -36,7 +36,7 @@ public abstract class ConnectorJar implements
IdentifiedDataSerializable {
protected ConnectorJar(ConnectorJarType type, byte[] data, String
fileName) {
checkNotNull(data);
- if (data == null || data.length == 0) {
+ if (data.length == 0) {
throw new IllegalArgumentException("The Jar package file for the
connector is empty!");
}
checkNotNull(type);
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJarIdentifier.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJarIdentifier.java
index e0625929f0..13ffcabf88 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJarIdentifier.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ConnectorJarIdentifier.java
@@ -54,13 +54,11 @@ public class ConnectorJarIdentifier implements Serializable
{
}
public static ConnectorJarIdentifier of(ConnectorJar connectorJar, String
storagePath) {
- ConnectorJarIdentifier connectorJarIdentifier =
- ConnectorJarIdentifier.of(
- connectorJar.getConnectorJarID(),
- connectorJar.getType(),
- connectorJar.getFileName(),
- storagePath);
- return connectorJarIdentifier;
+ return ConnectorJarIdentifier.of(
+ connectorJar.getConnectorJarID(),
+ connectorJar.getType(),
+ connectorJar.getFileName(),
+ storagePath);
}
public static ConnectorJarIdentifier of(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 95c85e8076..d92f40ce39 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -41,7 +41,6 @@ import
org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
-import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.master.JobMaster;
import org.apache.seatunnel.engine.server.metrics.JobMetricsUtil;
@@ -49,6 +48,7 @@ import
org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index b9542672bd..ebb0edea85 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -23,7 +23,7 @@ import
org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
-import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
+import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 98caa3f226..0645ae25ce 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -42,8 +42,8 @@ import
org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.execution.TaskTracker;
import org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
+import
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.ServerConnectorPackageClient;
import org.apache.seatunnel.engine.server.task.TaskGroupImmutableInformation;
import
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 6a0c562c09..6c527732d6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -310,12 +310,14 @@ public class JobMaster {
ExceptionUtils.getMessage(e)));
} finally {
jobMasterCompleteFuture.join();
- List<ConnectorJarIdentifier> pluginJarIdentifiers =
- jobImmutableInformation.getPluginJarIdentifiers();
- seaTunnelServer
- .getConnectorPackageService()
- .cleanUpWhenJobFinished(
- jobImmutableInformation.getJobId(),
pluginJarIdentifiers);
+ if (engineConfig.getConnectorJarStorageConfig().getEnable()) {
+ List<ConnectorJarIdentifier> pluginJarIdentifiers =
+ jobImmutableInformation.getPluginJarIdentifiers();
+ seaTunnelServer
+ .getConnectorPackageService()
+ .cleanUpWhenJobFinished(
+ jobImmutableInformation.getJobId(),
pluginJarIdentifiers);
+ }
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/AbstractConnectorJarStorageStrategy.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/AbstractConnectorJarStorageStrategy.java
similarity index 83%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/AbstractConnectorJarStorageStrategy.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/AbstractConnectorJarStorageStrategy.java
index 1d14bae3e1..f9dc21f03b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/AbstractConnectorJarStorageStrategy.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/AbstractConnectorJarStorageStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
import org.apache.seatunnel.engine.common.config.SeaTunnelProperties;
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
@@ -33,10 +33,7 @@ import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.impl.NodeEngineImpl;
-import java.io.ByteArrayOutputStream;
import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -52,7 +49,7 @@ public abstract class AbstractConnectorJarStorageStrategy
implements ConnectorJa
protected static final String COMMON_PLUGIN_JAR_STORAGE_PATH = "/plugins";
- protected static final String CONNECTOR_PLUGIN_JAR_STORAGE_PATH =
"/connectors/seatunnel";
+ protected static final String CONNECTOR_PLUGIN_JAR_STORAGE_PATH =
"/connectors";
protected String storageDir;
@@ -81,7 +78,7 @@ public abstract class AbstractConnectorJarStorageStrategy
implements ConnectorJa
LOGGER.warning(
String.format(
"The creation of directories : %s for the
connector jar storage path has failed.",
- file.getParentFile().toPath().toString()));
+ file.getParentFile().toPath()));
}
return file;
}
@@ -97,8 +94,7 @@ public abstract class AbstractConnectorJarStorageStrategy
implements ConnectorJa
boolean success = false;
try {
if (!storageFile.exists()) {
- FileOutputStream fos = new FileOutputStream(storageFile);
- fos.write(connectorJar.getData());
+ Files.write(storageFile.toPath(), connectorJar.getData());
} else {
LOGGER.warning(
String.format(
@@ -160,25 +156,4 @@ public abstract class AbstractConnectorJarStorageStrategy
implements ConnectorJa
}
});
}
-
- @Override
- public byte[] readConnectorJarByteDataInternal(File connectorJarFile) {
- try {
- // Read file data and convert it to a byte array.
- FileInputStream inputStream = new
FileInputStream(connectorJarFile);
- ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- byte[] buffer = new byte[1024];
- int bytesRead;
- while ((bytesRead = inputStream.read(buffer)) != -1) {
- outputStream.write(buffer, 0, bytesRead);
- }
- return outputStream.toByteArray();
- } catch (IOException e) {
- LOGGER.warning(
- String.format(
- "Failed to read the connector jar package file : {
%s } , the file to be read may not exist",
- connectorJarFile));
- return new byte[0];
- }
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorJarStorageStrategy.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorJarStorageStrategy.java
similarity index 83%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorJarStorageStrategy.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorJarStorageStrategy.java
index 69a11e53ab..d5d6c6d21f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorJarStorageStrategy.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorJarStorageStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
import org.apache.seatunnel.engine.core.job.ConnectorJar;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
@@ -60,7 +60,7 @@ public interface ConnectorJarStorageStrategy extends
Serializable {
*
* @param connectorJar connector jar
* @param storageLocation the storage location of the connector jar in the
local file system
- * @return
+ * @return the storage path of connector jar file
*/
Optional<Path> storageConnectorJarFileInternal(ConnectorJar connectorJar,
File storageLocation);
@@ -69,7 +69,7 @@ public interface ConnectorJarStorageStrategy extends
Serializable {
*
* @param jobId ID of the job for the connector jar
* @param connectorJar connector jar
- * @return
+ * @return true if the same connector Jar package exists in the engine,
otherwise false
*/
boolean checkConnectorJarExisted(long jobId, ConnectorJar connectorJar);
@@ -78,7 +78,7 @@ public interface ConnectorJarStorageStrategy extends
Serializable {
*
* @param jobId ID of the job for the connector jar
* @param connectorJar connector jar
- * @return
+ * @return the unique identifier of the connector jar
*/
ConnectorJarIdentifier getConnectorJarIdentifier(long jobId, ConnectorJar
connectorJar);
@@ -96,11 +96,6 @@ public interface ConnectorJarStorageStrategy extends
Serializable {
*/
void deleteConnectorJarInExecutionNode(ConnectorJarIdentifier
connectorJarIdentifier);
- /**
- * Delete the connector jar package by connectorJarIdentifier
- *
- * @param connectorJarIdentifier the unique identifier of the connector
jar.
- */
/**
* Delete the connector jar package in the local file system by
connectorJarIdentifier.
*
@@ -108,22 +103,6 @@ public interface ConnectorJarStorageStrategy extends
Serializable {
*/
void deleteConnectorJarInternal(File storageLocation);
- /**
- * Read connector Jar package from file to byte array.
- *
- * @param connectorJarFile the connector jar file
- * @return the byte array of the connector jar file
- */
- byte[] readConnectorJarByteDataInternal(File connectorJarFile);
-
- /**
- * Read connector Jar package from file to byte array.
- *
- * @param connectorJarFile the connector jar file
- * @return the byte array of the connector jar file
- */
- byte[] readConnectorJarByteData(File connectorJarFile);
-
/**
* Carry out the cleaning work after the task is finished.
*
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorPackageService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
similarity index 88%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorPackageService.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
index 1ddedbfeb1..4e1ec89593 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/ConnectorPackageService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ConnectorPackageService.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
@@ -27,7 +27,6 @@ import
org.apache.seatunnel.engine.server.task.operation.SendConnectorJarToMembe
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
import com.hazelcast.cluster.Address;
-import com.hazelcast.cluster.Member;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -35,7 +34,6 @@ import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
-import java.util.Collection;
import java.util.List;
@Slf4j
@@ -67,10 +65,9 @@ public class ConnectorPackageService {
}
public ConnectorJarIdentifier storageConnectorJarFile(long jobId, Data
connectorJarData) {
- // deserialize connector jar package data
ConnectorJar connectorJar =
nodeEngine.getSerializationService().toObject(connectorJarData);
- /**
- * If the server holds the same Jar package file, there is no need for
additional storaged.
+ /*
+ * If the server holds the same Jar package file, there is no need for
additional storage.
* When the Connector Jar storage strategy is
SharedConnectorJarStorageStrategy, the
* reference count in the connectorJarRefCounters needs to be
increased. When the Connector
* Jar storage strategy is IsolatedConnectorJarStorageStrategy, we
don't need to do any
@@ -92,15 +89,17 @@ public class ConnectorPackageService {
}
ConnectorJarIdentifier connectorJarIdentifier =
connectorJarStorageStrategy.storageConnectorJarFile(jobId,
connectorJar);
- Address masterNodeAddress =
nodeEngine.getClusterService().getMasterAddress();
- Collection<Member> memberList =
nodeEngine.getClusterService().getMembers();
- memberList.forEach(
- member -> {
- Address address = member.getAddress();
- if (!address.equals(masterNodeAddress)) {
- sendConnectorJarToMemberNode(connectorJarIdentifier,
connectorJar, address);
- }
- });
+ nodeEngine
+ .getClusterService()
+ .getMembers()
+ .forEach(
+ member -> {
+ Address address = member.getAddress();
+ if (!address.equals(nodeEngine.getThisAddress())) {
+ sendConnectorJarToMemberNode(
+ connectorJarIdentifier, connectorJar,
address);
+ }
+ });
return connectorJarIdentifier;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/IsolatedConnectorJarStorageStrategy.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/IsolatedConnectorJarStorageStrategy.java
similarity index 81%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/IsolatedConnectorJarStorageStrategy.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/IsolatedConnectorJarStorageStrategy.java
index dc1f2b0ecf..b963827b5f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/IsolatedConnectorJarStorageStrategy.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/IsolatedConnectorJarStorageStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import org.apache.seatunnel.engine.core.job.CommonPluginJar;
@@ -45,29 +45,20 @@ public class IsolatedConnectorJarStorageStrategy extends
AbstractConnectorJarSto
return ConnectorJarIdentifier.of(connectorJar,
storageFile.toString());
}
Optional<Path> optional =
storageConnectorJarFileInternal(connectorJar, storageFile);
- ConnectorJarIdentifier connectorJarIdentifier =
- optional.isPresent()
- ? ConnectorJarIdentifier.of(connectorJar,
optional.get().toString())
- : ConnectorJarIdentifier.of(connectorJar, "");
- return connectorJarIdentifier;
+ return optional.map(path -> ConnectorJarIdentifier.of(connectorJar,
path.toString()))
+ .orElseGet(() -> ConnectorJarIdentifier.of(connectorJar, ""));
}
@Override
public boolean checkConnectorJarExisted(long jobId, ConnectorJar
connectorJar) {
File storageFile = getStorageLocation(jobId, connectorJar);
- if (storageFile.exists()) {
- return true;
- }
- return false;
+ return storageFile.exists();
}
@Override
public void cleanUpWhenJobFinished(
long jobId, List<ConnectorJarIdentifier>
connectorJarIdentifierList) {
- connectorJarIdentifierList.forEach(
- connectorJarIdentifier -> {
- deleteConnectorJar(connectorJarIdentifier);
- });
+ connectorJarIdentifierList.forEach(this::deleteConnectorJar);
}
@Override
@@ -96,9 +87,4 @@ public class IsolatedConnectorJarStorageStrategy extends
AbstractConnectorJarSto
connectorJar.getFileName());
}
}
-
- @Override
- public byte[] readConnectorJarByteData(File connectorJarFile) {
- return readConnectorJarByteDataInternal(connectorJarFile);
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/ServerConnectorPackageClient.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ServerConnectorPackageClient.java
similarity index 99%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/ServerConnectorPackageClient.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ServerConnectorPackageClient.java
index 1fdba681b3..ed782991f6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/ServerConnectorPackageClient.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/ServerConnectorPackageClient.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task;
+package org.apache.seatunnel.engine.server.service.jar;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarCleanupTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarCleanupTask.java
similarity index 92%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarCleanupTask.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarCleanupTask.java
index 26c6c4b806..64770f4b56 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarCleanupTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarCleanupTask.java
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.core.job.RefCount;
-import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import java.util.Iterator;
@@ -35,17 +34,13 @@ Cleanup task for shared connector jar package.
*/
public class SharedConnectorJarCleanupTask extends TimerTask {
- private final ILogger LOGGER;
-
private final Consumer<ConnectorJarIdentifier> cleanupCallback;
private final IMap<ConnectorJarIdentifier, RefCount>
connectorJarRefCounters;
public SharedConnectorJarCleanupTask(
- ILogger LOGGER,
Consumer<ConnectorJarIdentifier> cleanupCallback,
IMap<ConnectorJarIdentifier, RefCount> connectorJarRefCounters) {
- this.LOGGER = checkNotNull(LOGGER);
this.cleanupCallback = checkNotNull(cleanupCallback);
this.connectorJarRefCounters = checkNotNull(connectorJarRefCounters);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarStorageStrategy.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarStorageStrategy.java
similarity index 83%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarStorageStrategy.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarStorageStrategy.java
index e66fdd56e8..c2d88fcbb2 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/SharedConnectorJarStorageStrategy.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/SharedConnectorJarStorageStrategy.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
import org.apache.seatunnel.engine.common.Constant;
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
@@ -33,7 +33,6 @@ import java.util.List;
import java.util.Timer;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
@@ -56,12 +55,12 @@ public class SharedConnectorJarStorageStrategy extends
AbstractConnectorJarStora
this.readWriteLock = new ReentrantReadWriteLock();
this.connectorJarRefCounters =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_CONNECTOR_JAR_REF_COUNTERS);
- // Initializing the clean up task
+ // Initializing the cleanup task
this.cleanupTimer = new Timer(true);
this.cleanupInterval =
connectorJarStorageConfig.getCleanupTaskInterval() * 1000;
this.cleanupTimer.schedule(
new SharedConnectorJarCleanupTask(
- LOGGER, this::deleteConnectorJar,
connectorJarRefCounters),
+ this::deleteConnectorJar, connectorJarRefCounters),
cleanupInterval,
cleanupInterval);
}
@@ -95,10 +94,7 @@ public class SharedConnectorJarStorageStrategy extends
AbstractConnectorJarStora
ConnectorJarIdentifier.of(
connectorJar, getStorageLocationPath(jobId,
connectorJar));
RefCount refCount =
connectorJarRefCounters.get(connectorJarIdentifier);
- if (refCount != null) {
- return true;
- }
- return false;
+ return refCount != null;
}
public void increaseRefCountForConnectorJar(ConnectorJarIdentifier
connectorJarIdentifier) {
@@ -145,35 +141,18 @@ public class SharedConnectorJarStorageStrategy extends
AbstractConnectorJarStora
@Override
public void cleanUpWhenJobFinished(
long jobId, List<ConnectorJarIdentifier>
connectorJarIdentifierList) {
- connectorJarIdentifierList.forEach(
- connectorJarIdentifier -> {
- decreaseConnectorJarRefCount(connectorJarIdentifier);
- });
+ connectorJarIdentifierList.forEach(this::decreaseConnectorJarRefCount);
}
public void decreaseConnectorJarRefCount(ConnectorJarIdentifier
connectorJarIdentifier) {
connectorJarRefCounters.compute(
connectorJarIdentifier,
- new BiFunction<ConnectorJarIdentifier, RefCount, RefCount>() {
- @Override
- public RefCount apply(
- ConnectorJarIdentifier connectorJarIdentifier,
RefCount refCount) {
- if (refCount != null) {
- Long references = refCount.getReferences();
- refCount.setReferences(--references);
- }
- return refCount;
+ (connectorJarIdentifier1, refCount) -> {
+ if (refCount != null) {
+ Long references = refCount.getReferences();
+ refCount.setReferences(--references);
}
+ return refCount;
});
}
-
- @Override
- public byte[] readConnectorJarByteData(File connectorJarFile) {
- readWriteLock.readLock().lock();
- try {
- return readConnectorJarByteDataInternal(connectorJarFile);
- } finally {
- readWriteLock.readLock().unlock();
- }
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/StorageStrategyFactory.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
similarity index 97%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/StorageStrategyFactory.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
index 9aed0a564a..f0fe4d9b84 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/StorageStrategyFactory.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/service/jar/StorageStrategyFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.master;
+package org.apache.seatunnel.engine.server.service.jar;
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeleteConnectorJarInExecutionNode.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeleteConnectorJarInExecutionNode.java
index 87e572c366..52b211f15c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeleteConnectorJarInExecutionNode.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/DeleteConnectorJarInExecutionNode.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.server.task.operation;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.ServerConnectorPackageClient;
+import
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/SendConnectorJarToMemberNodeOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/SendConnectorJarToMemberNodeOperation.java
index 836f09cffe..94d767cb07 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/SendConnectorJarToMemberNodeOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/SendConnectorJarToMemberNodeOperation.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.engine.core.job.ConnectorJar;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
-import org.apache.seatunnel.engine.server.task.ServerConnectorPackageClient;
+import
org.apache.seatunnel.engine.server.service.jar.ServerConnectorPackageClient;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
index ba4da88a1d..a5c0569d60 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java
@@ -45,7 +45,7 @@ import
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
-import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
+import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;