[
https://issues.apache.org/jira/browse/GOBBLIN-1656?focusedWorklogId=779709&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-779709
]
ASF GitHub Bot logged work on GOBBLIN-1656:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 08/Jun/22 23:44
Start Date: 08/Jun/22 23:44
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3516:
URL: https://github.com/apache/gobblin/pull/3516#discussion_r892891985
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -148,7 +161,7 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan>
dagNode, boolean onInit) th
*/
private int incrementJobCountAndCheckQuota(String key, Map<String, Integer>
quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
// Only increment job count for first attempt, since job is considered
running between retries
- if (dagNode.getValue().getCurrentAttempts() != 1) {
+ if (dagNode.getValue().getCurrentAttempts() > 1) {
Review Comment:
are you specifically intending to include `0`? if so, please update the
comment above to say so.
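To make the difference concrete, here is a minimal sketch comparing the two guards for attempt counts 0, 1 and 2. `AttemptGuard` and its method names are hypothetical, and it assumes the guarded branch skips the increment, as the code comment suggests; only the two conditions come from the diff.

```java
// Hypothetical helper; only the two conditions are taken from the diff above.
final class AttemptGuard {
  // Old guard: skip the increment for every attempt count except exactly 1.
  static boolean skipsIncrementOld(int currentAttempts) {
    return currentAttempts != 1;
  }

  // New guard: skip the increment only for retries (attempt 2 and later).
  static boolean skipsIncrementNew(int currentAttempts) {
    return currentAttempts > 1;
  }

  public static void main(String[] args) {
    for (int attempts = 0; attempts <= 2; attempts++) {
      System.out.printf("attempts=%d old=%b new=%b%n",
          attempts, skipsIncrementOld(attempts), skipsIncrementNew(attempts));
    }
  }
}
```

The two guards disagree only at `0`: the old one skips the increment there, the new one counts it.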
##########
gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java:
##########
@@ -60,9 +63,8 @@ public CreateKVResponse createFlowConfig(FlowConfig
flowConfig, boolean triggerL
}
log.info(createLog);
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
- FlowStatusId flowStatusId = new FlowStatusId()
-
.setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
-
.setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
+ FlowStatusId flowStatusId =
+ new
FlowStatusId().setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY)).setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
Review Comment:
I'm not usually a stickler for formatting... but curious why you remove the
breaks... this line is >200 chars now!
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -360,16 +361,23 @@ public Map<String, AddSpecResponse> put(Spec spec,
boolean triggerListener) {
responseMap.put(entry.getKey().getName(),
entry.getValue().getResult());
}
}
+ AddSpecResponse<String> schedulerResponse =
responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS,
new AddSpecResponse<>(null));
Review Comment:
in case it's helpful as background/justification, consider the call:
```
return isCompileSuccessful(addSpecResponse.getValue());
```
in:
```
public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
  // If we cannot get the response from the scheduler, assume that the flow failed compilation
  AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
      ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
  return isCompileSuccessful(addSpecResponse.getValue());
}

public static boolean isCompileSuccessful(String dag) {
  return dag != null && !dag.contains(ConfigException.class.getSimpleName());
}
```
if `addSpecResponse` were not actually an `AddSpecResponse<String>`, that
invocation would fail at runtime. at the least it deserves an `instanceof`
check, yet raw types evade such enforcement.
primer, if helpful -
https://stackoverflow.com/questions/2770321/what-is-a-raw-type-and-why-shouldnt-we-use-it
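A minimal, self-contained sketch of that failure mode follows. `Response` and `RawTypeDemo` are hypothetical stand-ins, not Gobblin's `AddSpecResponse`; the point is only that a raw value type lets a non-`String` payload compile cleanly and then fail at the call site.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a generic response wrapper; not Gobblin's AddSpecResponse.
final class Response<T> {
  private final T value;

  Response(T value) {
    this.value = value;
  }

  T getValue() {
    return value;
  }
}

final class RawTypeDemo {
  static boolean isCompileSuccessful(String dag) {
    return dag != null && !dag.contains("ConfigException");
  }

  // The map's value type is raw, so the compiler emits only an unchecked warning.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean checkRaw(Map<String, Response> responses, String key) {
    Response<String> response = responses.getOrDefault(key, new Response<>(null));
    // Compiles, but throws ClassCastException if the wrapped value is not a String.
    return isCompileSuccessful(response.getValue());
  }

  @SuppressWarnings("rawtypes")
  public static void main(String[] args) {
    Map<String, Response> responses = new HashMap<>();
    responses.put("scheduler", new Response<>(42)); // an Integer slips in unnoticed
    try {
      checkRaw(responses, "scheduler");
    } catch (ClassCastException e) {
      System.out.println("runtime failure: " + e.getMessage());
    }
  }
}
```

Declaring the map as `Map<String, Response<String>>` would turn the bad `put` into a compile error instead.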
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java:
##########
@@ -103,8 +108,16 @@ public void checkQuota(Dag.DagNode<JobExecutionPlan>
dagNode, boolean onInit) th
boolean requesterCheck = true;
if (serializedRequesters != null) {
- List<String> uniqueRequesters =
RequesterService.deserialize(serializedRequesters).stream()
-
.map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+ List<String> uniqueRequesters;
+ try {
+ uniqueRequesters = RequesterService.deserialize(serializedRequesters)
+ .stream()
+ .map(ServiceRequester::getName)
+ .distinct()
+ .collect(Collectors.toList());
+ } catch (IOException e) {
+ throw new RuntimeException("Could not process requesters due to ", e);
+ }
Review Comment:
since this just calls a `static` method in `RequesterService`, it seems a
potentially appropriate abstraction would be
`DagManagerUtils.getDistinctServiceRequesters` (throwing the unchecked
exception).
BTW, I don't immediately perceive the justification for:
```
} catch (RuntimeException e) {
throw new IOException(e);
}
```
in `RequesterService.deserialize`
(and anyway, perhaps the `RuntimeException` catch is merely there for its
subclass `JsonParseException`... not sure what other kind would arise within GSON...)
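A rough sketch of the suggested helper, under stated assumptions: `RequesterUtils`, its toy comma-separated format, and the stand-in `deserialize` are all hypothetical, and requesters are reduced to plain names. It only illustrates converting the checked `IOException` to an unchecked one in a single place.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch; the class, methods, and the toy wire format are assumptions.
final class RequesterUtils {
  // Stand-in for a deserializer that declares a checked IOException.
  static List<String> deserialize(String serialized) throws IOException {
    if (serialized == null || serialized.isEmpty()) {
      throw new IOException("empty requester list");
    }
    return Arrays.asList(serialized.split(","));
  }

  // The one place that converts the checked exception into an unchecked one,
  // so callers such as a quota check need no try/catch of their own.
  static List<String> getDistinctServiceRequesters(String serialized) {
    try {
      return deserialize(serialized).stream().distinct().collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException("Could not process requesters", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(getDistinctServiceRequesters("alice,bob,alice")); // prints [alice, bob]
  }
}
```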
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -360,16 +361,21 @@ public Map<String, AddSpecResponse> put(Spec spec,
boolean triggerListener) {
responseMap.put(entry.getKey().getName(),
entry.getValue().getResult());
}
}
+ AddSpecResponse<String> schedulerResponse =
responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS,
new AddSpecResponse<>(null));
- if (isCompileSuccessful(responseMap)) {
+ if (isCompileSuccessful(schedulerResponse.getValue())) {
Review Comment:
a large part of this is the tunneling I describe above, but if you actually
do need additional info to support `isExplain/hasExplain`, you could insert it
into (or wrap it around) the `QuotaExceededException`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -360,16 +361,23 @@ public Map<String, AddSpecResponse> put(Spec spec,
boolean triggerListener) {
responseMap.put(entry.getKey().getName(),
entry.getValue().getResult());
}
}
+ AddSpecResponse<String> schedulerResponse =
responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS,
new AddSpecResponse<>(null));
Review Comment:
I've never looked closely at `AddSpecResponse`, but have grown quite wary
of it in the course of this review. those using it have been far more liberal
with Java raw types than I'm comfortable with (which is basically never).
anyway, I see what you're doing to tunnel through the quota failure. why
not instead have the `GobblinServiceJobScheduler` (listener) throw the
`QuotaFailureException` and you interrogate the `.getFailures()` here to see
whether that happened? if you find it, then rethrow (as a checked exception,
which your resource handler needs to be on the lookout for). that way, no
need to essentially disguise the situation and sneak it through; rather leave
it explicit and therefore under the compiler's watch.
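One way this could look, as a sketch only: every name below is hypothetical (including the exception class and the `201` success status), and the failure list stands in for whatever `.getFailures()` actually returns. The `503` comes from the issue description.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical checked exception; not the class used in the PR.
class QuotaExceededSketchException extends Exception {
  QuotaExceededSketchException(String message) {
    super(message);
  }
}

final class ListenerFailures {
  // Scan the failures reported by listeners and rethrow a quota failure as a
  // checked exception, so the compiler forces callers to handle it.
  static void rethrowIfQuotaExceeded(List<? extends Throwable> failures)
      throws QuotaExceededSketchException {
    for (Throwable failure : failures) {
      if (failure instanceof QuotaExceededSketchException) {
        throw (QuotaExceededSketchException) failure;
      }
    }
  }

  // A resource handler could then map the checked exception to an HTTP status.
  static int statusFor(List<? extends Throwable> failures) {
    try {
      rethrowIfQuotaExceeded(failures);
      return 201; // assumed success status for flow creation
    } catch (QuotaExceededSketchException e) {
      return 503;
    }
  }

  public static void main(String[] args) {
    System.out.println(statusFor(Arrays.asList(new IllegalStateException("unrelated"))));
    System.out.println(statusFor(Arrays.asList(new QuotaExceededSketchException("over quota"))));
  }
}
```

Nothing is hidden inside a response string here, so forgetting to handle the quota case becomes a compile error rather than a silent miss.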
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -322,6 +326,19 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(response);
}
+ // Check quota limits against run immediately flows or adhoc flows before
saving the schedule
+ if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) ||
PropertiesUtils.getPropAsBoolean(jobConfig,
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
Review Comment:
for scheduled AND runImmediately flows, seems we would only `scheduleJob()`
when quota finds it permitted to run immediately (right now). are those
semantics intended?
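For reference, the condition in the hunk can be sketched with plain `java.util.Properties`. The literal key strings are assumptions standing in for the `ConfigurationKeys` constants, and `QuotaCheckCondition` is a hypothetical name.

```java
import java.util.Properties;

// Sketch of the condition in the diff above. The key strings are assumed
// stand-ins for ConfigurationKeys.JOB_SCHEDULE_KEY and FLOW_RUN_IMMEDIATELY.
final class QuotaCheckCondition {
  static final String JOB_SCHEDULE_KEY = "job.schedule";
  static final String FLOW_RUN_IMMEDIATELY = "flow.runImmediately";

  // True when the quota check would run before the schedule is saved.
  static boolean checksQuotaBeforeScheduling(Properties jobConfig) {
    boolean adhoc = !jobConfig.containsKey(JOB_SCHEDULE_KEY);
    boolean runImmediately =
        Boolean.parseBoolean(jobConfig.getProperty(FLOW_RUN_IMMEDIATELY, "false"));
    return adhoc || runImmediately;
  }

  public static void main(String[] args) {
    Properties scheduledAndImmediate = new Properties();
    scheduledAndImmediate.setProperty(JOB_SCHEDULE_KEY, "0 0 * * * ?");
    scheduledAndImmediate.setProperty(FLOW_RUN_IMMEDIATELY, "true");
    System.out.println(checksQuotaBeforeScheduling(scheduledAndImmediate));
  }
}
```

A flow that is both scheduled and run-immediately satisfies the condition, which is the case the question above is about: a failed quota check at that point would also prevent the schedule from being saved.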
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -322,6 +326,18 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
return new AddSpecResponse<>(response);
}
+ // Check quota limits against run immediately flows or adhoc flows before
saving the schedule
+ if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) ||
PropertiesUtils.getPropAsBoolean(jobConfig,
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ try {
+ if (quotaManager.isPresent()) {
+ quotaManager.get().checkQuota(dag.getNodes().get(0), false);
Review Comment:
will that also avoid a premature quota increment in cases where
`DagManagerThread.initialize` later short-circuits and aborts?
```
if (this.dags.containsKey(dagId)) {
log.warn("Already tracking a dag with dagId {}, skipping.", dagId);
return;
}
```
(comment should clarify that neither double increment nor premature
increment would occur)
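To illustrate the ordering concern, here is a toy sketch, not the `DagManagerThread` code: `DagTrackerSketch` and its fields are hypothetical. It counts a dag only after the duplicate check passes, so a skipped duplicate never touches the count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker; illustrates ordering only, not the real DagManagerThread.
final class DagTrackerSketch {
  private final Map<String, String> dags = new HashMap<>();
  private int runningCount = 0;

  // Returns true if the dag was newly tracked and counted.
  boolean initialize(String dagId, String dag) {
    if (dags.containsKey(dagId)) {
      return false; // duplicate: skip without touching the count
    }
    dags.put(dagId, dag);
    runningCount++; // counted exactly once, after the duplicate check
    return true;
  }

  int getRunningCount() {
    return runningCount;
  }

  public static void main(String[] args) {
    DagTrackerSketch tracker = new DagTrackerSketch();
    tracker.initialize("dag-1", "payload");
    tracker.initialize("dag-1", "payload"); // skipped as a duplicate
    System.out.println(tracker.getRunningCount());
  }
}
```

If the increment happened before the `containsKey` check instead, the second call would leave the count one too high with nothing left to decrement it.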
Issue Time Tracking
-------------------
Worklog Id: (was: 779709)
Time Spent: 2h 50m (was: 2h 40m)
> Return different Http Status on GaaS if Quota is Exceeded
> ---------------------------------------------------------
>
> Key: GOBBLIN-1656
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1656
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-service
> Reporter: William Lo
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> GaaS has quota limits for proxy users and flowGroups.
> When a user wants to create a flow that exceeds their specified quota, the
> flow should
> 1) Not be run
> 2) Return an HTTP status code (i.e. 503) due to exceeding the resource. This
> allows clients to implement some wait and retry functionality