[ 
https://issues.apache.org/jira/browse/GOBBLIN-1783?focusedWorklogId=847595&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-847595
 ]

ASF GitHub Bot logged work on GOBBLIN-1783:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Feb/23 00:00
            Start Date: 25/Feb/23 00:00
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117782597


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java:
##########
@@ -65,6 +65,9 @@
 @SuppressFBWarnings(value="SE_BAD_FIELD",
     justification = "FindBugs complains about Config not being serializable, 
but the implementation of Config is serializable")
 public class FlowSpec implements Configurable, Spec {
+  // Key for Property associated with modified_time
+  public static final String modificationTimeKey = "modified_time";

Review Comment:
   Can we name this the same way as other Gobblin constants?
   `public static final String MODIFICATION_TIME_KEY = "modified_time";`
   



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -54,6 +54,9 @@ public class RuntimeMetrics {
   public static final String 
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.time.to.check.quota";
 
+  public static final String 
GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".jobScheduler.average.get.spec.speed.while.loading.all.specs.millis";

Review Comment:
   I would consider shortening this metric name :) Metric platforms usually 
offer some slicing/dicing based on your delimiter, so a very long name gets in 
the way when you want to group more related metrics under `jobScheduler` in 
the future.
   For example, this could be `GOBBLIN_SERVICE_PREFIX + 
"jobScheduler.getSpecSpeedDuringStartupAvgMillis"`
   
   The same applies, in a way, to the metrics below.
   Also, be careful not to add a dot right after the prefix; otherwise I 
believe you'll get a metric name with 2 consecutive `.`
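
To illustrate the double-dot concern, here is a minimal sketch. It assumes the prefix already ends with the delimiter; `PREFIX` is a stand-in value, not the actual `ServiceMetricNames.GOBBLIN_SERVICE_PREFIX`, and `MetricNameSketch` and `segments` are hypothetical names used only for this example.

```java
import java.util.Arrays;
import java.util.List;

// Sketch only: PREFIX is an assumed value, not the real Gobblin constant.
class MetricNameSketch {
  // Assumption: the prefix already ends with the "." delimiter.
  static final String PREFIX = "GobblinService.";

  // Suffix starts with a dot, as in the diff under review.
  static final String WITH_LEADING_DOT =
      PREFIX + ".jobScheduler.average.get.spec.speed.while.loading.all.specs.millis";

  // Shortened suffix without a leading dot, as suggested in the comment.
  static final String SHORTENED =
      PREFIX + "jobScheduler.getSpecSpeedDuringStartupAvgMillis";

  // Splits a metric name on the delimiter, keeping empty segments.
  static List<String> segments(String name) {
    return Arrays.asList(name.split("\\.", -1));
  }

  public static void main(String[] args) {
    // An empty segment means two delimiters ended up next to each other.
    System.out.println(segments(WITH_LEADING_DOT).contains("")); // true
    System.out.println(segments(SHORTENED).contains(""));        // false
    System.out.println(segments(SHORTENED).size());              // 3
  }
}
```

Under that assumption the shortened name has three segments (prefix, `jobScheduler`, metric), which is the shape a delimiter-based grouping would rely on.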



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the 
scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the 
existing flow and reschedule it unnecessarily
+    Long modificationTime = 
Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey,
 "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to 
retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated 
with it assume it's the most recent
+    if (modificationTime != 0L && 
this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+        _log.info("Ignoring the spec {} modified at time {} because we have a 
more updated version from time {}",

Review Comment:
   We should probably make this a warn log, since based on the response the 
caller will treat this as a successful add
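
The guard in the hunk above can be sketched in isolation as follows. This is a simplified illustration, not the PR's code: `StaleSpecGuardSketch` and `offer` are hypothetical names, a single map stands in for both `scheduledFlowSpecs` and `lastUpdatedTimeForFlowSpec`, a boolean replaces `AddSpecResponse`, and a stderr print stands in for the logger.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the timestamp guard; names and return type are hypothetical.
class StaleSpecGuardSketch {
  // Last modification time recorded for each spec URI.
  private final Map<String, Long> lastUpdatedTimeForFlowSpec = new HashMap<>();

  // Returns true if the spec is recorded (and would be scheduled),
  // false if it is skipped because a version at least as recent is known.
  boolean offer(String uri, long modificationTime) {
    Long known = lastUpdatedTimeForFlowSpec.get(uri);
    // A modification time of 0 means none was supplied, so the spec is
    // assumed to be the most recent one and is never skipped.
    if (modificationTime != 0L && known != null && known >= modificationTime) {
      // The skip is reported at warning level here because the caller
      // cannot otherwise tell it apart from a successful add.
      System.err.printf("WARN ignoring spec %s modified at %d; have version from %d%n",
          uri, modificationTime, known);
      return false;
    }
    lastUpdatedTimeForFlowSpec.put(uri, modificationTime);
    return true;
  }

  public static void main(String[] args) {
    StaleSpecGuardSketch guard = new StaleSpecGuardSketch();
    System.out.println(guard.offer("flow:/group/name", 100L)); // true
    System.out.println(guard.offer("flow:/group/name", 100L)); // false
    System.out.println(guard.offer("flow:/group/name", 200L)); // true
  }
}
```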



##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the 
scheduler is being initialized, ideally we
+    // don't even want to do the same update twice as it will kill the 
existing flow and reschedule it unnecessarily
+    Long modificationTime = 
Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey,
 "0"));
+    String uriString = flowSpec.getUri().toString();
+    // If the modification time is 0 (which means the original API was used to 
retrieve spec or warm standby mode is not
+    // enabled), spec not in scheduler, or have a modification time associated 
with it assume it's the most recent
+    if (modificationTime != 0L && 
this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+        _log.info("Ignoring the spec {} modified at time {} because we have a 
more updated version from time {}",
+            addedSpec, 
modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+        return new AddSpecResponse(response);
+      }
+    }
+
     // todo : we should probably not schedule a flow if it is a runOnce flow
     this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+    this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), 
modificationTime);

Review Comment:
   Do we still need to keep track of this data structure after service startup?





Issue Time Tracking
-------------------

    Worklog Id:     (was: 847595)
    Time Spent: 2h 50m  (was: 2h 40m)

> Initialize scheduler with batch gets instead of individual get per flow
> -----------------------------------------------------------------------
>
>                 Key: GOBBLIN-1783
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1783
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-service
>            Reporter: Urmi Mustafi
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> We seek to improve initialization time of the JobScheduler upon restart or 
> new leadership change by batching the MySQL queries to get flow specs. 
> Instead of making 1 MySQL get call for each flow execution id, which scales 
> extremely poorly with the number of flows, we should group them to reduce the 
> number of calls and downtime.
> This implementation adds two new functions to the SpecStore interface, 
> getSortedSpecURIs and getBatchedSpecs, that we use to achieve the batching. 
> Because these two functionalities are generic enough to be used in derived 
> classes of the SpecStore we add them to the base class. Although this 
> requires any child classes to implement these functions, it allows any 
> consumer of the parent class SpecStore to use this functionality without 
> caring about the specific implementation of the SpecStore used (as 
> JobScheduler does). Additionally, getBatchedSpecs requires an offset or 
> starting point to obtain the batches from, so the consumer has to do some 
> bookkeeping of where in the paginated gets it is, but this again separates 
> the functionality from the use case of the consumer. The entirety of the flow 
> catalog is too large to load into memory for the Scheduler, so we use this 
> batch functionality. 
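
The offset bookkeeping described above might look roughly like the sketch below. The actual signatures of `getSortedSpecURIs` and `getBatchedSpecs` are not given in this message, so `PagedStore`, `getBatch`, `loadAll`, and `inMemory` are hypothetical stand-ins, and plain strings stand in for specs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Sketch of consumer-side pagination; all names here are hypothetical.
class BatchedLoadSketch {
  // Stand-in for a store that can return one page of items at a time.
  interface PagedStore {
    List<String> getBatch(int offset, int batchSize);
  }

  // Reads the store page by page and hands each page to onBatch, so only
  // one batch is held at a time. Returns the number of store calls made.
  static int loadAll(PagedStore store, int batchSize, Consumer<List<String>> onBatch) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    int offset = 0; // the consumer tracks where it is in the paginated gets
    int calls = 0;
    while (true) {
      List<String> batch = store.getBatch(offset, batchSize);
      calls++;
      onBatch.accept(batch);
      if (batch.size() < batchSize) {
        return calls; // a short (or empty) page means the end was reached
      }
      offset += batch.size();
    }
  }

  // In-memory store over an already sorted list, for demonstration only.
  static PagedStore inMemory(List<String> sortedUris) {
    return (offset, batchSize) -> {
      int from = Math.min(offset, sortedUris.size());
      int to = Math.min(offset + batchSize, sortedUris.size());
      return new ArrayList<>(sortedUris.subList(from, to));
    };
  }

  public static void main(String[] args) {
    List<String> uris = Arrays.asList("a", "b", "c", "d", "e");
    List<String> seen = new ArrayList<>();
    int calls = loadAll(inMemory(uris), 2, seen::addAll);
    System.out.println(calls);       // 3
    System.out.println(seen.size()); // 5
  }
}
```

With five items and a batch size of two, the sketch makes three store calls instead of five individual gets; the saving grows with the batch size.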



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
