phet commented on code in PR #3997:
URL: https://github.com/apache/gobblin/pull/3997#discussion_r1671665510
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagement.java:
##########
@@ -26,14 +26,8 @@
* Consumption of the Dags happen through {@link DagTaskStream}.
*/
public interface DagManagement {
-
- /**
- * Used to add a dagAction event to DagManagement
- */
- void addDagAction(DagActionStore.DagAction dagAction) throws IOException;
-
/**
- * Used to add reminder dagActions to the queue that already contain an eventTimestamp from the previous lease attempt
+ * Used to add {@link DagActionStore.LeaseParams} to the queue that already contain an eventTimestamp from the previous lease attempt
*/
- void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams) throws IOException;
+ void addDagAction(DagActionStore.LeaseParams reminderLeaseParams) throws IOException;
Review Comment:
Should the param still be named `reminder...`?
Also (javadoc): in the general case, the params won't necessarily "already contain ... from the previous lease attempt", will they?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -107,23 +107,10 @@ public DagManagementTaskStreamImpl(Config config, Optional<DagActionStore> dagAc
this.dagManagementStateStore = dagManagementStateStore;
}
- @Override
- public synchronized void addDagAction(DagActionStore.DagAction dagAction) {
- // TODO: Used to track missing dag issue, remove later as needed
- log.info("Add original (non-reminder) dagAction {}", dagAction);
-
- if (!this.leaseParamsQueue.offer(new DagActionStore.LeaseParams(dagAction, false, System.currentTimeMillis()))) {
- throw new RuntimeException(String.format("Could not add dag action to the queue %s", dagAction));
- }
- }
-
- @Override
- public synchronized void addReminderDagAction(DagActionStore.LeaseParams reminderLeaseParams) {
- // TODO: Used to track missing dag issue, remove later as needed
- log.info("Add reminder dagAction {}", reminderLeaseParams);
-
+ public synchronized void addDagAction(DagActionStore.LeaseParams reminderLeaseParams) {
+ log.info("Add {} to queue", reminderLeaseParams);
Review Comment:
nit: `"Adding {} to queue..."` (with the trailing "..." intended literally); alternatively: `"About to add {} to queue"`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -230,7 +235,16 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
if (this.dagProcEngineEnabled && DagProcUtils.isJobLevelStatus(jobName)) {
if (updatedJobStatus.getRight() == NewState.FINISHED) {
// todo - retried/resumed jobs *may* not be handled here, we may want to create their dag action elsewhere
- this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+ try {
+ this.dagManagementStateStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.REEVALUATE);
+ } catch (Exception e) {
+ if (isExceptionInstanceOf(e, nonTransientExceptions)) {
+ // todo - add metrics
+ log.error(e.getMessage());
Review Comment:
`log.warn` is probably more appropriate. It would also be good to preface the exception message with a contextual lead-in (especially as there's no stack trace for figuring out where in the code the logging happened!). In addition, let's mention that the error is being swallowed/ignored.
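A minimal sketch of a warning that follows this suggestion: a context lead-in naming the operation and the flow/job, plus an explicit note that the error is ignored. It uses `java.util.logging` only to stay self-contained (the `log` field in the diff is not shown here), and `SwallowedErrorLogging`, `swallowedMessage` and `warnSwallowed` are hypothetical names.

```java
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.logging.Logger;

/** Hypothetical helper; java.util.logging stands in for the monitor's own logger. */
final class SwallowedErrorLogging {
  private static final Logger LOG = Logger.getLogger(SwallowedErrorLogging.class.getName());

  /** Builds a message with a context lead-in and an explicit "ignoring" note, ending with the exception's toString(). */
  static String swallowedMessage(String flowGroup, String flowName, long flowExecutionId, String jobName, Exception e) {
    return String.format(
        "Failed to add REEVALUATE dag action for flow %s.%s (execution %d), job %s; ignoring the error and continuing: %s",
        flowGroup, flowName, flowExecutionId, jobName, e);
  }

  /** Logs at WARNING level (the java.util.logging counterpart of log.warn) and returns the logged message. */
  static String warnSwallowed(String flowGroup, String flowName, long flowExecutionId, String jobName, Exception e) {
    String msg = swallowedMessage(flowGroup, flowName, flowExecutionId, jobName, e);
    LOG.warning(msg);
    return msg;
  }

  public static void main(String[] args) {
    // Illustrative values only.
    warnSwallowed("myGroup", "myFlow", 1700000000000L, "job0",
        new SQLIntegrityConstraintViolationException("Duplicate entry"));
  }
}
```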
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -104,7 +106,8 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
private final StateStore<org.apache.gobblin.configuration.State> stateStore;
private final ScheduledExecutorService scheduledExecutorService;
private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
- RETRY_TIME_OUT_MS, TimeUnit.HOURS.toMillis(24L), // after a day, presume non-transient and give up
+ // keeping the retry timeout less until we configure retryer to retry only the transient exceptions
+ RETRY_TIME_OUT_MS, TimeUnit.MINUTES.toMillis(30L), // after 30 minutes, presume non-transient and give up
Review Comment:
I'm not convinced that 24 hours is the wrong length, especially as you'll now skip the `SQLIntegrityConstraintViolationException`s. But if you really think it needs shortening, I wouldn't go as low as 30 mins; maybe 6 or 12 hours at least...
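For a sense of scale, this snippet only computes the millisecond values `RETRY_TIME_OUT_MS` would take under each option in this thread; it does not touch the retryer or the Typesafe `Config`. `RetryTimeouts` and its constants are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

/** Millisecond values for the retry-timeout options discussed in this review thread. */
final class RetryTimeouts {
  static final long ONE_DAY_MS = TimeUnit.HOURS.toMillis(24L);       // original fallback: 86,400,000 ms
  static final long THIRTY_MIN_MS = TimeUnit.MINUTES.toMillis(30L);  // value in this PR: 1,800,000 ms
  static final long SIX_HOURS_MS = TimeUnit.HOURS.toMillis(6L);      // 21,600,000 ms
  static final long TWELVE_HOURS_MS = TimeUnit.HOURS.toMillis(12L);  // 43,200,000 ms

  public static void main(String[] args) {
    // The PR's value shrinks the retry window by a factor of 48 relative to one day.
    System.out.println(ONE_DAY_MS / THIRTY_MIN_MS); // prints 48
  }
}
```

Even the 6-hour middle ground keeps a window 12 times longer than the 30 minutes proposed in the PR.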
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -120,6 +123,7 @@ public abstract class KafkaJobStatusMonitor extends HighLevelConsumer<byte[], by
private final GaaSJobObservabilityEventProducer eventProducer;
private final DagManagementStateStore dagManagementStateStore;
private final boolean dagProcEngineEnabled;
+ private final List<Class<? extends Exception>> nonTransientExceptions = Collections.singletonList(SQLIntegrityConstraintViolationException.class);
Review Comment:
I wouldn't call these "non-transient" exceptions so much as "permitted" or "allowable" exceptions.
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagManagementDagActionStoreChangeMonitor.java:
##########
@@ -112,7 +112,7 @@ protected void handleDagAction(DagActionStore.DagAction dagAction, boolean isSta
case LAUNCH :
case REEVALUATE :
case RESUME:
- dagManagement.addDagAction(dagAction);
+ dagManagement.addDagAction(new DagActionStore.LeaseParams(dagAction, false, System.currentTimeMillis()));
Review Comment:
nit: suggest a `DagActionStore.LeaseParams` ctor that takes only a
`DagAction` and defaults to `false` and `System.currentTimeMillis()`
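A minimal sketch of the suggested convenience constructor, built on simplified stand-ins. `DagActionStoreSketch` is hypothetical, and the field names `isReminder` and `eventTimeMillis` are assumptions, since the real `DagActionStore.LeaseParams` and `DagActionStore.DagAction` definitions aren't shown in this thread.

```java
/** Hypothetical, simplified stand-in for DagActionStore; only what is needed to show the ctor. */
final class DagActionStoreSketch {

  /** Placeholder for DagActionStore.DagAction; the real class is not shown in this thread. */
  static final class DagAction {
    final String description;

    DagAction(String description) {
      this.description = description;
    }
  }

  /** Placeholder for DagActionStore.LeaseParams; field names are assumptions. */
  static final class LeaseParams {
    final DagAction dagAction;
    final boolean isReminder;
    final long eventTimeMillis;

    LeaseParams(DagAction dagAction, boolean isReminder, long eventTimeMillis) {
      this.dagAction = dagAction;
      this.isReminder = isReminder;
      this.eventTimeMillis = eventTimeMillis;
    }

    /** Convenience ctor: a non-reminder lease attempt stamped with the current wall-clock time. */
    LeaseParams(DagAction dagAction) {
      this(dagAction, false, System.currentTimeMillis());
    }
  }

  public static void main(String[] args) {
    LeaseParams params = new LeaseParams(new DagAction("LAUNCH for myGroup.myFlow"));
    System.out.println(params.isReminder); // prints false
  }
}
```

With such a ctor, the call in `handleDagAction` would shrink to `dagManagement.addDagAction(new DagActionStore.LeaseParams(dagAction));`.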
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlDagActionStoreTest.java:
##########
@@ -70,17 +70,6 @@ public static DagActionStore getTestDagActionStore(ITestMetastoreDatabase testDb
return new MysqlDagActionStore(config);
}
- @Test
- public void testAddAction() throws Exception {
- this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL);
- //Should not be able to add KILL again when previous one exist
- Assert.expectThrows(IOException.class,
- () -> this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.KILL));
- //Should be able to add a RESUME action for same execution as well as KILL for another execution of the flow
- this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId, jobName, DagActionStore.DagActionType.RESUME);
- this.mysqlDagActionStore.addJobDagAction(flowGroup, flowName, flowExecutionId_2, jobName, DagActionStore.DagActionType.KILL);
- }
-
Review Comment:
Why get rid of this? I don't recall a code change that would motivate abandoning this test...
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -409,4 +424,12 @@ public static long getExecutionIdFromTableName(String tableName) {
protected abstract org.apache.gobblin.configuration.State parseJobStatus(GobblinTrackingEvent event);
+ public static boolean isExceptionInstanceOf(Exception exception, List<Class<? extends Exception>> typesList) {
+ for (Class<? extends Exception> type : typesList) {
+ if (type.isInstance(exception)) {
+ return true;
+ }
+ }
+ return false;
+ }
Review Comment:
```
return typesList.stream().anyMatch(t -> t.isInstance(exception));
```
?
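A self-contained sketch of the stream-based version in place, keeping the signature from the diff; the enclosing class `ExceptionTypes` is a hypothetical name.

```java
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.List;

final class ExceptionTypes {

  /** True if {@code exception} is an instance of any type in {@code typesList}; same behavior as the loop in the diff. */
  public static boolean isExceptionInstanceOf(Exception exception, List<Class<? extends Exception>> typesList) {
    // Class.isInstance also matches subclasses, exactly as the original loop does.
    return typesList.stream().anyMatch(t -> t.isInstance(exception));
  }

  public static void main(String[] args) {
    List<Class<? extends Exception>> permitted =
        Collections.singletonList(SQLIntegrityConstraintViolationException.class);
    System.out.println(isExceptionInstanceOf(new SQLIntegrityConstraintViolationException("dup"), permitted)); // prints true
    System.out.println(isExceptionInstanceOf(new IllegalStateException("other"), permitted)); // prints false
  }
}
```

Like the loop, `isInstance` inspects only the exception object itself, not its `getCause()` chain, so an exception wrapped inside another type would not match unless the caller unwraps it first.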
##########
gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java:
##########
@@ -331,6 +345,7 @@ static Pair<org.apache.gobblin.configuration.State, NewState> recalcJobStatus(or
private static NewState newState(org.apache.gobblin.configuration.State jobStatus, List<org.apache.gobblin.configuration.State> states) {
if (isNewStateTransitionToFinal(jobStatus, states)) {
+ log.info("Flow ");
Review Comment:
snuck in?
--
This is an automated message from the Apache Git Service.