This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc5fb058 [GOBBLIN-2042] Fix bug where flowgraph validation 
initialization was creating multiple instances of the flow compiler (#3921)
1fc5fb058 is described below

commit 1fc5fb058efda1ee4e5775bf7706c415ab3bbd5d
Author: William Lo <[email protected]>
AuthorDate: Mon Apr 15 13:24:29 2024 -0400

    [GOBBLIN-2042] Fix bug where flowgraph validation initialization was 
creating multiple instances of the flow compiler (#3921)
    
    Fix bug where flowgraph validation initialization was creating multiple 
instances of the flow compiler
---
 .../service/modules/orchestration/Orchestrator.java  | 20 ++------------------
 .../utils/FlowCompilationValidationHelper.java       |  2 ++
 .../modules/orchestration/OrchestratorTest.java      |  7 +++++--
 3 files changed, 9 insertions(+), 20 deletions(-)

diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 7540f31f1..7466fb09b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.service.modules.orchestration;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.Collections;
 import java.util.List;
@@ -26,7 +25,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,7 +68,6 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -117,26 +114,14 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       Optional<DagManagementStateStore> dagManagementStateStore,
       FlowCompilationValidationHelper flowCompilationValidationHelper) throws 
IOException {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
-    ClassAliasResolver<SpecCompiler> aliasResolver = new 
ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
     this.dagManager = dagManager;
     this.flowStatusGenerator = flowStatusGenerator;
     this.flowTriggerDecorator = flowTriggerDecorator;
     this.sharedFlowMetricsSingleton = sharedFlowMetricsSingleton;
     this.flowCatalog = flowCatalog;
-    try {
-      String specCompilerClassName = 
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
-      if 
(config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
-        specCompilerClassName = 
config.getString(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY);
-      }
-      _log.info("Using specCompiler class name/alias " + 
specCompilerClassName);
-
-      this.specCompiler = (SpecCompiler) 
ConstructorUtils.invokeConstructor(Class.forName(aliasResolver.resolve(specCompilerClassName)),
 config);
-    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException |
-             ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+    this.specCompiler = flowCompilationValidationHelper.getSpecCompiler();
     //At this point, the TopologySpecMap is initialized by the SpecCompiler. 
Pass the TopologySpecMap to the DagManager.
     this.dagManager.setTopologySpecMap(getSpecCompiler().getTopologySpecMap());
     if (dagManagementStateStore.isPresent()) {
@@ -155,7 +140,6 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     quotaManager = 
GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
         ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_MANAGER_CLASS, 
ServiceConfigKeys.DEFAULT_QUOTA_MANAGER),
         config);
-    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
   }
 
   @VisibleForTesting
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
index 590fedb2b..301e3e436 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/FlowCompilationValidationHelper.java
@@ -28,6 +28,7 @@ import com.google.inject.Inject;
 import com.typesafe.config.Config;
 
 import lombok.Data;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -63,6 +64,7 @@ import org.apache.gobblin.util.ConfigUtils;
 @Data
 public class FlowCompilationValidationHelper {
   private final SharedFlowMetricsSingleton sharedFlowMetricsSingleton;
+  @Getter
   private final SpecCompiler specCompiler;
   private final UserQuotaManager quotaManager;
   private final EventSubmitter eventSubmitter;
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index a9e5097c6..a98fc8cb7 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import 
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
 import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
@@ -124,10 +125,12 @@ public class OrchestratorTest {
     MostlyMySqlDagManagementStateStore dagManagementStateStore =
         new MostlyMySqlDagManagementStateStore(config, null, null, null);
 
+    SharedFlowMetricsSingleton sharedFlowMetricsSingleton = new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(orchestratorProperties));
+
     this.orchestrator = new 
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
         this.topologyCatalog, mockDagManager, Optional.of(logger), 
mockStatusGenerator,
-        Optional.of(mockFlowTriggerHandler), new 
SharedFlowMetricsSingleton(ConfigUtils.propertiesToConfig(
-            orchestratorProperties)), Optional.of(mock(FlowCatalog.class)), 
Optional.of(dagManagementStateStore), null);
+        Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, 
Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
+        new FlowCompilationValidationHelper(config, 
sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
     // Start application

Reply via email to