ZihanLi58 commented on code in PR #3544:
URL: https://github.com/apache/gobblin/pull/3544#discussion_r959858522
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java:
##########
@@ -149,15 +150,47 @@ public void awaitHealthy() throws InterruptedException {
return;
}
- @Override
- public synchronized AddSpecResponse onAddSpec(Spec addedSpec) {
- TopologySpec spec = (TopologySpec) addedSpec;
- log.info ("Loading topology {}", spec.toLongString());
- for (Map.Entry entry: spec.getConfigAsProperties().entrySet()) {
- log.info ("topo: {} --> {}", entry.getKey(), entry.getValue());
+ private synchronized AddSpecResponse onAddTopologySpec(TopologySpec spec) {
+ log.info("Loading topology {}", spec.toLongString());
+ for (Map.Entry entry : spec.getConfigAsProperties().entrySet()) {
+ log.info("topo: {} --> {}", entry.getKey(), entry.getValue());
}
- topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec);
+ topologySpecMap.put(spec.getUri(), spec);
+ return new AddSpecResponse(null);
+ }
+
+ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
+ Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+ if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) &&
org.apache.commons.lang.StringUtils.isNotBlank(
+ flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+ try {
+ new
CronExpression(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ } catch (Exception e) {
+ log.error("invalid cron schedule: {}",
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY), e);
+ flowSpec.getCompilationErrors().add(new FlowSpec.CompilationError(0,
"invalid cron schedule: " +
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY) +
e.getMessage()));
+ return null;
+ }
+ }
+ String response = null;
+
+ // always try to compile the flow to verify if it is compilable
+ Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+ // If dag is null then a compilation error has occurred
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ }
+ // todo: should we check quota here?
+ return new AddSpecResponse<>(response);
Review Comment:
Yup, I leave the todo hint here for the next PR after mysql quota is in
place.
I think it's easy for us to not check quota for run immediate flow here and
rely on the leader to check quota and skip it. That way we can achieve the goal
you described. But I'm not sure whether we want to change the behavior of the
service. I would prefer to disable run immediate function and only support
adhoc when we migrate adhoc flow into dag action store.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]