[ 
https://issues.apache.org/jira/browse/GOBBLIN-1910?focusedWorklogId=882993&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-882993
 ]

ASF GitHub Bot logged work on GOBBLIN-1910:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/23 21:07
            Start Date: 02/Oct/23 21:07
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3776:
URL: https://github.com/apache/gobblin/pull/3776#discussion_r1343135965


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.orchestration.proc;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.exception.MaybeRetryableException;
+import org.apache.gobblin.service.modules.orchestration.task.DagTask;
+
+
+/**
+ * Responsible to performing the actual work for a given {@link DagTask}.
+ * It processes the {@link DagTask} by first initializing its state, performing actions
+ * based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * @param <S> current state of the dag node
+ * @param <R> result after processing the dag node
+ */
+@Alpha
+@Slf4j
+public abstract class DagProc<S, R> {
+
+  abstract protected S initialize(DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, IOException;
+  abstract protected R act(S state, DagManagementStateStore dagManagementStateStore) throws MaybeRetryableException, Exception;
+  abstract protected void sendNotification(R result, EventSubmitter eventSubmitter) throws MaybeRetryableException, IOException;
+
+  public final void process(DagManagementStateStore dagManagementStateStore, EventSubmitter eventSubmitter, int maxRetryCount, long delayRetryMillis) {
+    try {
+      S state = this.initializeWithRetries(dagManagementStateStore, maxRetryCount, delayRetryMillis);
+      R result = this.actWithRetries(state, dagManagementStateStore, maxRetryCount, delayRetryMillis); // may be pass state store too here
+      this.sendNotificationWithRetries(result, eventSubmitter, maxRetryCount, delayRetryMillis);
+      log.info("Successfully processed Dag Request");
+    } catch (Exception ex) {
+      throw new RuntimeException("Cannot process Dag Request: ", ex);
+    }
+  }
+
+  protected final S initializeWithRetries(DagManagementStateStore dagManagementStateStore, int maxRetryCount, long delayRetryMillis) throws IOException {
+    for (int retryCount = 0; retryCount < maxRetryCount; retryCount++) {
+      try {
+        return this.initialize(dagManagementStateStore);
+      } catch (MaybeRetryableException e) {
+        if (retryCount < maxRetryCount - 1) { // Don't wait before the last retry
+          waitBeforeRetry(delayRetryMillis);
+        }
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    throw new RuntimeException("Max retry attempts reached. Cannot initialize Dag");
+  }

Review Comment:
   1. Try reworking this more generically as:
   ```
   protected final <T> T execWithRetries(Supplier<T> exec, int maxRetryCount, long delayRetryMillis) {
   ...
   }
   ```
   and call it like:
   ```
   R result = this.execWithRetries(() -> this.act(state, dagManagementStateStore), maxRetryCount, delayRetryMillis);
   ```
   
   2. Probably don't wrap exceptions, given that's what `process` already plans to do, unless you're merely trying to tunnel them past the `Supplier`'s exception signature. If so, create a `CheckedExceptionSupplier` in the vein of
https://github.com/apache/gobblin/blob/028b85f587e3c1e6afa5d8662fe9ed3f0087568d/gobblin-utility/src/main/java/org/apache/gobblin/util/function/CheckedExceptionFunction.java#L31
   
   3. The final missing piece is to remain aware of the timeframe for the task's lease expiration. When expiration is close (or has already passed), do not execute (and certainly don't retry). Instead, throw a particular exception to mark the situation.
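
   To make the three points concrete, here is a minimal sketch of how they might fit together. It is an illustration under stated assumptions, not the PR's implementation: `RetrySketch`, `LeaseExpiredException`, and the `leaseExpiration` and `safetyMargin` parameters are hypothetical names, `CheckedExceptionSupplier` is only modeled loosely on `CheckedExceptionFunction`, and `MaybeRetryableException` is redeclared locally so the snippet compiles on its own.

```java
import java.time.Duration;
import java.time.Instant;

/** Illustrative sketch only; none of these types are taken from the PR. */
class RetrySketch {

  /** Hypothetical analogue of Supplier whose get() may throw a checked exception. */
  @FunctionalInterface
  interface CheckedExceptionSupplier<T, E extends Exception> {
    T get() throws E;
  }

  /** Local stand-in for the PR's MaybeRetryableException. */
  static class MaybeRetryableException extends Exception {
    MaybeRetryableException(String message) {
      super(message);
    }
  }

  /** Hypothetical marker exception for a lease that has expired or is about to. */
  static class LeaseExpiredException extends Exception {
    LeaseExpiredException(String message) {
      super(message);
    }
  }

  static <T> T execWithRetries(CheckedExceptionSupplier<T, Exception> exec, int maxRetryCount,
      long delayRetryMillis, Instant leaseExpiration, Duration safetyMargin) throws Exception {
    if (maxRetryCount < 1) {
      throw new IllegalArgumentException("maxRetryCount must be at least 1");
    }
    MaybeRetryableException lastFailure = null;
    for (int attempt = 0; attempt < maxRetryCount; attempt++) {
      // Never start (or restart) work once the lease is expired or too close to expiring.
      if (!Instant.now().plus(safetyMargin).isBefore(leaseExpiration)) {
        throw new LeaseExpiredException("Lease expires at " + leaseExpiration + "; not executing");
      }
      try {
        return exec.get();
      } catch (MaybeRetryableException e) {
        lastFailure = e;
        if (attempt < maxRetryCount - 1) { // no wait after the final attempt
          Thread.sleep(delayRetryMillis);
        }
      }
      // Any other exception propagates unwrapped; the caller decides how to wrap it.
    }
    throw lastFailure;
  }
}
```

   With a helper along these lines, `initialize`, `act`, and `sendNotification` could all share one retry loop, and `process` would remain the single place that wraps failures. How the lease deadline reaches the helper (a parameter, as assumed here, or a field on the `DagTask`) is left open.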





Issue Time Tracking
-------------------

    Worklog Id:     (was: 882993)
    Time Spent: 12.5h  (was: 12h 20m)

> Refactor code to move current in-memory references to new design for REST 
> calls: Launch, Resume and Kill
> --------------------------------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1910
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1910
>             Project: Apache Gobblin
>          Issue Type: New Feature
>            Reporter: Meeth Gala
>            Priority: Major
>          Time Spent: 12.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
