This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 50f0c5fa3e [Improve][Zeta] Split classloader in config parse phase (#8193)
50f0c5fa3e is described below
commit 50f0c5fa3e2491c3d72d051c01f9b1a7b9bc8907
Author: Jia Fan <[email protected]>
AuthorDate: Tue Dec 3 20:20:07 2024 +0800
[Improve][Zeta] Split classloader in config parse phase (#8193)
---
.../apache/seatunnel/api/common/JobContext.java | 31 +-----
.../client/MultipleTableJobConfigParserTest.java | 71 ++++++++++++++
.../classloader/DefaultClassLoaderService.java | 6 +-
.../core/parse/MultipleTableJobConfigParser.java | 105 ++++++++++-----------
4 files changed, 127 insertions(+), 86 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
index 749becd369..0456233060 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/JobContext.java
@@ -17,25 +17,16 @@
package org.apache.seatunnel.api.common;
-import org.apache.seatunnel.api.table.catalog.TableSchema;
-import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.JobMode;
import java.io.Serializable;
-import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-/** This class is used to store the context of the job. e.g. the table schema,
catalog...etc. */
+/** This class is used to store the context of the job. e.g. the job id, job
mode ...etc. */
public final class JobContext implements Serializable {
private static final long serialVersionUID = -1L;
- // tableName -> tableSchema
- private final Map<String, TableSchema> tableSchemaMap =
- new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
-
private JobMode jobMode;
private final String jobId;
@@ -48,26 +39,6 @@ public final class JobContext implements Serializable {
this.jobId = jobId + "";
}
- /**
- * Put table schema.
- *
- * @param tableName table name
- * @param tableSchema table schema
- */
- public void addSchema(String tableName, TableSchema tableSchema) {
- tableSchemaMap.put(tableName, tableSchema);
- }
-
- /**
- * Get table schema.
- *
- * @param tableName table name.
- * @return table schema.
- */
- public Optional<TableSchema> getSchema(String tableName) {
- return Optional.ofNullable(tableSchemaMap.get(tableName));
- }
-
public JobContext setJobMode(JobMode jobMode) {
this.jobMode = jobMode;
return this;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
index abc81903de..eb7f5ca215 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/MultipleTableJobConfigParserTest.java
@@ -20,11 +20,14 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.JobContext;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
@@ -34,12 +37,17 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import scala.Tuple2;
+
import java.io.IOException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
public class MultipleTableJobConfigParserTest {
@@ -147,4 +155,67 @@ public class MultipleTableJobConfigParserTest {
Assertions.assertEquals("Transform[0]-sql",
actions.get(0).getUpstream().get(0).getName());
Assertions.assertEquals("Transform[1]-sql",
actions.get(1).getUpstream().get(0).getName());
}
+
+ @Test
+ public void testCreateDifferentClassLoader() {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
TestUtils.getResource("/batch_fakesource_to_file.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setJobContext(new JobContext(System.currentTimeMillis()));
+ final ClassLoader[] classLoaders = new ClassLoader[3];
+ MultipleTableJobConfigParser jobConfigParser =
+ new MultipleTableJobConfigParser(filePath, new IdGenerator(),
jobConfig) {
+ @Override
+ public Tuple2<String, List<Tuple2<CatalogTable, Action>>>
parseSource(
+ int configIndex, Config sourceConfig, ClassLoader
classLoader) {
+ classLoaders[0] = classLoader;
+ return super.parseSource(configIndex, sourceConfig,
classLoader);
+ }
+
+ @Override
+ public void parseTransforms(
+ List<? extends Config> transformConfigs,
+ ClassLoader classLoader,
+ LinkedHashMap<String, List<Tuple2<CatalogTable,
Action>>>
+ tableWithActionMap) {
+ classLoaders[1] = classLoader;
+ super.parseTransforms(transformConfigs, classLoader,
tableWithActionMap);
+ }
+
+ @Override
+ public List<SinkAction<?, ?, ?, ?>> parseSink(
+ int configIndex,
+ Config sinkConfig,
+ ClassLoader classLoader,
+ LinkedHashMap<String, List<Tuple2<CatalogTable,
Action>>>
+ tableWithActionMap) {
+ classLoaders[2] = classLoader;
+ return super.parseSink(
+ configIndex, sinkConfig, classLoader,
tableWithActionMap);
+ }
+ };
+ AtomicInteger getClassLoaderTimes = new AtomicInteger();
+ AtomicInteger releaseClassLoaderTimes = new AtomicInteger();
+ jobConfigParser.parse(
+ new ClassLoaderService() {
+ @Override
+ public ClassLoader getClassLoader(long jobId,
Collection<URL> jars) {
+ getClassLoaderTimes.getAndIncrement();
+ return new SeaTunnelChildFirstClassLoader(jars);
+ }
+
+ @Override
+ public void releaseClassLoader(long jobId, Collection<URL>
jars) {
+ releaseClassLoaderTimes.getAndIncrement();
+ }
+
+ @Override
+ public void close() {}
+ });
+ Assertions.assertEquals(2, getClassLoaderTimes.get());
+ Assertions.assertEquals(2, releaseClassLoaderTimes.get());
+ Assertions.assertEquals(classLoaders[0], classLoaders[1]);
+ Assertions.assertNotEquals(classLoaders[0], classLoaders[2]);
+ Assertions.assertNotEquals(classLoaders[1], classLoaders[2]);
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
index 5b36e7d9de..3d3fabe973 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java
@@ -144,7 +144,7 @@ public class DefaultClassLoaderService implements ClassLoaderService {
/** Only for test */
@VisibleForTesting
- Optional<ClassLoader> queryClassLoaderById(long jobId, Collection<URL>
jars) {
+ public Optional<ClassLoader> queryClassLoaderById(long jobId,
Collection<URL> jars) {
if (cacheMode) {
// with cache mode, all jobs share the same classloader if the
jars are the same
jobId = 1L;
@@ -162,7 +162,7 @@ public class DefaultClassLoaderService implements ClassLoaderService {
/** Only for test */
@VisibleForTesting
- int queryClassLoaderReferenceCount(long jobId, Collection<URL> jars) {
+ public int queryClassLoaderReferenceCount(long jobId, Collection<URL>
jars) {
if (cacheMode) {
// with cache mode, all jobs share the same classloader if the
jars are the same
jobId = 1L;
@@ -180,7 +180,7 @@ public class DefaultClassLoaderService implements ClassLoaderService {
/** Only for test */
@VisibleForTesting
- int queryClassLoaderCount() {
+ public int queryClassLoaderCount() {
AtomicInteger count = new AtomicInteger();
classLoaderCache.values().forEach(map -> count.addAndGet(map.size()));
return count.get();
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 4bdca5b875..22abdfbffd 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -44,6 +44,7 @@ import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
+import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
@@ -198,22 +199,25 @@ public class MultipleTableJobConfigParser {
TypesafeConfigUtils.getConfigList(
seaTunnelJobConfig, "sink", Collections.emptyList());
- List<URL> connectorJars = getConnectorJarList(sourceConfigs,
transformConfigs, sinkConfigs);
- if (!commonPluginJars.isEmpty()) {
- connectorJars.addAll(commonPluginJars);
- }
+ List<URL> sourceConnectorJars = getConnectorJarList(sourceConfigs,
PluginType.SOURCE);
+ List<URL> transformConnectorJars =
+ getConnectorJarList(transformConfigs, PluginType.TRANSFORM);
+ List<URL> sinkConnectorJars = getConnectorJarList(sinkConfigs,
PluginType.SINK);
ClassLoader parentClassLoader =
Thread.currentThread().getContextClassLoader();
- ClassLoader classLoader;
- if (classLoaderService == null) {
- classLoader = new SeaTunnelChildFirstClassLoader(connectorJars,
parentClassLoader);
- } else {
- classLoader =
- classLoaderService.getClassLoader(
-
Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars);
- }
+ // source and transform use the same classloader
+ List<URL> sourceJars =
+ Stream.of(sourceConnectorJars, transformConnectorJars)
+ .flatMap(Collection::stream)
+ .distinct()
+ .collect(Collectors.toList());
+ ClassLoader sourceAndTransformClassLoader =
+ getClassLoader(classLoaderService, parentClassLoader,
sourceJars);
+ ClassLoader sinkClassLoader =
+ getClassLoader(classLoaderService, parentClassLoader,
sinkConnectorJars);
+
try {
- Thread.currentThread().setContextClassLoader(classLoader);
+
Thread.currentThread().setContextClassLoader(sourceAndTransformClassLoader);
ConfigParserUtil.checkGraph(sourceConfigs, transformConfigs,
sinkConfigs);
LinkedHashMap<String, List<Tuple2<CatalogTable, Action>>>
tableWithActionMap =
new LinkedHashMap<>();
@@ -229,19 +233,20 @@ public class MultipleTableJobConfigParser {
for (int configIndex = 0; configIndex < sourceConfigs.size();
configIndex++) {
Config sourceConfig = sourceConfigs.get(configIndex);
Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
- parseSource(configIndex, sourceConfig, classLoader);
+ parseSource(configIndex, sourceConfig,
sourceAndTransformClassLoader);
tableWithActionMap.put(tuple2._1(), tuple2._2());
}
log.info("start generating all transforms.");
- parseTransforms(transformConfigs, classLoader, tableWithActionMap);
+ parseTransforms(transformConfigs, sourceAndTransformClassLoader,
tableWithActionMap);
+ Thread.currentThread().setContextClassLoader(sinkClassLoader);
log.info("start generating all sinks.");
List<Action> sinkActions = new ArrayList<>();
for (int configIndex = 0; configIndex < sinkConfigs.size();
configIndex++) {
Config sinkConfig = sinkConfigs.get(configIndex);
sinkActions.addAll(
- parseSink(configIndex, sinkConfig, classLoader,
tableWithActionMap));
+ parseSink(configIndex, sinkConfig, sinkClassLoader,
tableWithActionMap));
}
Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
return new ImmutablePair<>(sinkActions, factoryUrls);
@@ -249,55 +254,49 @@ public class MultipleTableJobConfigParser {
Thread.currentThread().setContextClassLoader(parentClassLoader);
if (classLoaderService != null) {
classLoaderService.releaseClassLoader(
- Long.parseLong(jobConfig.getJobContext().getJobId()),
connectorJars);
+ Long.parseLong(jobConfig.getJobContext().getJobId()),
sourceJars);
+ classLoaderService.releaseClassLoader(
+ Long.parseLong(jobConfig.getJobContext().getJobId()),
sinkConnectorJars);
}
}
}
+ private ClassLoader getClassLoader(
+ ClassLoaderService classLoaderService,
+ ClassLoader parentClassLoader,
+ List<URL> connectorJars) {
+ ClassLoader classLoader;
+ if (classLoaderService == null) {
+ classLoader = new SeaTunnelChildFirstClassLoader(connectorJars,
parentClassLoader);
+ } else {
+ classLoader =
+ classLoaderService.getClassLoader(
+
Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars);
+ }
+ return classLoader;
+ }
+
public Set<URL> getUsedFactoryUrls(List<Action> sinkActions) {
Set<URL> urls = new HashSet<>();
fillUsedFactoryUrls(sinkActions, urls);
return urls;
}
- private List<URL> getConnectorJarList(
- List<? extends Config> sourceConfigs,
- List<? extends Config> transformConfigs,
- List<? extends Config> sinkConfigs) {
+ private List<URL> getConnectorJarList(List<? extends Config> configs,
PluginType type) {
List<PluginIdentifier> factoryIds =
- Stream.concat(
- Stream.concat(
- sourceConfigs.stream()
-
.map(ConfigParserUtil::getFactoryId)
- .map(
- factory ->
-
PluginIdentifier.of(
-
CollectionConstants
-
.SEATUNNEL_PLUGIN,
-
CollectionConstants
-
.SOURCE_PLUGIN,
-
factory)),
- transformConfigs.stream()
-
.map(ConfigParserUtil::getFactoryId)
- .map(
- factory ->
-
PluginIdentifier.of(
-
CollectionConstants
-
.SEATUNNEL_PLUGIN,
-
CollectionConstants
-
.TRANSFORM_PLUGIN,
-
factory))),
- sinkConfigs.stream()
- .map(ConfigParserUtil::getFactoryId)
- .map(
- factory ->
- PluginIdentifier.of(
-
CollectionConstants
-
.SEATUNNEL_PLUGIN,
-
CollectionConstants.SINK_PLUGIN,
- factory)))
+ configs.stream()
+ .map(ConfigParserUtil::getFactoryId)
+ .map(
+ factory ->
+ PluginIdentifier.of(
+
CollectionConstants.SEATUNNEL_PLUGIN,
+ type.getType(),
+ factory))
.collect(Collectors.toList());
- return new
SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds);
+ List<URL> jarPaths = new ArrayList<>();
+ jarPaths.addAll(new
SeaTunnelSinkPluginDiscovery().getPluginJarPaths(factoryIds));
+ jarPaths.addAll(commonPluginJars);
+ return jarPaths;
}
private void fillUsedFactoryUrls(List<Action> actions, Set<URL> result) {