This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 76d5a3b [GOBBLIN-1144] remove specs from gobblin service job scheduler
76d5a3b is described below
commit 76d5a3bab26b471b722787d735f3df696beecd68
Author: Arjun <[email protected]>
AuthorDate: Thu May 14 10:18:36 2020 -0700
[GOBBLIN-1144] remove specs from gobblin service job scheduler
Dear Gobblin maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
### JIRA
- [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/)
issues and references them in the PR title. For
example, "[GOBBLIN-XXX] My Gobblin PR"
- https://issues.apache.org/jira/browse/GOBBLIN-1144
### Description
- [x] Here are some details about my PR, including
screenshots (if applicable):
implement option 4 mentioned in the doc
https://docs.google.com/document/d/1OsImllAZRnJIp2NWEOdlfw0XtqY1b-ysyKEZYLHwVbQ/edit
### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
trivial changes
### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Closes #2981 from
arjun4084346/flowCatalogRaceCondition
---
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 42 +++++++++++-----------
.../scheduler/GobblinServiceJobScheduler.java | 15 ++++++--
2 files changed, 34 insertions(+), 23 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index d902fdf..029005c 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -21,6 +21,8 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -33,6 +35,7 @@ import java.util.Map;
import java.util.Properties;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -293,30 +296,27 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
*/
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
+ FlowSpec flowSpec = (FlowSpec) spec;
Preconditions.checkState(state() == State.RUNNING, String.format("%s is
not running.", this.getClass().getName()));
- Preconditions.checkNotNull(spec);
+ Preconditions.checkNotNull(flowSpec);
+
+ log.info(String.format("Adding FlowSpec with URI: %s and Config: %s",
flowSpec.getUri(), flowSpec.getConfigAsProperties()));
+ try {
+ long startTime = System.currentTimeMillis();
+ specStore.addSpec(flowSpec);
+ metrics.updatePutSpecTime(startTime);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec,
e);
+ }
- log.info(String.format("Adding FlowSpec with URI: %s and Config: %s",
spec.getUri(),
- ((FlowSpec) spec).getConfigAsProperties()));
if (triggerListener) {
- AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener,
AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+ AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener,
AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry: response.getValue().getSuccesses().entrySet()) {
responseMap.put(entry.getKey().getName(),
entry.getValue().getResult());
}
}
- boolean compileSuccess = isCompileSuccessful(responseMap);
-
- if (compileSuccess) {
- long startTime = System.currentTimeMillis();
- metrics.updatePutSpecTime(startTime);
- try {
- if (!((FlowSpec) spec).isExplain()) {
- specStore.addSpec(spec);
- }
- } catch (IOException e) {
- throw new RuntimeException("Cannot add Spec to Spec store: " + spec,
e);
- }
+ if (isCompileSuccessful(responseMap)) {
responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("true"));
} else {
responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("false"));
@@ -325,12 +325,14 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
return responseMap;
}
- private boolean isCompileSuccessful(Map<String, AddSpecResponse>
responseMap) {
+ public static boolean isCompileSuccessful(Map<String, AddSpecResponse>
responseMap) {
AddSpecResponse<String> addSpecResponse =
responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS,
new AddSpecResponse<>(""));
- return addSpecResponse != null
- && addSpecResponse.getValue() != null
- && !addSpecResponse.getValue().contains("ConfigException");
+ return isCompileSuccessful(addSpecResponse.getValue());
+ }
+
+ public static boolean isCompileSuccessful(String dag) {
+ return dag != null && !dag.contains(ConfigException.class.getSimpleName());
}
@Override
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index afc401b..4f2c9ff 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -269,6 +269,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
if (addedSpec instanceof FlowSpec) {
try {
FlowSpec flowSpec = (FlowSpec) addedSpec;
+ URI flowSpecUri = flowSpec.getUri();
Properties jobConfig = new Properties();
Properties flowSpecProperties = ((FlowSpec)
addedSpec).getConfigAsProperties();
jobConfig.putAll(this.properties);
@@ -293,7 +294,9 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
response =
Arrays.toString(flowSpec.getCompilationErrors().toArray());
}
- if (!isExplain) {
+ boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
+
+ if (!isExplain && compileSuccess) {
this.scheduledFlowSpecs.put(addedSpec.getUri().toString(),
addedSpec);
if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
@@ -301,11 +304,17 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
scheduleJob(jobConfig, null);
if (PropertiesUtils.getPropAsBoolean(jobConfig,
ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
_log.info("RunImmediately requested, hence executing FlowSpec: "
+ addedSpec);
- this.jobExecutor.execute(new
NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null));
+ this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri,
false, jobConfig, null));
}
} else {
_log.info("No FlowSpec schedule found, so running FlowSpec: " +
addedSpec);
- this.jobExecutor.execute(new
NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
+ this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri,
true, jobConfig, null));
+ }
+ } else {
+ _log.info("Removing the flow spec: {}, isExplain: {},
compileSuccess: {}", addedSpec, isExplain, compileSuccess);
+ if (this.flowCatalog.isPresent()) {
+ _log.debug("Removing flow spec from FlowCatalog: {}", flowSpec);
+
GobblinServiceJobScheduler.this.flowCatalog.get().remove(flowSpecUri, new
Properties(), false);
}
}
return new AddSpecResponse<>(response);