umustafi commented on code in PR #3756:
URL: https://github.com/apache/gobblin/pull/3756#discussion_r1312346681

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.

Review Comment:
   Suggest: "performing work and notifying other components/modules of its imminent or completed actions"

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcFactory.java:
##########
@@ -0,0 +1,29 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+/**
+ * Factory for creating {@link DagProc} based on the visitor type {@link DagTask}.
+ */
+public interface DagProcFactory extends DagTaskVisitor<DagProc> {
+  DagProc meet(LaunchDagTask ldt);
+  DagProc meet(KillDagTask kdt);
+  DagProc meet(ResumeDagTask rdt);

Review Comment:
   Let's add the SLA-type actions from above as well:
   - `enforceFlowCompletionDeadline`
   - `enforceJobStartDeadline`

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/KillDagProc.java:
##########
@@ -0,0 +1,42 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * An implmentation of {@link DagProc} for killing {@link DagTask}.

Review Comment:
   Is this only for kills requested through an API call, or is it also triggered from other points? Good to clarify in the Javadoc.

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/LaunchDagProc.java:
##########
@@ -0,0 +1,41 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * An implmentation of {@link DagProc} for launching {@link DagTask}.

Review Comment:
   Is this only for newly added launches, or for every time a task needs to be launched on the executor?

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Iterator;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.
+ * Implements {@link Iterator} to provide the next {@link DagTask} if available to {@link DagManager}
+ */
+public class DagTaskStream implements Iterator<DagTask>, DagManagement {
+  @Override
+  public boolean hasNext() {
+    return false;
+  }
+
+  @Override
+  public DagTask next() {
+    return null;
+  }
+
+  @Override
+  public void launchFlow() {
+
+  }
+
+  @Override
+  public void resumeFlow() {
+
+  }
+
+  @Override
+  public void killFlow() {
+
+  }
+
+  @Override
+  public void enforceFlowCompletionDeadline() {
+
+  }
+
+  @Override
+  public void enforceJobStartDeadline() {

Review Comment:
   Who calls these functions, and why do they take no parameters? I initially thought `DagManagement` from above would be the one to obtain these objects one by one from `DagTaskStream`. But perhaps `DagManager` is the user of `DagTaskStream` and uses the `DagManagement` interface? It would be good to document in these classes how they fit in with the other classes (who calls what). A diagram would be very helpful to show any reader which component interacts with which, and in what capacity.
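To make the question concrete, here is one possible shape. This is purely a hypothetical sketch. Every name below (`ParameterizedDagManagement`, `QueueBackedDagTaskStream`, `PendingDagAction`, `DagActionKind`) and the `String flowId` parameter are my assumptions, not the PR's API. In this shape, callers such as API handlers or SLA timers go through the `DagManagement` side and say which flow each action is for. Each call enqueues a task, and `DagManager` only consumes the stream through `Iterator`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical: one kind per DagManagement action, including the two SLA actions. */
enum DagActionKind {
  LAUNCH, RESUME, KILL, ENFORCE_FLOW_COMPLETION_DEADLINE, ENFORCE_JOB_START_DEADLINE
}

/** Hypothetical, simplified stand-in for a DagTask: which action, for which flow. */
final class PendingDagAction {
  final DagActionKind kind;
  final String flowId; // hypothetical flow identifier, e.g. "flowGroup/flowName"

  PendingDagAction(DagActionKind kind, String flowId) {
    this.kind = kind;
    this.flowId = flowId;
  }
}

/** Producer side: API handlers, the scheduler, or SLA timers say WHICH flow an action targets. */
interface ParameterizedDagManagement {
  void launchFlow(String flowId);
  void resumeFlow(String flowId);
  void killFlow(String flowId);
  void enforceFlowCompletionDeadline(String flowId);
  void enforceJobStartDeadline(String flowId);
}

/** Consumer side: the DagManager loop only sees Iterator and pulls one task at a time. */
final class QueueBackedDagTaskStream implements Iterator<PendingDagAction>, ParameterizedDagManagement {
  // Thread-safe, so producer threads can enqueue while the DagManager thread drains.
  private final BlockingQueue<PendingDagAction> pending = new LinkedBlockingQueue<>();

  // Non-blocking for simplicity (assumes a single consumer); a real stream might block here,
  // and any lease arbitration would have to happen before a task is handed out.
  @Override
  public boolean hasNext() {
    return !pending.isEmpty();
  }

  @Override
  public PendingDagAction next() {
    PendingDagAction action = pending.poll();
    if (action == null) {
      throw new NoSuchElementException("no pending dag action");
    }
    return action;
  }

  @Override public void launchFlow(String flowId) { enqueue(DagActionKind.LAUNCH, flowId); }
  @Override public void resumeFlow(String flowId) { enqueue(DagActionKind.RESUME, flowId); }
  @Override public void killFlow(String flowId) { enqueue(DagActionKind.KILL, flowId); }

  @Override public void enforceFlowCompletionDeadline(String flowId) {
    enqueue(DagActionKind.ENFORCE_FLOW_COMPLETION_DEADLINE, flowId);
  }

  @Override public void enforceJobStartDeadline(String flowId) {
    enqueue(DagActionKind.ENFORCE_JOB_START_DEADLINE, flowId);
  }

  private void enqueue(DagActionKind kind, String flowId) {
    pending.add(new PendingDagAction(kind, flowId));
  }
}
```

With this split, `DagManager` never calls `launchFlow`/`killFlow` itself; it only drains tasks. Two questions about this class would still be open: whether `hasNext()` should block, and whether lease arbitration happens inside the stream.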

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java:
##########
@@ -0,0 +1,32 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+/**
+ * Defines an individual task or job in a Dag.
+ * It carries the state information required by {@link DagProc} to for its processing.

Review Comment:
   Extra "to" here; it should read "required by {@link DagProc} for its processing".

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskStream.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Iterator;
+
+
+/**
+ * Holds a stream of {@link DagTask} that needs to be processed by the {@link DagManager}.
+ * It provides an implementation for {@link DagManagement} defines the rules for a flow and job.

Review Comment:
   Missing the word `and`: "... for {@link DagManagement} (and) defines the rules ...". What are these rules, by the way? Can you elaborate? Can you also explain whether each host's `DagTaskStream` will be unique, or whether all tasks will be shared between hosts? That determines whether or not we handle lease arbitration under the hood in this class.

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTaskVisitor.java:
##########
@@ -0,0 +1,28 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+/**
+ * Interface defining {@link DagTask} based on the type of visitor.
+ * @param <T>
+ */
+public interface DagTaskVisitor<T> {
+  T meet(LaunchDagTask launchDagTask);
+  T meet(KillDagTask killDagTask);
+  T meet(ResumeDagTask resumeDagTask);

Review Comment:
   Missing the remaining SLA tasks (`enforceFlowCompletionDeadline`, `enforceJobStartDeadline`).

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagTask.java:
##########
@@ -0,0 +1,32 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+/**
+ * Defines an individual task or job in a Dag.
+ * It carries the state information required by {@link DagProc} to for its processing.
+ * Upon completion of the {@link DagProc#process()} it will mark the lease
+ * acquired by {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter} as complete
+ * @param <T>
+ */
+abstract class DagTask<T> {
+
+  abstract void initialize();
+  abstract void conclude();
+  abstract T host(DagTaskVisitor<T> visitor);

Review Comment:
   Add short one-line Javadocs here.

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * like updating {@link DagStateStore} and finally submiting an event to the executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+public abstract class DagProc<S, R> {
+  abstract protected S initialize() throws MaybeRetryableException;
+  abstract protected R act(S state) throws MaybeRetryableException;
+  abstract protected void sendNotification(R result) throws MaybeRetryableException;
+
+  final void process() {
+    throw new UnsupportedOperationException(" Process unsupported");

Review Comment:
   Can you elaborate a little on how `S`, `R`, and these functions are used, e.g. with pseudocode in `process`? Who is the user of `DagProc`? Is it sufficient to just call `process` with the `DagTask` as input, or does the caller need to call `initialize`, then `act`, etc.? Or does `process` do all of the above itself? Does it handle retries? Can you also add some Javadocs to these methods?

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProc.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * like updating {@link DagStateStore} and finally submiting an event to the executor.

Review Comment:
   Let's differentiate between contacting the executor to carry out an action and submitting status events.

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/NewDagManagerBoilerPlate.java:
##########
@@ -0,0 +1,40 @@
+package org.apache.gobblin.service.modules.orchestration;
+
+public class NewDagManagerBoilerPlate {
+
+  private DagTaskStream dagTaskStream;
+  private DagProc dagProc;

Review Comment:
   I need some clarification on how these are used (see the questions above). Does `run` below keep getting called continuously? Once the `DagTask` is returned, do we call the `DagTaskStream` methods to launch/kill, etc.?
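Regarding `run` and the `DagProc` questions above, here is roughly the contract I would expect. Again, this is only a hypothetical sketch with simplified stand-in types. The following are all my assumptions, not code from this PR: `SketchDagProc`, `SketchDagTask`, `SketchTaskVisitor`, `RetryableSketchException`, `LoggingProcFactory`, `runOnce`, and the retry-the-whole-sequence policy.

In this sketch, callers only invoke `process()`. It runs `initialize`, `act`, and `sendNotification` in order and retries on retryable failures. The loop gets each task's `DagProc` through the visitor/factory. It calls `conclude()` only after `process()` succeeds, in line with the `DagTask` Javadoc about marking the lease complete.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for MaybeRetryableException; its real API is not shown in this PR. */
class RetryableSketchException extends Exception {
  final boolean retryable;
  RetryableSketchException(String message, boolean retryable) {
    super(message);
    this.retryable = retryable;
  }
}

/** Simplified DagProc: callers only invoke process(), a template method over the three steps. */
abstract class SketchDagProc<S, R> {
  private final int maxAttempts;
  SketchDagProc(int maxAttempts) {
    if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
    this.maxAttempts = maxAttempts;
  }

  /** Loads the current state S of the dag node(s), e.g. from the DagStateStore. */
  protected abstract S initialize() throws RetryableSketchException;
  /** Carries out the action, e.g. contacting the executor; may run more than once. */
  protected abstract R act(S state) throws RetryableSketchException;
  /** Emits status events describing the result R. */
  protected abstract void sendNotification(R result) throws RetryableSketchException;
  /** Runs initialize, act, sendNotification; retries the whole sequence on retryable failures. */
  final R process() throws RetryableSketchException {
    RetryableSketchException lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        R result = act(initialize());
        sendNotification(result);
        return result;
      } catch (RetryableSketchException e) {
        if (!e.retryable) throw e; // non-retryable: fail immediately
        lastFailure = e;
      }
    }
    throw lastFailure; // every attempt failed with a retryable error
  }
}

/** One meet(...) per task type; the SLA tasks would add further overloads. */
interface SketchTaskVisitor<T> {
  T meet(SketchLaunchTask task);
  T meet(SketchKillTask task);
}

abstract class SketchDagTask {
  final String flowId; // hypothetical identifier of the flow this task acts on
  SketchDagTask(String flowId) { this.flowId = flowId; }
  /** Double dispatch: selects the visitor overload for the concrete task type. */
  abstract <T> T host(SketchTaskVisitor<T> visitor);
  /** Called after successful processing, e.g. to mark the acquired lease as complete. */
  void conclude() {}
}

final class SketchLaunchTask extends SketchDagTask {
  SketchLaunchTask(String flowId) { super(flowId); }
  @Override <T> T host(SketchTaskVisitor<T> visitor) { return visitor.meet(this); }
}

final class SketchKillTask extends SketchDagTask {
  SketchKillTask(String flowId) { super(flowId); }
  @Override <T> T host(SketchTaskVisitor<T> visitor) { return visitor.meet(this); }
}

/** Example factory-as-visitor whose procs only record the action they "performed". */
final class LoggingProcFactory implements SketchTaskVisitor<SketchDagProc<String, String>> {
  final List<String> events = new ArrayList<>();
  @Override public SketchDagProc<String, String> meet(SketchLaunchTask t) { return proc("launch " + t.flowId); }
  @Override public SketchDagProc<String, String> meet(SketchKillTask t) { return proc("kill " + t.flowId); }
  private SketchDagProc<String, String> proc(String action) {
    return new SketchDagProc<String, String>(3) {
      @Override protected String initialize() { return action; }
      @Override protected String act(String state) { return state + " done"; }
      @Override protected void sendNotification(String result) { events.add(result); }
    };
  }
}

/** Hypothetical body of run(): drain whatever tasks the stream currently has. */
final class SketchDagManagerLoop {
  static <S, R> int runOnce(Iterator<SketchDagTask> stream,
      SketchTaskVisitor<SketchDagProc<S, R>> procFactory) throws RetryableSketchException {
    int processed = 0;
    while (stream.hasNext()) {
      SketchDagTask task = stream.next();
      SketchDagProc<S, R> proc = task.host(procFactory); // the visitor picks the matching DagProc
      proc.process();  // throws once retries are exhausted, so conclude() is skipped
      task.conclude(); // only reached on success, per the DagTask Javadoc
      processed++;
    }
    return processed;
  }
}
```

A retry re-runs `act`, so this policy would only be safe if actions such as submitting a job to the executor are idempotent. If they are not, retries would need to live inside the individual steps instead. Either way, stating in the `process()` Javadoc which approach the PR intends would answer most of the questions above.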
