phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1329290152
##########
gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/JobScheduler.java:
##########
@@ -396,7 +396,7 @@ public void scheduleJob(Properties jobProps, JobListener jobListener, Map<String
try {
// Schedule the Quartz job with a trigger built from the job configuration
Trigger trigger = createTriggerForJob(job.getKey(), jobProps, Optional.absent());
- this.scheduler.getScheduler().scheduleJob(job, trigger);
+ this.scheduler.getScheduler().scheduleJob(job, trigger); //quartz scheduler; calls runJob()
Review Comment:
not sure this comment needs to stay... if so, can we fold into one on 397?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.monitoring.JobStatus;
+
+
+/**
+ * Responsible for defining the behavior of {@link DagTask} handling scenarios for launch, resume, kill, job start
+ * and flow completion deadlines
+ *
+ */
+@Alpha
+public interface DagManagement {
+
+ /**
+ * Currently handles only the launch of a {@link Dag}, requested via the REST client for ad hoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param triggerTimeStamp
+ */
+ void launchFlow(String flowGroup, String flowName, long triggerTimeStamp);
+
+ /**
+ * Currently handles only the resume of a {@link Dag}, requested via the REST client for ad hoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param triggerTimeStamp
+ * @throws IOException
+ */
+ void resumeFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp)
+ throws IOException, InterruptedException;
+
+ /**
+ * Currently handles only the kill/cancel of a {@link Dag}, requested via the REST client for ad hoc flows
+ * @param flowGroup
+ * @param flowName
+ * @param flowExecutionId
+ * @param triggerTimeStamp
+ */
+ void killFlow(String flowGroup, String flowName, String flowExecutionId, long triggerTimeStamp)
Review Comment:
`triggerTimeStamp` may not be the right name in the context of kill. also,
while there is some variation, the codebase *mostly* treats timestamp as one
word for the purpose of capitalization
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java:
##########
@@ -0,0 +1,55 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, then performing actions
+ * based on the type of {@link DagTask}, and finally submitting an event to the executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+ abstract protected S initialize()
+ throws MaybeRetryableException, IOException;
+ abstract protected R act(S state) throws ExecutionException, InterruptedException, IOException;
+ abstract protected void sendNotification(R result) throws MaybeRetryableException;
+
+ public void process() {
+ try {
+ S state = this.initialize();
+ R result = this.act(state);
+ this.sendNotification(result);
Review Comment:
each of these three should probably be wrapped in retries. whether or not to retry should
depend on looking deeper into the `MaybeRetryableException` to determine
whether retries are warranted and possible
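A minimal sketch of the retry idea, assuming `MaybeRetryableException` can report whether a retry is worthwhile through a hypothetical `isRetryable()` accessor (the class below is a stand-in, not the PR's). `RetryableStep` and `Retries.withRetries` are likewise illustrative names; an `IOException` from `initialize()` or `act()` would need to be mapped onto the retryable exception or handled separately.

```java
// Stand-in for the PR's MaybeRetryableException; isRetryable() is an assumption.
class MaybeRetryableException extends Exception {
  private final boolean retryable;

  MaybeRetryableException(String message, boolean retryable) {
    super(message);
    this.retryable = retryable;
  }

  boolean isRetryable() {
    return retryable;
  }
}

// One phase of DagProc#process() (initialize, act, or sendNotification).
@FunctionalInterface
interface RetryableStep<T> {
  T call() throws MaybeRetryableException;
}

final class Retries {
  private Retries() {}

  // Runs the step up to maxAttempts times, retrying only while the exception says it is retryable.
  static <T> T withRetries(RetryableStep<T> step, int maxAttempts) throws MaybeRetryableException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    for (int attempt = 1; ; attempt++) {
      try {
        return step.call();
      } catch (MaybeRetryableException e) {
        if (!e.isRetryable() || attempt >= maxAttempts) {
          throw e; // not retryable, or out of attempts: surface to the caller
        }
        // a real version would likely back off here before the next attempt
      }
    }
  }
}
```

Inside `process()`, each phase could then be wrapped, e.g. `S state = Retries.withRetries(this::initialize, maxAttempts);`, once the phase signatures are narrowed to throw only the retryable exception.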
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,61 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given {@link DagTask}.
+ */
+
+@Alpha
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+ @Override
+ public DagProc meet(LaunchDagTask launchDagTask) throws IOException, InstantiationException, IllegalAccessException {
+ LaunchDagProc launchDagProc = new LaunchDagProc(launchDagTask.flowGroup, launchDagTask.flowName);
+ return launchDagProc;
+ }
+
+ @Override
+ public DagProc meet(KillDagTask killDagTask) throws IOException {
+ KillDagProc killDagProc = new KillDagProc(killDagTask.killDagId);
+ return killDagProc;
Review Comment:
for all of these, I prefer:
```
return new XYZDagProc(dagTask);
```
to:
```
XYZDagProc result = new XYZDagProc(dagTask);
return result;
```
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,61 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+
+import org.apache.gobblin.annotation.Alpha;
+
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type for a given {@link DagTask}.
+ */
+
+@Alpha
+public class DagProcFactory implements DagTaskVisitor<DagProc> {
+ @Override
+ public DagProc meet(LaunchDagTask launchDagTask) throws IOException, InstantiationException, IllegalAccessException {
+ LaunchDagProc launchDagProc = new LaunchDagProc(launchDagTask.flowGroup, launchDagTask.flowName);
+ return launchDagProc;
Review Comment:
one idea: give each specific `DagProc` type a ctor that takes in the
corresponding specific `DagTask`
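One way the two factory suggestions could fit together, sketched with simplified stand-ins: each `DagProc` takes its matching `DagTask` in its constructor, and the factory returns the new instance directly. The `host` method and the `describe()` helper are hypothetical, and the checked exceptions declared on the PR's `meet` methods are omitted.

```java
// Simplified stand-ins for the PR's DagTask / DagTaskVisitor / DagProc types.
interface DagTaskVisitor<R> {
  R meet(LaunchDagTask launchDagTask);
  R meet(KillDagTask killDagTask);
}

abstract class DagTask {
  // Double dispatch: each task calls the visitor overload for its own type.
  abstract <R> R host(DagTaskVisitor<R> visitor);
}

final class LaunchDagTask extends DagTask {
  final String flowGroup;
  final String flowName;

  LaunchDagTask(String flowGroup, String flowName) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
  }

  @Override
  <R> R host(DagTaskVisitor<R> visitor) {
    return visitor.meet(this);
  }
}

final class KillDagTask extends DagTask {
  final String killDagId;

  KillDagTask(String killDagId) {
    this.killDagId = killDagId;
  }

  @Override
  <R> R host(DagTaskVisitor<R> visitor) {
    return visitor.meet(this);
  }
}

abstract class DagProc {
  abstract String describe(); // hypothetical, only for illustration
}

final class LaunchDagProc extends DagProc {
  private final LaunchDagTask task;

  // Takes the specific task, so the factory no longer unpacks its fields.
  LaunchDagProc(LaunchDagTask task) {
    this.task = task;
  }

  @Override
  String describe() {
    return "launch " + task.flowGroup + "." + task.flowName;
  }
}

final class KillDagProc extends DagProc {
  private final KillDagTask task;

  KillDagProc(KillDagTask task) {
    this.task = task;
  }

  @Override
  String describe() {
    return "kill " + task.killDagId;
  }
}

final class DagProcFactory implements DagTaskVisitor<DagProc> {
  @Override
  public DagProc meet(LaunchDagTask launchDagTask) {
    return new LaunchDagProc(launchDagTask);
  }

  @Override
  public DagProc meet(KillDagTask killDagTask) {
    return new KillDagProc(killDagTask);
  }
}
```

Usage is then a single call per task, e.g. `DagProc proc = task.host(new DagProcFactory());`.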
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagStateStore} to provide information about dags, dag nodes and their job states.
+ * Currently, this store maintains and utilizes in-memory references about dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions need to be
+ * taken next, such as marking it as complete, failed or SLA-breached, or simply cleaning up after completion.
+ * Going forward, each of these in-memory references will be read from and written to a MySQL store.
+ * The {@link DagManager} would then be stateless and operate independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class DagManagementStateStore {
+ private final Map<Dag.DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
+ private final Map<String, Dag<JobExecutionPlan>> dagIdToDags = new HashMap<>();
+ private final Set<String> failedDagIds = new HashSet<>();
+ private final Map<String, Dag<JobExecutionPlan>> dagIdToResumingDags = new HashMap<>();
+ // dagToJobs holds a map of dagId to running jobs of that dag
+ final Map<String, LinkedList<Dag.DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
+ final Map<String, Long> dagToSLA = new HashMap<>();
+ private final Set<String> dagIdstoClean = new HashSet<>();
+ private Optional<DagActionStore> dagActionStore;
+
+ protected synchronized void deleteJobState(String dagId, Dag.DagNode<JobExecutionPlan> dagNode) {
Review Comment:
...I'm curious now to see who uses this... because I wouldn't have imagined
delete/add/etc could be `protected`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java:
##########
@@ -0,0 +1,88 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+
+import lombok.Getter;
+import lombok.Synchronized;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+/**
+ * An implementation of {@link DagStateStore} to provide information about dags, dag nodes and their job states.
+ * Currently, this store maintains and utilizes in-memory references about dags and their job states and is used
+ * to determine what the current status of the {@link Dag} and/or {@link Dag.DagNode} is and what actions need to be
+ * taken next, such as marking it as complete, failed or SLA-breached, or simply cleaning up after completion.
+ * Going forward, each of these in-memory references will be read from and written to a MySQL store.
+ * The {@link DagManager} would then be stateless and operate independently.
+ */
+@Getter(onMethod_={@Synchronized})
+@Alpha
+public class DagManagementStateStore {
Review Comment:
the javadoc called this an impl (and I agree it should have an interface
for clients to write to!), but I don't see that here. as soon as we have a
db-backed impl, we'll want to replace this in-memory version without updating
client code using it.
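A sketch of the interface/implementation split, assuming the client-facing operations are roughly add/get/delete plus failure tracking. The method names and `InMemoryDagManagementStateStore` are hypothetical, and `Dag<JobExecutionPlan>` is reduced to a type parameter `D` so the example is self-contained. Callers depend only on the interface, so a db-backed implementation could later be swapped in without touching them.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical client-facing contract; D stands in for Dag<JobExecutionPlan>.
interface DagManagementStateStore<D> {
  void addDag(String dagId, D dag);
  Optional<D> getDag(String dagId);
  void deleteDag(String dagId);
  void markDagFailed(String dagId);
  boolean isDagFailed(String dagId);
}

// In-memory implementation; a MySQL-backed class could implement the same interface later.
final class InMemoryDagManagementStateStore<D> implements DagManagementStateStore<D> {
  private final Map<String, D> dagIdToDags = new HashMap<>();
  private final Set<String> failedDagIds = new HashSet<>();

  @Override
  public synchronized void addDag(String dagId, D dag) {
    dagIdToDags.put(dagId, dag);
  }

  @Override
  public synchronized Optional<D> getDag(String dagId) {
    return Optional.ofNullable(dagIdToDags.get(dagId));
  }

  @Override
  public synchronized void deleteDag(String dagId) {
    // Deleting a dag also clears any failure bookkeeping for it.
    dagIdToDags.remove(dagId);
    failedDagIds.remove(dagId);
  }

  @Override
  public synchronized void markDagFailed(String dagId) {
    failedDagIds.add(dagId);
  }

  @Override
  public synchronized boolean isDagFailed(String dagId) {
    return failedDagIds.contains(dagId);
  }
}
```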
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java:
##########
@@ -0,0 +1,55 @@
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * Responsible for performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, then performing actions
+ * based on the type of {@link DagTask}, and finally submitting an event to the executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+ abstract protected S initialize()
+ throws MaybeRetryableException, IOException;
+ abstract protected R act(S state) throws ExecutionException, InterruptedException, IOException;
+ abstract protected void sendNotification(R result) throws MaybeRetryableException;
+
+ public void process() {
Review Comment:
I like this impl... we just need to sketch out how it communicates whether or
not it succeeded (and hence whether the lease should be completed or not).
specify that in method-level javadoc. e.g. perhaps we say success is when we
don't throw an exception. otherwise, we might need to return a flag/sentinel
value.
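A sketch of the "success means no exception" contract, with the lease decision left to the caller. `LeaseCompleter` and `DagProcRunner` are hypothetical names, and the abstract methods are widened to `throws Exception` only to keep the example short.

```java
// Hypothetical hook standing in for whatever marks the lease complete.
interface LeaseCompleter {
  void completeLease();
}

abstract class DagProc<S, R> {
  protected abstract S initialize() throws Exception;
  protected abstract R act(S state) throws Exception;
  protected abstract void sendNotification(R result) throws Exception;

  /**
   * Processes the task. Returns normally only on success; any failure propagates
   * as an exception, in which case the caller must not complete the lease.
   */
  public final void process() throws Exception {
    S state = initialize();
    R result = act(state);
    sendNotification(result);
  }
}

final class DagProcRunner {
  private DagProcRunner() {}

  // Completes the lease only if process() returned normally; reports whether it did.
  static boolean runAndMaybeCompleteLease(DagProc<?, ?> proc, LeaseCompleter lease) {
    try {
      proc.process();
    } catch (Exception e) {
      // Leave the lease uncompleted (assumption: it then lapses and the work can be retried).
      return false;
    }
    lease.completeLease();
    return true;
  }
}
```

Returning a flag instead would also work, but an exception-based contract keeps `process()` free of sentinel values and lets the retry wrapping discussed earlier compose with it.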
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * It carries the state information required by {@link DagProc} for its processing.
+ * Upon completion of {@link DagProc#process()}, it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete.
+ * @param <T>
+ */
+
+@Alpha
+public abstract class DagTask<T> {
+
+ protected Properties jobProps;
+ protected DagActionStore.DagAction flowAction;
+ protected long triggerTimeStamp;
+
+ protected MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
+
+ abstract void initialize(Object state, long triggerTimeStamp);
Review Comment:
since a `DagTask` is a lightweight instance for conveying what work needs
doing, I'm not expecting we'd have anything to `initialize`...
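A sketch of `DagTask` as a lightweight, immutable message with no `initialize` step: every field is fixed at construction, and any state loading would happen in `DagProc.initialize()` instead. `DagAction` here is a simplified stand-in for `DagActionStore.DagAction`, and `eventTimestampMillis` is a hypothetical field name.

```java
import java.util.Objects;

// Simplified stand-in for DagActionStore.DagAction.
final class DagAction {
  final String flowGroup;
  final String flowName;
  final String flowExecutionId;

  DagAction(String flowGroup, String flowName, String flowExecutionId) {
    this.flowGroup = flowGroup;
    this.flowName = flowName;
    this.flowExecutionId = flowExecutionId;
  }
}

// Everything the task conveys is supplied up front, so there is nothing to initialize().
abstract class DagTask {
  private final DagAction dagAction;
  private final long eventTimestampMillis; // hypothetical name for the triggering time

  protected DagTask(DagAction dagAction, long eventTimestampMillis) {
    this.dagAction = Objects.requireNonNull(dagAction, "dagAction");
    this.eventTimestampMillis = eventTimestampMillis;
  }

  DagAction getDagAction() {
    return dagAction;
  }

  long getEventTimestampMillis() {
    return eventTimestampMillis;
  }
}

final class LaunchDagTask extends DagTask {
  LaunchDagTask(DagAction dagAction, long eventTimestampMillis) {
    super(dagAction, eventTimestampMillis);
  }
}
```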
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter;
+
+
+/**
+ * Defines an individual task or job in a Dag.
+ * It carries the state information required by {@link DagProc} for its processing.
+ * Upon completion of {@link DagProc#process()}, it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete.
+ * @param <T>
+ */
+@WorkInProgress
+public abstract class DagTask<T> {
+
+ protected Properties jobProps;
+ protected DagActionStore.DagAction flowAction;
+ protected long triggerTimeStamp;
+
+ protected MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
Review Comment:
seeing the cast in `conclude`, it seems you agree... shall we use the
correct type here and avoid that cast?
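A sketch of declaring the field with the specific lease type so `conclude` needs no cast. This assumes a `DagTask` is only created after its lease was obtained; `LeaseAttemptStatus`, `LeaseObtainedStatus` and `LeaseArbiter.completeLease` below are illustrative stand-ins, not the actual `MultiActiveLeaseArbiter` API.

```java
// Illustrative stand-ins for the lease-status hierarchy.
abstract class LeaseAttemptStatus {}

final class LeaseObtainedStatus extends LeaseAttemptStatus {
  private final long leaseAcquisitionTimestamp;

  LeaseObtainedStatus(long leaseAcquisitionTimestamp) {
    this.leaseAcquisitionTimestamp = leaseAcquisitionTimestamp;
  }

  long getLeaseAcquisitionTimestamp() {
    return leaseAcquisitionTimestamp;
  }
}

// Hypothetical arbiter contract: only an obtained lease can be completed.
interface LeaseArbiter {
  boolean completeLease(LeaseObtainedStatus status);
}

abstract class DagTask {
  // Typed as LeaseObtainedStatus rather than the general LeaseAttemptStatus,
  // so conclude() can hand it to the arbiter without a downcast.
  protected final LeaseObtainedStatus leaseObtainedStatus;

  protected DagTask(LeaseObtainedStatus leaseObtainedStatus) {
    this.leaseObtainedStatus = leaseObtainedStatus;
  }

  boolean conclude(LeaseArbiter arbiter) {
    return arbiter.completeLease(leaseObtainedStatus);
  }
}
```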
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import com.codahale.metrics.Timer;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.DagActionStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
+import org.apache.gobblin.service.monitoring.JobStatus;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+
+import static org.apache.gobblin.service.ExecutionStatus.ORCHESTRATED;
+import static org.apache.gobblin.service.ExecutionStatus.valueOf;
+
+
+/**
+ * Holds a stream of {@link DagTask}s that {@link DagManager} pulls from to process when it is ready for more work.
+ * It provides an implementation of {@link DagManagement}, which defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask}, if available, to {@link DagManager}.
+ */
+
+@Alpha
+@Slf4j
+public class DagTaskStream implements Iterator<Optional<DagTask>>, DagManagement {
+ @Getter
+ private final BlockingDeque<DagTask> taskQueue = new LinkedBlockingDeque<>();
Review Comment:
I'm not clear where this would originate. yes, this `DagTaskStream` would
definitely give out `DagTask`s... but it would be the one to create them, not
merely forward them onward from what it itself receives from upstream
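A sketch of the alternative described in that comment, in which `DagTaskStream` receives raw actions from upstream and is itself responsible for creating `DagTask`s. `DagAction`, `ActionType` and `addDagAction` are hypothetical simplifications. The stream is treated as unbounded, so `next()` returns `Optional.empty()` when nothing is pending, and the lease acquisition that would presumably precede task creation is omitted.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

enum ActionType { LAUNCH, KILL }

// Hypothetical raw action as received from upstream.
final class DagAction {
  final ActionType type;
  final String dagId;

  DagAction(ActionType type, String dagId) {
    this.type = type;
    this.dagId = dagId;
  }
}

abstract class DagTask {
  final DagAction action;

  DagTask(DagAction action) {
    this.action = action;
  }
}

final class LaunchDagTask extends DagTask {
  LaunchDagTask(DagAction action) { super(action); }
}

final class KillDagTask extends DagTask {
  KillDagTask(DagAction action) { super(action); }
}

final class DagTaskStream implements Iterator<Optional<DagTask>> {
  // Upstream hands over actions, not ready-made tasks.
  private final BlockingQueue<DagAction> pendingActions = new LinkedBlockingQueue<>();

  void addDagAction(DagAction action) {
    pendingActions.add(action);
  }

  @Override
  public boolean hasNext() {
    return true; // unbounded stream; emptiness is signalled by Optional.empty()
  }

  @Override
  public Optional<DagTask> next() {
    DagAction action = pendingActions.poll(); // non-blocking
    if (action == null) {
      return Optional.empty();
    }
    // A lease for this action would presumably be acquired here before creating the task.
    return Optional.of(createTask(action));
  }

  // The stream, not its producer, decides which DagTask type an action becomes.
  private static DagTask createTask(DagAction action) {
    switch (action.type) {
      case LAUNCH:
        return new LaunchDagTask(action);
      case KILL:
        return new KillDagTask(action);
      default:
        throw new IllegalArgumentException("Unknown action type: " + action.type);
    }
  }
}
```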