Will-Lo commented on code in PR #3640:
URL: https://github.com/apache/gobblin/pull/3640#discussion_r1117782597
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java:
##########
@@ -65,6 +65,9 @@
@SuppressFBWarnings(value="SE_BAD_FIELD", justification = "FindBugs complains about Config not being serializable, but the implementation of Config is serializable")
public class FlowSpec implements Configurable, Spec {
+ // Key for Property associated with modified_time
+ public static final String modificationTimeKey = "modified_time";
Review Comment:
Can we name this the same way as other Gobblin constants?
`public static final String MODIFICATION_TIME_KEY = "modified_time";`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java:
##########
@@ -54,6 +54,9 @@ public class RuntimeMetrics {
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
+ public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.average.get.spec.speed.while.loading.all.specs.millis";
Review Comment:
I would consider shortening this metric name :) Metric platforms usually
offer slicing and dicing based on the delimiter, so a name this long becomes
unwieldy once you want to group more related metrics under `jobScheduler`
in the future.
For example, this could be `GOBBLIN_SERVICE_PREFIX +
"jobScheduler.getSpecSpeedDuringStartupAvgMillis"`.
The same applies, to some extent, to the metrics below.
Also, be careful not to start the appended name with a dot right after the
prefix; otherwise I believe you'll get a metric name with two consecutive `.`
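
To illustrate the double-delimiter concern, here is a minimal sketch. It assumes the prefix already ends with a `.`, which the existing constants in the hunk suggest (they append `"gobblin.mysql..."` with no leading dot) but which is not confirmed here. `PREFIX` and `metricName` are hypothetical names for illustration only, not part of Gobblin.

```java
// Sketch only: PREFIX stands in for ServiceMetricNames.GOBBLIN_SERVICE_PREFIX.
// Its value, including the trailing dot, is an assumption for illustration.
final class MetricNameSketch {
  static final String PREFIX = "GobblinService.";

  /** Joins prefix and suffix with exactly one '.' between them. */
  static String metricName(String prefix, String suffix) {
    String p = prefix.endsWith(".") ? prefix.substring(0, prefix.length() - 1) : prefix;
    String s = suffix.startsWith(".") ? suffix.substring(1) : suffix;
    return p + "." + s;
  }

  public static void main(String[] args) {
    // Plain concatenation with a leading dot doubles the delimiter.
    System.out.println(PREFIX + ".jobScheduler.getSpecSpeedDuringStartupAvgMillis");
    // The helper normalizes the boundary to a single dot.
    System.out.println(metricName(PREFIX, ".jobScheduler.getSpecSpeedDuringStartupAvgMillis"));
  }
}
```

Under that assumption the first line prints `GobblinService..jobScheduler.getSpecSpeedDuringStartupAvgMillis` and the second prints `GobblinService.jobScheduler.getSpecSpeedDuringStartupAvgMillis`. Simply dropping the leading dot from the string literal achieves the same result without a helper.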
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
}
+ // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+ // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+ Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+ String uriString = flowSpec.getUri().toString();
+ // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+ // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+ if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+ && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+ if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+ _log.info("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
Review Comment:
We should probably make this a warn log, since based on the response the
caller will treat this as a successful add.
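
A minimal standalone sketch of the staleness check with the log raised to warning level. This is not Gobblin's class: `StaleSpecCheckSketch` and `offer` are hypothetical names, `java.util.logging` is used only to keep the example self-contained (the PR uses `_log`), and the `scheduledFlowSpecs` lookup is omitted for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Sketch only: a simplified stand-in for the check in onAddSpec.
final class StaleSpecCheckSketch {
  private static final Logger LOG = Logger.getLogger(StaleSpecCheckSketch.class.getName());
  private final Map<String, Long> lastUpdatedTimeForFlowSpec = new HashMap<>();

  /** Returns true if the spec was accepted, false if it was skipped as stale. */
  boolean offer(String uri, long modificationTime) {
    Long known = lastUpdatedTimeForFlowSpec.get(uri);
    // A modification time of 0 means no timestamp is available, so the spec
    // is treated as the most recent and always accepted.
    if (modificationTime != 0L && known != null && known >= modificationTime) {
      // Warn rather than info: the caller still sees a successful response.
      LOG.warning(String.format(
          "Ignoring spec %s modified at %d; already have a version from %d",
          uri, modificationTime, known));
      return false;
    }
    lastUpdatedTimeForFlowSpec.put(uri, modificationTime);
    return true;
  }
}
```

Returning a boolean here is purely illustrative; in the PR the method returns an `AddSpecResponse` either way, which is why the skipped case is easy to miss unless the log level makes it visible.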
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java:
##########
@@ -342,8 +418,24 @@ public AddSpecResponse onAddSpec(Spec addedSpec) {
}
}
+ // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
+ // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
+ Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.modificationTimeKey, "0"));
+ String uriString = flowSpec.getUri().toString();
+ // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+ // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
+ if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+ && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+ if (this.lastUpdatedTimeForFlowSpec.get(uriString) >= modificationTime) {
+ _log.info("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
+ addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+ return new AddSpecResponse(response);
+ }
+ }
+
// todo : we should probably not schedule a flow if it is a runOnce flow
this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+ this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);
Review Comment:
Do we still need to keep track of this data structure after service startup?