Hisoka-X commented on code in PR #5542:
URL: https://github.com/apache/seatunnel/pull/5542#discussion_r1369999130
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -253,4 +257,63 @@ private void printExecutionInfo() {
coordinatorService.printJobDetailInfo();
}
}
+
+ public SeaTunnelConfig getSeaTunnelConfig() {
+ return seaTunnelConfig;
+ }
+
+ public NodeEngineImpl getNodeEngine() {
+ return nodeEngine;
+ }
+
+ public ConnectorPackageService getConnectorPackageService() {
+ int retryCount = 0;
+ if (isMasterNode()) {
+ // The hazelcast operator request invocation will retry, We must
wait enough time to
+ // wait the invocation return.
+ String hazelcastInvocationMaxRetry =
+ seaTunnelConfig
+ .getHazelcastConfig()
+
.getProperty("hazelcast.invocation.max.retry.count");
Review Comment:
Please use a reference to the config property instead of hard-coding the
config key. You can refer to https://github.com/apache/seatunnel/pull/5618
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -253,4 +257,63 @@ private void printExecutionInfo() {
coordinatorService.printJobDetailInfo();
}
}
+
+ public SeaTunnelConfig getSeaTunnelConfig() {
+ return seaTunnelConfig;
+ }
+
+ public NodeEngineImpl getNodeEngine() {
+ return nodeEngine;
+ }
+
+ public ConnectorPackageService getConnectorPackageService() {
Review Comment:
Why not put it into `CoordinatorService`, since connectorPackageService only
works on the master node?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java:
##########
@@ -253,4 +257,63 @@ private void printExecutionInfo() {
coordinatorService.printJobDetailInfo();
}
}
+
+ public SeaTunnelConfig getSeaTunnelConfig() {
+ return seaTunnelConfig;
+ }
+
+ public NodeEngineImpl getNodeEngine() {
+ return nodeEngine;
+ }
+
+ public ConnectorPackageService getConnectorPackageService() {
+ int retryCount = 0;
+ if (isMasterNode()) {
+ // The hazelcast operator request invocation will retry, We must
wait enough time to
+ // wait the invocation return.
+ String hazelcastInvocationMaxRetry =
+ seaTunnelConfig
+ .getHazelcastConfig()
+
.getProperty("hazelcast.invocation.max.retry.count");
+ int maxRetry =
+ hazelcastInvocationMaxRetry == null
+ ? 250 * 2
+ : Integer.parseInt(hazelcastInvocationMaxRetry) *
2;
+
+ String hazelcastRetryPause =
+ seaTunnelConfig
+ .getHazelcastConfig()
+
.getProperty("hazelcast.invocation.retry.pause.millis");
Review Comment:
ditto
##########
seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ConnectorPackageClient.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelUploadConnectorJarCodec;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public class ConnectorPackageClient {
+
+ private static final ILogger LOGGER =
Logger.getLogger(ConnectorPackageClient.class);
+
+ private final SeaTunnelHazelcastClient hazelcastClient;
+
+ public ConnectorPackageClient(SeaTunnelHazelcastClient hazelcastClient) {
+ checkNotNull(hazelcastClient);
+ this.hazelcastClient = hazelcastClient;
+ }
+
+ public Set<ConnectorJarIdentifier> uploadCommonPluginJars(
+ long jobId, List<URL> commonPluginJars) {
+ Set<ConnectorJarIdentifier> connectorJarIdentifiers = new HashSet<>();
+ // Upload commonPluginJar
+ for (URL commonPluginJar : commonPluginJars) {
+ // handle the local file path
+ // origin path :
/${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar ->
+ // handled path :
${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar
+ Path path = Paths.get(commonPluginJar.getPath().substring(1));
+ // Obtain the directory name of the relative location of the file
path.
+ // for example, The path is
+ //
${SEATUNNEL_HOME}/plugins/Jdbc/lib/mysql-connector-java-5.1.32.jar, so the name
+ // obtained here is the connector plugin name : JDBC
Review Comment:
So you still haven't fixed this problem.
##########
seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/ConnectorPackageServiceTest.java:
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.common.utils.MDUtil;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
+import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ConnectorJarType;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.job.PipelineStatus;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
+import org.apache.seatunnel.engine.server.master.ConnectorPackageService;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.internal.serialization.Data;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static
org.apache.seatunnel.engine.core.job.AbstractJobEnvironment.getJarUrlsFromIdentifiers;
+import static org.awaitility.Awaitility.await;
+
+public class ConnectorPackageServiceTest extends AbstractSeaTunnelServerTest {
+
+ @Test
+ public void testMasterNodeActive() {
+ HazelcastInstanceImpl instance1 =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(
+
"ConnectorPackageServiceTest_testMasterNodeActive"));
+ HazelcastInstanceImpl instance2 =
+ SeaTunnelServerStarter.createHazelcastInstance(
+ TestUtils.getClusterName(
+
"ConnectorPackageServiceTest_testMasterNodeActive"));
+
+ SeaTunnelServer server1 =
+
instance1.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+ SeaTunnelServer server2 =
+
instance2.node.getNodeEngine().getService(SeaTunnelServer.SERVICE_NAME);
+
+ Assertions.assertTrue(server1.isMasterNode());
+ ConnectorPackageService connectorPackageService1 =
server1.getConnectorPackageService();
+
Assertions.assertTrue(connectorPackageService1.isConnectorPackageServiceActive());
+
Review Comment:
So, even if the upload jar feature is disabled, the connectorPackageService
instance will still be created? I think we should create it only when this
feature is enabled.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -259,6 +259,20 @@ private void restoreJobFromMasterActiveSwitch(@NonNull
Long jobId, @NonNull JobI
runningJobInfoIMap.remove(jobId);
return;
}
+ // Data jobImmutableInformationData =
jobInfo.getJobImmutableInformation();
+ // JobImmutableInformation jobImmutableInformation =
+ //
+ //
nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
+ // List<ConnectorJarIdentifier> pluginJarIdentifiers =
+ // jobImmutableInformation.getPluginJarIdentifiers();
+ // pluginJarIdentifiers.forEach(
+ // pluginJarIdentifier -> {
+ // String storagePath =
pluginJarIdentifier.getStoragePath();
+ // if (!new File(storagePath).exists()) {
+ //
connectorPackageHAStorage.downloadConnectorJar(jobId,
+ // pluginJarIdentifier);
+ // }
+ // });
Review Comment:
Any reason?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java:
##########
@@ -113,6 +116,7 @@ public PhysicalVertex(
int pipelineId,
int totalPipelineNum,
Set<URL> pluginJarsUrls,
+ Set<ConnectorJarIdentifier> connectorJarIdentifiers,
Review Comment:
ditto
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/TaskGroupImmutableInformation.java:
##########
@@ -40,6 +41,8 @@ public class TaskGroupImmutableInformation implements
IdentifiedDataSerializable
private Set<URL> jars;
+ private Set<ConnectorJarIdentifier> connectorJarIdentifiers;
Review Comment:
Why do we need to maintain two different jar collections? It looks bad.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java:
##########
@@ -252,9 +259,25 @@ public TaskDeployState deployTask(@NonNull
TaskGroupImmutableInformation taskImm
taskImmutableInfo.getExecutionId()));
TaskGroup taskGroup = null;
try {
+ Set<ConnectorJarIdentifier> connectorJarIdentifiers =
+ taskImmutableInfo.getConnectorJarIdentifiers();
Set<URL> jars = taskImmutableInfo.getJars();
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
- if (!CollectionUtils.isEmpty(jars)) {
+ if (!CollectionUtils.isEmpty(connectorJarIdentifiers)) {
+ // Prioritize obtaining the jar package file required for the
current task execution
+ // from the local,
+ // if it does not exist locally, it will be downloaded from
the master node.
Review Comment:
```suggestion
// Prioritize obtaining the jar package file required for
the current task execution
// from the local, if it does not exist locally, it will be
downloaded from the master node.
```
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/AbstractConnectorJarStorageStrategy.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.master;
+
+import org.apache.seatunnel.engine.common.config.SeaTunnelProperties;
+import
org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
+import org.apache.seatunnel.engine.core.job.ConnectorJar;
+import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import
org.apache.seatunnel.engine.server.task.operation.DeleteConnectorJarInExecutionNode;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.cluster.Member;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collection;
+
+import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+public abstract class AbstractConnectorJarStorageStrategy implements
ConnectorJarStorageStrategy {
+
+ protected static final ILogger LOGGER =
+ Logger.getLogger(AbstractConnectorJarStorageStrategy.class);
+
+ protected static final String COMMON_PLUGIN_JAR_STORAGE_PATH = "/plugins";
+
+ protected static final String CONNECTOR_PLUGIN_JAR_STORAGE_PATH =
"/connectors/seatunnel";
+
+ protected String storageDir;
+
+ protected final ConnectorJarStorageConfig connectorJarStorageConfig;
+
+ protected final SeaTunnelServer seaTunnelServer;
+
+ protected final NodeEngineImpl nodeEngine;
+
+ public AbstractConnectorJarStorageStrategy(
+ ConnectorJarStorageConfig connectorJarStorageConfig,
SeaTunnelServer seaTunnelServer) {
+ this.seaTunnelServer = seaTunnelServer;
+ this.nodeEngine = seaTunnelServer.getNodeEngine();
+ checkNotNull(connectorJarStorageConfig);
+ this.connectorJarStorageConfig = connectorJarStorageConfig;
+ this.storageDir = getConnectorJarStorageDir();
+ }
+
+ @Override
+ public File getStorageLocation(long jobId, ConnectorJar connectorJar) {
+ checkNotNull(jobId);
+ File file = new File(getStorageLocationPath(jobId, connectorJar));
+ try {
+ Files.createDirectories(file.getParentFile().toPath());
+ } catch (IOException e) {
+ LOGGER.warning(
+ String.format(
+ "The creation of directories : %s for the
connector jar storage path has failed.",
+ file.getParentFile().toPath().toString()));
+ }
+ return file;
+ }
+
+ @Override
+ public ConnectorJarIdentifier getConnectorJarIdentifier(long jobId,
ConnectorJar connectorJar) {
+ return ConnectorJarIdentifier.of(connectorJar,
getStorageLocationPath(jobId, connectorJar));
+ }
+
+ @Override
+ public Path storageConnectorJarFileInternal(ConnectorJar connectorJar,
File storageFile) {
Review Comment:
Please use `Optional<Path>` as the return type, because you may return null.
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -259,6 +259,20 @@ private void restoreJobFromMasterActiveSwitch(@NonNull
Long jobId, @NonNull JobI
runningJobInfoIMap.remove(jobId);
return;
}
+ // Data jobImmutableInformationData =
jobInfo.getJobImmutableInformation();
+ // JobImmutableInformation jobImmutableInformation =
+ //
+ //
nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
+ // List<ConnectorJarIdentifier> pluginJarIdentifiers =
+ // jobImmutableInformation.getPluginJarIdentifiers();
+ // pluginJarIdentifiers.forEach(
+ // pluginJarIdentifier -> {
+ // String storagePath =
pluginJarIdentifier.getStoragePath();
+ // if (!new File(storagePath).exists()) {
+ //
connectorPackageHAStorage.downloadConnectorJar(jobId,
+ // pluginJarIdentifier);
+ // }
+ // });
Review Comment:
Any reason?
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java:
##########
@@ -96,6 +96,7 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand,
String uri)
Config config = RestUtil.buildConfig(requestHandle(httpPostCommand));
JobConfig jobConfig = new JobConfig();
jobConfig.setName(requestParams.get(RestConstant.JOB_NAME));
+ CoordinatorService coordinatorService =
getSeaTunnelServer().getCoordinatorService();
Review Comment:
Any reason for moving this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]