This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1fc5fb058 [GOBBLIN-2042] Fix bug where flowgraph validation
initialization was creating multiple instances of the flow compiler (#3921)
1fc5fb058 is described below
commit 1fc5fb058efda1ee4e5775bf7706c415ab3bbd5d
Author: William Lo <[email protected]>
AuthorDate: Mon Apr 15 13:24:29 2024 -0400
[GOBBLIN-2042] Fix bug where flowgraph validation initialization was
creating multiple instances of the flow compiler (#3921)
Fix bug where flowgraph validation initialization was creating multiple
instances of the flow compiler
---
.../service/modules/orchestration/Orchestrator.java | 20 ++------------------
.../utils/FlowCompilationValidationHelper.java | 2 ++
.../modules/orchestration/OrchestratorTest.java | 7 +++++--
3 files changed, 9 insertions(+), 20 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7540f31f1..7466fb09b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -18,7 +18,6 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
@@ -26,7 +25,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,7 +68,6 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@@ -117,26 +114,14 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
Optional<DagManagementStateStore> dagManagementStateStore,
FlowCompilationValidationHelper flowCompilationValidationHelper) throws
IOException {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
- ClassAliasResolver<SpecCompiler> aliasResolver = new
ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
this.flowStatusGenerator = flowStatusGenerator;
this.flowTriggerDecorator = flowTriggerDecorator;
this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
this.flowCatalog = flowCatalog;
- try {
- String specCompilerClassName =
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
- if
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
- specCompilerClassName =
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
- }
- _log.info("Using specCompiler class name/alias " +
specCompilerClassName);
-
- this.specCompiler = (SpecCompiler)
ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(specCompilerClassName)),
config);
- } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException |
- ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
-
+ this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+ this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
//At this point, the TopologySpecMap is initialized by the SpecCompiler.
Pass the TopologySpecMap to the DagManager.
this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
if (dagManagementStateStore.isPresent()) {
@@ -155,7 +140,6 @@ public class Orchestrator implements SpecCatalogListener,
Instrumentable {
quotaManager =
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS,
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
config);
- this.flowCompilationValidationHelper = flowCompilationValidationHelper;
}
@VisibleForTesting
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 590fedb2b..301e3e436 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -28,6 +28,7 @@ import com.google.inject.Inject;
import com.typesafe.config.Config;
import lombok.Data;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -63,6 +64,7 @@ import org.apache.gobblin.util.ConfigUtils;
@Data
public class FlowCompilationValidationHelper {
private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
+ @Getter
private final SpecCompiler specCompiler;
private final UserQuotaManager quotaManager;
private final EventSubmitter eventSubmitter;
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index a9e5097c6..a98fc8cb7 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
@@ -124,10 +125,12 @@ public class OrchestratorTest {
MostlyMySqlDagManagementStateStore dagManagementStateStore =
new MostlyMySqlDagManagementStateStore(config, null, null, null);
+ SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
+
this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, mockDagManager, Optional.of(logger),
mockStatusGenerator,
- Optional.of(mockFlowTriggerHandler), new
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(
- orchestratorProperties)), Optional.of(mock(FlowCatalog.class)),
Optional.of(dagManagementStateStore), null);
+ Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton,
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
+ new FlowCompilationValidationHelper(config,
sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application