This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 50f0c5fa3e [Improve][Zeta] Split classloader in config parse phase (#8193)
50f0c5fa3e is described below

commit 50f0c5fa3e2491c3d72d051c01f9b1a7b9bc8907
Author: Jia Fan <[email protected]>
AuthorDate: Tue Dec 3 20:20:07 2024 +0800

    [Improve][Zeta] Split classloader in config parse phase (#8193)
---
 .../apache/seatunnel/api/common/JobContext.java    |  31 +-----
 .../client/MultipleTableJobConfigParserTest.java   |  71 ++++++++++++++
 .../classloader/DefaultClassLoaderService.java     |   6 +-
 .../core/parse/MultipleTableJobConfigParser.java   | 105 ++++++++++-----------
 4 files changed, 127 insertions(+), 86 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
index 749becd369..0456233060 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
@@ -17,25 +17,16 @@
 
 package org.apache.seatunnel.api.common;
 
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.constants.JobMode;
 
 import java.io.Serializable;
-import java.util.Map;
-import java.util.Optional;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
-/** This class is used to store the context of the job. e.g. the table schema, 
catalog...etc. */
+/** This class is used to store the context of the job. e.g. the job id, job 
mode ...etc. */
 public final class JobContext implements Serializable {
 
     private static final long serialVersionUID = -1L;
 
-    // tableName -> tableSchema
-    private final Map<String, TableSchema> tableSchemaMap =
-            new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
-
     private JobMode jobMode;
 
     private final String jobId;
@@ -48,26 +39,6 @@ public final class JobContext implements Serializable {
         this.jobId = jobId + "";
     }
 
-    /**
-     * Put table schema.
-     *
-     * @param tableName table name
-     * @param tableSchema table schema
-     */
-    public void addSchema(String tableName, TableSchema tableSchema) {
-        tableSchemaMap.put(tableName, tableSchema);
-    }
-
-    /**
-     * Get table schema.
-     *
-     * @param tableName table name.
-     * @return table schema.
-     */
-    public Optional<TableSchema> getSchema(String tableName) {
-        return Optional.ofNullable(tableSchemaMap.get(tableName));
-    }
-
     public JobContext setJobMode(JobMode jobMode) {
         this.jobMode = jobMode;
         return this;
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index abc81903de..eb7f5ca215 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -20,11 +20,14 @@ package org.apache.seatunnel.engine.client;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import 
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
@@ -34,12 +37,17 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import scala.Tuple2;
+
 import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class MultipleTableJobConfigParserTest {
 
@@ -147,4 +155,67 @@ public class MultipleTableJobConfigParserTest {
         Assertions.assertEquals("Transform[0]-sql", 
actions.get(0).getUpstream().get(0).getName());
         Assertions.assertEquals("Transform[1]-sql", 
actions.get(1).getUpstream().get(0).getName());
     }
+
+    @Test
+    public void testCreateDifferentClassLoader() {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = 
TestUtils.getResource("/batch_fakesource_to_file.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setJobContext(new JobContext(System.currentTimeMillis()));
+        final ClassLoader[] classLoaders = new ClassLoader[3];
+        MultipleTableJobConfigParser jobConfigParser =
+                new MultipleTableJobConfigParser(filePath, new IdGenerator(), 
jobConfig) {
+                    @Override
+                    public Tuple2<String, List<Tuple2<CatalogTable, Action>>> 
parseSource(
+                            int configIndex, Config sourceConfig, ClassLoader 
classLoader) {
+                        classLoaders[0] = classLoader;
+                        return super.parseSource(configIndex, sourceConfig, 
classLoader);
+                    }
+
+                    @Override
+                    public void parseTransforms(
+                            List<? extends Config> transformConfigs,
+                            ClassLoader classLoader,
+                            LinkedHashMap<String, List<Tuple2<CatalogTable, 
Action>>>
+                                    tableWithActionMap) {
+                        classLoaders[1] = classLoader;
+                        super.parseTransforms(transformConfigs, classLoader, 
tableWithActionMap);
+                    }
+
+                    @Override
+                    public List<SinkAction<?, ?, ?, ?>> parseSink(
+                            int configIndex,
+                            Config sinkConfig,
+                            ClassLoader classLoader,
+                            LinkedHashMap<String, List<Tuple2<CatalogTable, 
Action>>>
+                                    tableWithActionMap) {
+                        classLoaders[2] = classLoader;
+                        return super.parseSink(
+                                configIndex, sinkConfig, classLoader, 
tableWithActionMap);
+                    }
+                };
+        AtomicInteger getClassLoaderTimes = new AtomicInteger();
+        AtomicInteger releaseClassLoaderTimes = new AtomicInteger();
+        jobConfigParser.parse(
+                new ClassLoaderService() {
+                    @Override
+                    public ClassLoader getClassLoader(long jobId, 
Collection<URL> jars) {
+                        getClassLoaderTimes.getAndIncrement();
+                        return new SeaTunnelChildFirstClassLoader(jars);
+                    }
+
+                    @Override
+                    public void releaseClassLoader(long jobId, Collection<URL> 
jars) {
+                        releaseClassLoaderTimes.getAndIncrement();
+                    }
+
+                    @Override
+                    public void close() {}
+                });
+        Assertions.assertEquals(2, getClassLoaderTimes.get());
+        Assertions.assertEquals(2, releaseClassLoaderTimes.get());
+        Assertions.assertEquals(classLoaders[0], classLoaders[1]);
+        Assertions.assertNotEquals(classLoaders[0], classLoaders[2]);
+        Assertions.assertNotEquals(classLoaders[1], classLoaders[2]);
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
index 5b36e7d9de..3d3fabe973 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
@@ -144,7 +144,7 @@ public class DefaultClassLoaderService implements 
ClassLoaderService {
 
     /** Only for test */
     @VisibleForTesting
-    Optional<ClassLoader> queryClassLoaderById(long jobId, Collection<URL> 
jars) {
+    public Optional<ClassLoader> queryClassLoaderById(long jobId, 
Collection<URL> jars) {
         if (cacheMode) {
             // with cache mode, all jobs share the same classloader if the 
jars are the same
             jobId = 1L;
@@ -162,7 +162,7 @@ public class DefaultClassLoaderService implements 
ClassLoaderService {
 
     /** Only for test */
     @VisibleForTesting
-    int queryClassLoaderReferenceCount(long jobId, Collection<URL> jars) {
+    public int queryClassLoaderReferenceCount(long jobId, Collection<URL> 
jars) {
         if (cacheMode) {
             // with cache mode, all jobs share the same classloader if the 
jars are the same
             jobId = 1L;
@@ -180,7 +180,7 @@ public class DefaultClassLoaderService implements 
ClassLoaderService {
 
     /** Only for test */
     @VisibleForTesting
-    int queryClassLoaderCount() {
+    public int queryClassLoaderCount() {
         AtomicInteger count = new AtomicInteger();
         classLoaderCache.values().forEach(map -> count.addAndGet(map.size()));
         return count.get();
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 4bdca5b875..22abdfbffd 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -44,6 +44,7 @@ import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -198,22 +199,25 @@ public class MultipleTableJobConfigParser {
                 TypesafeConfigUtils.getConfigList(
                         seaTunnelJobConfig, "sink", Collections.emptyList());
 
-        List<URL> connectorJars = getConnectorJarList(sourceConfigs, 
transformConfigs, sinkConfigs);
-        if (!commonPluginJars.isEmpty()) {
-            connectorJars.addAll(commonPluginJars);
-        }
+        List<URL> sourceConnectorJars = getConnectorJarList(sourceConfigs, 
PluginType.SOURCE);
+        List<URL> transformConnectorJars =
+                getConnectorJarList(transformConfigs, PluginType.TRANSFORM);
+        List<URL> sinkConnectorJars = getConnectorJarList(sinkConfigs, 
PluginType.SINK);
         ClassLoader parentClassLoader = 
Thread.currentThread().getContextClassLoader();
 
-        ClassLoader classLoader;
-        if (classLoaderService == null) {
-            classLoader = new SeaTunnelChildFirstClassLoader(connectorJars, 
parentClassLoader);
-        } else {
-            classLoader =
-                    classLoaderService.getClassLoader(
-                            
Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars);
-        }
+        // source and transform use the same classloader
+        List<URL> sourceJars =
+                Stream.of(sourceConnectorJars, transformConnectorJars)
+                        .flatMap(Collection::stream)
+                        .distinct()
+                        .collect(Collectors.toList());
+        ClassLoader sourceAndTransformClassLoader =
+                getClassLoader(classLoaderService, parentClassLoader, 
sourceJars);
+        ClassLoader sinkClassLoader =
+                getClassLoader(classLoaderService, parentClassLoader, 
sinkConnectorJars);
+
         try {
-            Thread.currentThread().setContextClassLoader(classLoader);
+            
Thread.currentThread().setContextClassLoader(sourceAndTransformClassLoader);
             ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs, 
sinkConfigs);
             LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>> 
tableWithActionMap =
                     new LinkedHashMap<>();
@@ -229,19 +233,20 @@ public class MultipleTableJobConfigParser {
             for (int configIndex = 0; configIndex < sourceConfigs.size(); 
configIndex++) {
                 Config sourceConfig = sourceConfigs.get(configIndex);
                 Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
-                        parseSource(configIndex, sourceConfig, classLoader);
+                        parseSource(configIndex, sourceConfig, 
sourceAndTransformClassLoader);
                 tableWithActionMap.put(tuple2._1(), tuple2._2());
             }
 
             log.info("start generating all transforms.");
-            parseTransforms(transformConfigs, classLoader, tableWithActionMap);
+            parseTransforms(transformConfigs, sourceAndTransformClassLoader, 
tableWithActionMap);
 
+            Thread.currentThread().setContextClassLoader(sinkClassLoader);
             log.info("start generating all sinks.");
             List<Action> sinkActions = new ArrayList<>();
             for (int configIndex = 0; configIndex < sinkConfigs.size(); 
configIndex++) {
                 Config sinkConfig = sinkConfigs.get(configIndex);
                 sinkActions.addAll(
-                        parseSink(configIndex, sinkConfig, classLoader, 
tableWithActionMap));
+                        parseSink(configIndex, sinkConfig, sinkClassLoader, 
tableWithActionMap));
             }
             Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
             return new ImmutablePair<>(sinkActions, factoryUrls);
@@ -249,55 +254,49 @@ public class MultipleTableJobConfigParser {
             Thread.currentThread().setContextClassLoader(parentClassLoader);
             if (classLoaderService != null) {
                 classLoaderService.releaseClassLoader(
-                        Long.parseLong(jobConfig.getJobContext().getJobId()), 
connectorJars);
+                        Long.parseLong(jobConfig.getJobContext().getJobId()), 
sourceJars);
+                classLoaderService.releaseClassLoader(
+                        Long.parseLong(jobConfig.getJobContext().getJobId()), 
sinkConnectorJars);
             }
         }
     }
 
+    private ClassLoader getClassLoader(
+            ClassLoaderService classLoaderService,
+            ClassLoader parentClassLoader,
+            List<URL> connectorJars) {
+        ClassLoader classLoader;
+        if (classLoaderService == null) {
+            classLoader = new SeaTunnelChildFirstClassLoader(connectorJars, 
parentClassLoader);
+        } else {
+            classLoader =
+                    classLoaderService.getClassLoader(
+                            
Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars);
+        }
+        return classLoader;
+    }
+
     public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
         Set<URL> urls = new HashSet<>();
         fillUsedFactoryUrls(sinkActions, urls);
         return urls;
     }
 
-    private List<URL> getConnectorJarList(
-            List<? extends Config> sourceConfigs,
-            List<? extends Config> transformConfigs,
-            List<? extends Config> sinkConfigs) {
+    private List<URL> getConnectorJarList(List<? extends Config> configs, 
PluginType type) {
         List<PluginIdentifier> factoryIds =
-                Stream.concat(
-                                Stream.concat(
-                                        sourceConfigs.stream()
-                                                
.map(ConfigParserUtil::getFactoryId)
-                                                .map(
-                                                        factory ->
-                                                                
PluginIdentifier.of(
-                                                                        
CollectionConstants
-                                                                               
 .SEATUNNEL_PLUGIN,
-                                                                        
CollectionConstants
-                                                                               
 .SOURCE_PLUGIN,
-                                                                        
factory)),
-                                        transformConfigs.stream()
-                                                
.map(ConfigParserUtil::getFactoryId)
-                                                .map(
-                                                        factory ->
-                                                                
PluginIdentifier.of(
-                                                                        
CollectionConstants
-                                                                               
 .SEATUNNEL_PLUGIN,
-                                                                        
CollectionConstants
-                                                                               
 .TRANSFORM_PLUGIN,
-                                                                        
factory))),
-                                sinkConfigs.stream()
-                                        .map(ConfigParserUtil::getFactoryId)
-                                        .map(
-                                                factory ->
-                                                        PluginIdentifier.of(
-                                                                
CollectionConstants
-                                                                        
.SEATUNNEL_PLUGIN,
-                                                                
CollectionConstants.SINK_PLUGIN,
-                                                                factory)))
+                configs.stream()
+                        .map(ConfigParserUtil::getFactoryId)
+                        .map(
+                                factory ->
+                                        PluginIdentifier.of(
+                                                
CollectionConstants.SEATUNNEL_PLUGIN,
+                                                type.getType(),
+                                                factory))
                         .collect(Collectors.toList());
-        return new 
SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds);
+        List<URL> jarPaths = new ArrayList<>();
+        jarPaths.addAll(new 
SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds));
+        jarPaths.addAll(commonPluginJars);
+        return jarPaths;
     }
 
     private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {

Reply via email to