aplex commented on a change in pull request #3273:
URL: https://github.com/apache/gobblin/pull/3273#discussion_r625303699



##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
##########
@@ -366,11 +376,17 @@ public Spec getSpecWrapper(URI uri) {
     return responseMap;
   }
 
-  public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
-    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
-        ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
-
-    return isCompileSuccessful(addSpecResponse.getValue());
+  public boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    for (SpecCatalogListener listener: this.listeners.getListeners()) {
+      String listenerClass = listener.getClass().getCanonicalName();
+      AddSpecResponse<String> addSpecResponse =

Review comment:
       Is it safe to call listeners from here? I wonder whether they have any side effects, since we would end up calling them multiple times for the same flow. It would be good to review what they actually do and what the contract/docs for the listener interface say.
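One way to check the "called once per flow" question empirically is to wrap each listener in a counting decorator. A test can then assert that a single `put` of a flow triggers exactly one callback. Below is a minimal sketch that assumes a simplified, hypothetical `SpecListener` interface. It is not Gobblin's actual `SpecCatalogListener` signature.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch: a decorator that counts listener invocations per spec URI so a test
 * can detect repeated calls for the same flow.
 * SpecListener is a hypothetical stand-in, not the real Gobblin interface.
 */
public class ListenerInvocationCheck {

  /** Hypothetical, simplified listener interface. */
  interface SpecListener {
    String onAddSpec(URI specUri);
  }

  /** Wraps a listener and records how many times it was called per URI. */
  static final class CountingListener implements SpecListener {
    private final SpecListener delegate;
    private final Map<URI, AtomicInteger> calls = new ConcurrentHashMap<>();

    CountingListener(SpecListener delegate) {
      this.delegate = delegate;
    }

    @Override
    public String onAddSpec(URI specUri) {
      // Count first, then forward to the real listener unchanged.
      calls.computeIfAbsent(specUri, k -> new AtomicInteger()).incrementAndGet();
      return delegate.onAddSpec(specUri);
    }

    /** Number of onAddSpec calls seen for the given URI (0 if none). */
    int callCount(URI specUri) {
      AtomicInteger count = calls.get(specUri);
      return count == null ? 0 : count.get();
    }
  }

  public static void main(String[] args) {
    CountingListener listener = new CountingListener(uri -> "compiled:" + uri);
    URI spec = URI.create("spec0");
    listener.onAddSpec(spec);
    // A second call for the same flow would show up as a count of 2.
    System.out.println(listener.callCount(spec)); // prints 1
  }
}
```

A count greater than one for the same URI after a single `put` would indicate that the compile-success check is re-invoking listeners rather than reading responses they already produced.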

##########
File path: gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
##########
@@ -129,6 +130,65 @@ public void testDisableFlowRunImmediatelyOnStart()
     Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_NAME_KEY), TEST_FLOW_NAME);
   }
 
+  /**
+   * Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs
+   */
+  @Test
+  public void testJobSchedulerInitWithFailedSpec() throws Exception {
+    // Mock a FlowCatalog.
+    File specDir = Files.createTempDir();
+
+    Properties properties = new Properties();
+    properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+    FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+    ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+    serviceLauncher.addService(flowCatalog);
+    serviceLauncher.start();
+
+    FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"),
+        MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
+    FlowSpec flowSpec2 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
+
+    // Assume that the catalog can store corrupted flows
+    flowCatalog.put(flowSpec0, false);
+    // Ensure that these flows are scheduled
+    flowCatalog.put(flowSpec1, false);
+    flowCatalog.put(flowSpec2, false);
+
+    Assert.assertEquals(flowCatalog.getSpecs().size(), 3);
+
+    Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+
+    // Mock a GaaS scheduler.
+    TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
+        ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+
+    SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
+    Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
+    Mockito.doAnswer((Answer<Void>) a -> {
+      scheduler.isCompilerHealthy = true;
+      return null;
+    }).when(mockCompiler).awaitHealthy();
+
+    scheduler.setActive(true);
+
+    AssertWithBackoff.create().timeoutMs(6000).maxSleepMs(2000).backoffFactor(2)

Review comment:
       Timed waits in tests can introduce flakiness. This usually happens when dozens of tests run concurrently on an overloaded CI machine, and even basic operations take much longer than the developer expects.
   
   The best way to address this is to have the class emit some kind of notification or event and have the test react to it. That adds no fixed delays, and it also tolerates the case where the operation takes a long time to happen.
   
   If it's difficult or unclear how to do that in this case, I suggest bumping the timeout to 15-30 seconds.
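Below is a minimal sketch of the notification approach. It assumes the scheduler could be given a per-flow callback (`onFlowScheduled` and `FakeScheduler` are hypothetical and are not existing Gobblin hooks). The test counts down a `CountDownLatch` for each scheduled flow and waits on it. As a result, it proceeds as soon as the work finishes, and the generous timeout only matters when something is actually broken.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Sketch: event-driven waiting instead of polling with timed backoff.
 * FakeScheduler and its onFlowScheduled callback are hypothetical.
 */
public class LatchBasedWait {

  /** Hypothetical scheduler that schedules flows on a background thread. */
  static final class FakeScheduler {
    private final Consumer<String> onFlowScheduled;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    FakeScheduler(Consumer<String> onFlowScheduled) {
      this.onFlowScheduled = onFlowScheduled;
    }

    void scheduleAll(List<String> flows) {
      executor.submit(() -> {
        for (String flow : flows) {
          // Simulate an uncompilable flow: it is skipped, not fatal.
          if (flow.startsWith("bad")) {
            continue;
          }
          onFlowScheduled.accept(flow);
        }
      });
    }

    void shutdown() {
      executor.shutdown();
    }
  }

  /** Returns true if `expected` flows were scheduled before the timeout. */
  static boolean awaitScheduled(List<String> flows, int expected, long timeoutSec)
      throws InterruptedException {
    CountDownLatch scheduled = new CountDownLatch(expected);
    FakeScheduler scheduler = new FakeScheduler(flow -> scheduled.countDown());
    scheduler.scheduleAll(flows);
    try {
      // Returns as soon as the count reaches zero; the timeout is only a safety net.
      return scheduled.await(timeoutSec, TimeUnit.SECONDS);
    } finally {
      scheduler.shutdown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    boolean ok = awaitScheduled(Arrays.asList("bad-spec0", "spec1", "spec2"), 2, 30);
    System.out.println(ok); // prints true
  }
}
```

Because `await` returns immediately once the latch hits zero, a 30-second timeout costs nothing on a healthy run. It only delays the failure report when the expected events never arrive.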

##########
File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MockSpecCatalogListener.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.net.URI;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+
+
+public class MockSpecCatalogListener implements SpecCatalogListener {

Review comment:
       Does this class need to be part of the public Gobblin API, or can it live under src/test instead?
   
   You could also consider using dynamic mocks that handle all methods the interface declares and throw an exception for a call with a specific argument: https://stackoverflow.com/questions/17088465/performing-a-custom-action-when-a-given-mocked-void-method-is-called/17089554
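The linked answer uses Mockito's `doAnswer`. As a dependency-free illustration of the same idea, this sketch swaps in a JDK dynamic proxy (`java.lang.reflect.Proxy`). The proxy implements every method of an interface at runtime and throws only for one chosen argument. `SpecListener` and `failingFor` are hypothetical names, not Gobblin's `SpecCatalogListener` or any existing helper.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.net.URI;
import java.util.Objects;

/**
 * Sketch: a "dynamic mock" built with a JDK dynamic proxy instead of a
 * hand-written mock class. SpecListener is a hypothetical stand-in.
 */
public class DynamicMockSketch {

  /** Hypothetical listener interface with more than one method. */
  interface SpecListener {
    String onAddSpec(URI specUri);
    void onDeleteSpec(URI specUri);
  }

  /** Creates a listener whose methods succeed except when called with badUri. */
  static SpecListener failingFor(URI badUri) {
    InvocationHandler handler = (proxy, method, args) -> {
      // Give Object methods (toString, hashCode, equals) simple defaults.
      if (method.getDeclaringClass() == Object.class) {
        switch (method.getName()) {
          case "toString": return "DynamicMock";
          case "hashCode": return System.identityHashCode(proxy);
          case "equals": return proxy == args[0];
          default: return null;
        }
      }
      // Fail only for the chosen argument, whichever interface method is called.
      if (args != null && args.length > 0 && Objects.equals(args[0], badUri)) {
        throw new IllegalStateException("Simulated failure for " + badUri);
      }
      // Otherwise succeed: String methods return "ok", void methods return null.
      return method.getReturnType() == String.class ? "ok" : null;
    };
    return (SpecListener) Proxy.newProxyInstance(
        SpecListener.class.getClassLoader(), new Class<?>[] {SpecListener.class}, handler);
  }

  public static void main(String[] args) {
    SpecListener listener = failingFor(URI.create("spec0"));
    System.out.println(listener.onAddSpec(URI.create("spec1"))); // prints ok
    try {
      listener.onAddSpec(URI.create("spec0"));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // prints Simulated failure for spec0
    }
  }
}
```

With Mockito on the test classpath, the usual equivalent is `Mockito.doThrow(...).when(mock).someMethod(ArgumentMatchers.eq(badArg))`. Either way, no hand-written mock class needs to ship in the main source tree.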
 



