[ https://issues.apache.org/jira/browse/GOBBLIN-1656?focusedWorklogId=779709&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-779709 ]

ASF GitHub Bot logged work on GOBBLIN-1656:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Jun/22 23:44
            Start Date: 08/Jun/22 23:44
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3516:
URL: https://github.com/apache/gobblin/pull/3516#discussion_r892891985


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -148,7 +161,7 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) th
    */
   private int incrementJobCountAndCheckQuota(String key, Map<String, Integer> quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
     // Only increment job count for first attempt, since job is considered running between retries
-    if (dagNode.getValue().getCurrentAttempts() != 1) {
+    if (dagNode.getValue().getCurrentAttempts() > 1) {

Review Comment:
   are you specifically wishing to include `0`?  if so, please augment comment 
above.
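For illustration only, the two guards can be compared side by side. `AttemptGuard` and its methods are hypothetical names, not Gobblin code. Here, "skip" means returning early without incrementing the job count. For non-negative attempt counts, `0` is the only input on which the two guards disagree:

```java
// Hypothetical side-by-side of the two guards in incrementJobCountAndCheckQuota.
// "skip" means: return early without incrementing the job count.
final class AttemptGuard {
  private AttemptGuard() {}

  // Original guard: skip unless this is exactly the first attempt (so 0 is skipped).
  static boolean skipIncrementOriginal(int currentAttempts) {
    return currentAttempts != 1;
  }

  // Revised guard: skip only retries (> 1), so attempt 0 now falls through and increments.
  static boolean skipIncrementRevised(int currentAttempts) {
    return currentAttempts > 1;
  }
}
```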



##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java:
##########
@@ -60,9 +63,8 @@ public CreateKVResponse createFlowConfig(FlowConfig flowConfig, boolean triggerL
     }
     log.info(createLog);
     FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
-    FlowStatusId flowStatusId = new FlowStatusId()
-        .setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
-        .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
+    FlowStatusId flowStatusId =
+        new FlowStatusId().setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY)).setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));

Review Comment:
   I'm not usually a stickler for formatting... but curious why you removed the
line breaks...  this line is >200 chars now!



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -360,16 +361,23 @@ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
         responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
       }
     }
+    AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));

Review Comment:
   in case it's helpful as background/justification, consider the line:
   ```
       return isCompileSuccessful(addSpecResponse.getValue());
   ```
   in:
   ```
     public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
       // If we cannot get the response from the scheduler, assume that the flow failed compilation
       AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
           ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
       return isCompileSuccessful(addSpecResponse.getValue());
     }
   
     public static boolean isCompileSuccessful(String dag) {
       return dag != null && !dag.contains(ConfigException.class.getSimpleName());
     }
   ```
   if `addSpecResponse` were not `AddSpecResponse<String>`, that invocation
would give a runtime failure.  at the least it deserves an `instanceof` check;
yet raw types evade such enforcement.
   
   primer, if helpful:
https://stackoverflow.com/questions/2770321/what-is-a-raw-type-and-why-shouldnt-we-use-it
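Here is a minimal sketch of the failure mode described above. It uses a simplified stand-in, `Response<T>`, in place of `AddSpecResponse`, and all class names are hypothetical. The raw `Map<String, Response>` lets an unchecked assignment to `Response<String>` compile. The `ClassCastException` only surfaces later, when the value is passed to `isCompileSuccessful(String)`. The guarded variant checks the payload with `instanceof` first:

```java
import java.util.Map;

// Hypothetical stand-in for AddSpecResponse; the real class lives in Gobblin.
final class Response<T> {
  private final T value;
  Response(T value) { this.value = value; }
  T getValue() { return value; }
}

final class RawTypeDemo {
  private RawTypeDemo() {}

  // Mirrors isCompileSuccessful(String): only meaningful for String payloads.
  static boolean isCompileSuccessful(String dag) {
    return dag != null && !dag.contains("ConfigException");
  }

  // The raw value type lets any payload in; the unchecked assignment to
  // Response<String> compiles, but nothing verifies it until the implicit cast.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean checkUnguarded(Map<String, Response> responseMap, String key) {
    Response<String> r = responseMap.getOrDefault(key, new Response<>(null));
    return isCompileSuccessful(r.getValue()); // ClassCastException if the value is not a String
  }

  // Guarded variant: inspect the runtime payload type before using it.
  @SuppressWarnings("rawtypes")
  static boolean checkGuarded(Map<String, Response> responseMap, String key) {
    Response r = responseMap.getOrDefault(key, new Response<>(null));
    Object value = r.getValue();
    return value instanceof String && isCompileSuccessful((String) value);
  }
}
```

`instanceof` can only inspect the runtime payload, not the erased type argument. That is why parameterizing the map, rather than relying on raw types, is the sturdier fix.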



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -103,8 +108,16 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) th
     boolean requesterCheck = true;
 
     if (serializedRequesters != null) {
-      List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
-          .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+      List<String> uniqueRequesters;
+      try {
+        uniqueRequesters = RequesterService.deserialize(serializedRequesters)
+            .stream()
+            .map(ServiceRequester::getName)
+            .distinct()
+            .collect(Collectors.toList());
+      } catch (IOException e) {
+        throw new RuntimeException("Could not process requesters due to ", e);
+      }

Review Comment:
   since this is just calling a `static` in `RequesterService`, it may be an
appropriate candidate for extraction as
`DagManagerUtils.getDistinctServiceRequesters` (throwing the unchecked
exception).
   
   BTW, I don't immediately perceive the justification for:
   ```
      } catch (RuntimeException e) {
         throw new IOException(e);
       }
   ```
   in `RequesterService.deserialize`
   
   (and anyway, perhaps catching `RuntimeException` is merely meant for its
subclass `JsonParseException`... not sure what other kind would arise within Gson...)
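Here is one way the suggested helper could look, as a sketch only. `Requester`, `RequesterDeserializer` and `DagManagerUtilsSketch` are hypothetical stand-ins for `ServiceRequester`, `RequesterService.deserialize` and `DagManagerUtils`. Wrapping the checked `IOException` in `java.io.UncheckedIOException` is one possible choice of unchecked exception, not necessarily the one the PR uses:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for ServiceRequester.
final class Requester {
  private final String name;
  Requester(String name) { this.name = name; }
  String getName() { return name; }
}

// Hypothetical stand-in for the checked-exception signature of RequesterService.deserialize.
@FunctionalInterface
interface RequesterDeserializer {
  List<Requester> deserialize(String serialized) throws IOException;
}

final class DagManagerUtilsSketch {
  private DagManagerUtilsSketch() {}

  // Sketch of the suggested getDistinctServiceRequesters: returns distinct names
  // in first-seen order and converts a parse failure into an unchecked exception.
  static List<String> getDistinctServiceRequesters(String serialized, RequesterDeserializer deserializer) {
    try {
      return deserializer.deserialize(serialized).stream()
          .map(Requester::getName)
          .distinct()
          .collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException("Could not process requesters: " + serialized, e);
    }
  }
}
```

Callers such as `checkQuota` would then need no `try`/`catch` of their own.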



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -360,16 +361,21 @@ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
         responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
       }
     }
+    AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
 
-    if (isCompileSuccessful(responseMap)) {
+    if (isCompileSuccessful(schedulerResponse.getValue())) {

Review Comment:
   a large part of this is the tunneling I describe in my comment on the
`schedulerResponse` assignment, but if you actually do need additional info to
support `isExplain/hasExplain`, you could insert it into (or wrap it around) the
`QuotaExceededException`.
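This is a hypothetical shape for carrying that extra info on the exception itself. The `explainInfo` field and `getExplainInfo()` accessor are illustrative additions. Treating `QuotaExceededException` as a checked exception is an assumption of this sketch:

```java
import java.util.Optional;

// Hypothetical sketch: QuotaExceededException is assumed to be a checked exception,
// and the explainInfo field is an illustrative addition, not existing Gobblin API.
class QuotaExceededException extends Exception {
  private static final long serialVersionUID = 1L;

  // Extra context for explain requests (e.g. the compiled flow); may be null.
  private final String explainInfo;

  QuotaExceededException(String message, String explainInfo) {
    super(message);
    this.explainInfo = explainInfo;
  }

  // Empty when no explain info was attached.
  Optional<String> getExplainInfo() {
    return Optional.ofNullable(explainInfo);
  }
}
```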



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -360,16 +361,23 @@ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
         responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
       }
     }
+    AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));

Review Comment:
   I've never looked closely at `AddSpecResponse`, but have grown quite weary 
in the course of this review.  those using it have been far more liberal with 
java raw types than I'm comfortable with (being basically never).
   
   anyway, I see what you're doing to tunnel through the quota failure.  why
not instead have the `GobblinServiceJobScheduler` (listener) throw the
`QuotaExceededException` and interrogate `.getFailures()` here to see
whether that happened?  if you find it, then rethrow it (as a checked exception,
which your resource handler needs to be on the lookout for).  that way, there's
no need to essentially disguise the situation and sneak it through; rather,
leave it explicit and therefore under the compiler's watch.
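Here is a rough sketch of that flow, with a few assumptions. `ListenerFailures` is a hypothetical stand-in for the dispatcher's per-listener results, modeled as a map from listener name to `Throwable`. The exception class is likewise a hypothetical checked stand-in. Because the quota exception may arrive wrapped (e.g. in an `ExecutionException`), the sketch walks the cause chain before rethrowing:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical checked exception; the real Gobblin type may differ.
class QuotaExceededException extends Exception {
  private static final long serialVersionUID = 1L;
  QuotaExceededException(String message) { super(message); }
}

// Hypothetical stand-in for per-listener results: listener name -> failure it threw.
final class ListenerFailures {
  private final Map<String, Throwable> failures = new LinkedHashMap<>();
  void add(String listener, Throwable t) { failures.put(listener, t); }
  Map<String, Throwable> getFailures() { return failures; }
}

final class FlowCatalogSketch {
  private FlowCatalogSketch() {}

  // After dispatching to listeners, look for a quota failure (possibly wrapped)
  // and surface it as a checked exception the caller must handle.
  static void rethrowQuotaFailure(ListenerFailures results) throws QuotaExceededException {
    for (Throwable t : results.getFailures().values()) {
      for (Throwable cur = t; cur != null; cur = cur.getCause()) {
        if (cur instanceof QuotaExceededException) {
          throw (QuotaExceededException) cur;
        }
      }
    }
  }
}
```

The resource handler can then catch `QuotaExceededException` explicitly and map it to the desired HTTP status, with the compiler enforcing that the case is handled.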
   



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -322,6 +326,19 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       return new AddSpecResponse<>(response);
     }
 
+    // Check quota limits against run immediately flows or adhoc flows before saving the schedule
+    if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {

Review Comment:
   for scheduled AND runImmediately flows, seems we would only `scheduleJob()` 
when quota finds it permitted to run immediately (right now).  are those 
semantics intended?
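To make the question concrete, here is a hypothetical model of the guard. `hasSchedule` stands in for `jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)`, and `runImmediately` stands in for `FLOW_RUN_IMMEDIATELY`. The sketch assumes, as the review reads it, that a failed quota check returns before `scheduleJob()`:

```java
// Hypothetical model of the onAddSpec guard; not the actual Gobblin code.
final class QuotaGuardModel {
  private QuotaGuardModel() {}

  // Mirrors: !jobConfig.containsKey(JOB_SCHEDULE_KEY) || runImmediately
  static boolean checksQuotaBeforeScheduling(boolean hasSchedule, boolean runImmediately) {
    return !hasSchedule || runImmediately;
  }

  // Assumes a failed check returns before scheduleJob(), so the flow is not
  // registered at all, including its future scheduled runs.
  static boolean wouldSchedule(boolean hasSchedule, boolean runImmediately, boolean quotaPermits) {
    return !checksQuotaBeforeScheduling(hasSchedule, runImmediately) || quotaPermits;
  }
}
```

Under that assumption, a scheduled flow that also sets `runImmediately` is not scheduled at all when quota is currently exhausted. The same flow without `runImmediately` is scheduled unconditionally.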



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -322,6 +326,18 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       return new AddSpecResponse<>(response);
     }
 
+    // Check quota limits against run immediately flows or adhoc flows before saving the schedule
+    if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+      try {
+        if (quotaManager.isPresent()) {
+          quotaManager.get().checkQuota(dag.getNodes().get(0), false);

Review Comment:
   will that also avoid premature quota increment in cases where
`DagManagerThread.initialize` later short-circuits and aborts?
   ```
         if (this.dags.containsKey(dagId)) {
           log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
           return;
         }
   ```
   (comment should clarify that neither double increment nor premature 
increment would occur)
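Here is a simplified, hypothetical model of the ordering being asked about. `trackedDags` stands in for `DagManagerThread.dags`, and `counts` stands in for the quota manager's per-user counts. Because the duplicate-dag check runs before the increment, a short-circuited dag consumes no quota, and a repeat submission is not counted twice:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model; not the actual DagManager or UserQuotaManager classes.
final class QuotaOnInitializeSketch {
  private final Set<String> trackedDags = new HashSet<>();
  private final Map<String, Integer> counts = new HashMap<>();
  private final int quota;

  QuotaOnInitializeSketch(int quota) { this.quota = quota; }

  // Returns true if the dag was accepted. The duplicate check runs BEFORE the
  // quota increment, so a short-circuited dag never consumes quota.
  boolean initialize(String dagId, String user) {
    if (trackedDags.contains(dagId)) {
      return false; // already tracking: skip without touching the count
    }
    int current = counts.getOrDefault(user, 0);
    if (current >= quota) {
      return false; // over quota: reject without incrementing
    }
    counts.put(user, current + 1);
    trackedDags.add(dagId);
    return true;
  }

  int runningCount(String user) { return counts.getOrDefault(user, 0); }
}
```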





Issue Time Tracking
-------------------

    Worklog Id:     (was: 779709)
    Time Spent: 2h 50m  (was: 2h 40m)

> Return different Http Status on GaaS if Quota is Exceeded
> ---------------------------------------------------------
>
>                 Key: GOBBLIN-1656
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1656
>             Project: Apache Gobblin
>          Issue Type: New Feature
>          Components: gobblin-service
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> GaaS has quota limits for proxy users and flowGroups.
> When a user tries to create a flow that would exceed their specified quota,
> the flow should:
> 1) Not be run
> 2) Return an HTTP status code (e.g. 503) indicating that the quota has been
> exceeded. This allows clients to implement wait-and-retry functionality.
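On the client side, a 503 lends itself to a wait-and-retry loop. The following is a minimal sketch with a few assumptions. The create call is modeled as an `IntSupplier` returning the HTTP status, and exponential backoff is assumed to be acceptable. `QuotaRetryClient` is a hypothetical name. Delays are recorded rather than slept so that the example stays deterministic:

```java
import java.util.List;
import java.util.function.IntSupplier;

// Hypothetical client-side sketch: retry flow creation while the service answers
// 503 (quota exceeded), doubling the delay between attempts.
final class QuotaRetryClient {
  static final int SERVICE_UNAVAILABLE = 503;

  private QuotaRetryClient() {}

  // createFlow stands in for "send the create request and return the HTTP status".
  // Returns the final status; each backoff delay is appended to delaysOut instead of
  // being slept (a real client would call Thread.sleep).
  static int createWithRetry(IntSupplier createFlow, int maxAttempts, long initialDelayMs, List<Long> delaysOut) {
    long delay = initialDelayMs;
    int status = createFlow.getAsInt();
    for (int attempt = 1; attempt < maxAttempts && status == SERVICE_UNAVAILABLE; attempt++) {
      delaysOut.add(delay);
      delay *= 2;
      status = createFlow.getAsInt();
    }
    return status;
  }
}
```

A production client would likely also cap the total wait and honor a `Retry-After` header if the service sends one.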



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
