This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2e3102f11 [GOBBLIN-1931] Refactor dag action updating method & add
clarifying comment (#3801)
2e3102f11 is described below
commit 2e3102f116986030a1b9f5f9d6cb6e031726a594
Author: umustafi <[email protected]>
AuthorDate: Fri Oct 20 14:35:26 2023 -0700
[GOBBLIN-1931] Refactor dag action updating method & add clarifying comment
(#3801)
* Refactor dag action updating method & add clarifying comment
* Log filtered out duplicate messages
* Add logs and metrics for messages missing from the change monitor
* Only add gobblin.service prefix for dagActionStoreChangeMonitor
---------
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../java/org/apache/gobblin/runtime/api/DagActionStore.java | 7 +++----
.../gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java | 6 +++---
.../org/apache/gobblin/runtime/kafka/HighLevelConsumer.java | 4 +++-
.../runtime/api/MysqlMultiActiveLeaseArbiterTest.java | 4 ++--
.../service/modules/orchestration/FlowTriggerHandler.java | 12 +++++++-----
.../gobblin/service/monitoring/ChangeMonitorUtils.java | 2 +-
.../service/monitoring/DagActionStoreChangeMonitor.java | 3 ++-
7 files changed, 21 insertions(+), 17 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
index eb26acd16..4f3442597 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionStore.java
@@ -49,10 +49,9 @@ public interface DagActionStore {
/**
* Replace flow execution id with agreed upon event time to easily track
the flow
*/
- public static DagActionStore.DagAction
updateFlowExecutionId(DagActionStore.DagAction flowAction,
- long eventTimeMillis) {
- return new DagActionStore.DagAction(flowAction.getFlowGroup(),
flowAction.getFlowName(),
- String.valueOf(eventTimeMillis), flowAction.getFlowActionType());
+ public DagAction updateFlowExecutionId(long eventTimeMillis) {
+ return new DagAction(this.getFlowGroup(), this.getFlowName(),
+ String.valueOf(eventTimeMillis), this.getFlowActionType());
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
index c6161d936..338e908a2 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java
@@ -321,14 +321,14 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
// Lease is valid
if (leaseValidityStatus == 1) {
if (isWithinEpsilon) {
- DagActionStore.DagAction updatedFlowAction =
updateFlowExecutionId(flowAction, dbEventTimestamp.getTime());
+ DagActionStore.DagAction updatedFlowAction =
flowAction.updateFlowExecutionId(dbEventTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] -
CASE 2: Same event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db timestamp for reminder
return new LeasedToAnotherStatus(updatedFlowAction,
dbLeaseAcquisitionTimestamp.getTime() + dbLinger -
dbCurrentTimestamp.getTime());
}
- DagActionStore.DagAction updatedFlowAction =
updateFlowExecutionId(flowAction, dbCurrentTimestamp.getTime());
+ DagActionStore.DagAction updatedFlowAction =
flowAction.updateFlowExecutionId(dbCurrentTimestamp.getTime());
log.info("tryAcquireLease for [{}, is: {}, eventTimestamp: {}] - CASE
3: Distinct event, lease is valid",
updatedFlowAction, isReminderEvent ? "reminder" : "original",
dbCurrentTimestamp.getTime());
// Utilize db lease acquisition timestamp for wait time
@@ -518,7 +518,7 @@ public class MysqlMultiActiveLeaseArbiter implements
MultiActiveLeaseArbiter {
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
- DagActionStore.DagAction updatedFlowAction =
updateFlowExecutionId(flowAction, selectInfoResult.eventTimeMillis);
+ DagActionStore.DagAction updatedFlowAction =
flowAction.updateFlowExecutionId(selectInfoResult.eventTimeMillis);
if (numRowsUpdated == 1) {
log.info("Obtained lease for [{}, is: {}, eventTimestamp: {}]
successfully!", updatedFlowAction,
isReminderEvent ? "reminder" : "original",
selectInfoResult.eventTimeMillis);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 5e8daaa26..7b494201e 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -95,7 +95,7 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
*/
@Getter
private MetricContext metricContext;
- private Counter messagesRead;
+ protected Counter messagesRead;
@Getter
private final GobblinKafkaConsumerClient gobblinKafkaConsumerClient;
private final ScheduledExecutorService consumerExecutor;
@@ -329,6 +329,8 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
}
}
} catch (InterruptedException e) {
+ log.warn("Encountered exception while processing queue ", e);
+ // TODO: evaluate whether we should interrupt the thread or continue
processing
Thread.currentThread().interrupt();
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
index 7bafc78ff..08630ab36 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java
@@ -217,7 +217,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
- DagActionStore.DagAction updatedResumeDagAction =
DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+ DagActionStore.DagAction updatedResumeDagAction =
resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
updatedResumeDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
@@ -299,7 +299,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
// Mark the resume action lease from above as completed by fabricating a
LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
- DagActionStore.DagAction updatedResumeDagAction =
DagActionStore.DagAction.updateFlowExecutionId(resumeDagAction,
+ DagActionStore.DagAction updatedResumeDagAction =
resumeDagAction.updateFlowExecutionId(
selectInfoResult.getEventTimeMillis());
boolean markedSuccess =
mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
updatedResumeDagAction,
selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index 8abaa209c..c5a5bb8e0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -112,8 +112,12 @@ public class FlowTriggerHandler {
if (multiActiveLeaseArbiter.isPresent()) {
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus =
multiActiveLeaseArbiter.get().tryAcquireLease(
flowAction, eventTimeMillis, isReminderEvent);
+ // The flow action contained in the `LeaseAttemptStatus` from the lease
arbiter contains an updated flow execution
+ // id. From this point onwards, always use the newer version of the flow
action to easily track the action through
+ // orchestration and execution.
if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeaseObtainedStatus) {
- MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
+ MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus)
+ leaseAttemptStatus;
this.leaseObtainedCount.inc();
if (persistFlowAction(leaseObtainedStatus)) {
log.info("Successfully persisted lease: [{}, eventTimestamp: {}] ",
leaseObtainedStatus.getFlowAction(),
@@ -122,11 +126,9 @@ public class FlowTriggerHandler {
}
// If persisting the flow action failed, then we set another trigger
for this event to occur immediately to
// re-attempt handling the event
- DagActionStore.DagAction updatedFlowAction =
DagActionStore.DagAction.updateFlowExecutionId(flowAction,
- leaseObtainedStatus.getEventTimeMillis());
scheduleReminderForEvent(jobProps,
- new
MultiActiveLeaseArbiter.LeasedToAnotherStatus(updatedFlowAction, 0L),
- eventTimeMillis);
+ new
MultiActiveLeaseArbiter.LeasedToAnotherStatus(leaseObtainedStatus.getFlowAction(),
+ 0L), eventTimeMillis);
return;
} else if (leaseAttemptStatus instanceof
MultiActiveLeaseArbiter.LeasedToAnotherStatus) {
this.leasedToAnotherStatusCount.inc();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
index 33934ef06..a2d68fbc0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/ChangeMonitorUtils.java
@@ -35,7 +35,7 @@ public final class ChangeMonitorUtils {
String operation, String timestamp) {
// If we've already processed a message with this timestamp and key before
then skip duplicate message
if (cache.getIfPresent(changeIdentifier) != null) {
- log.debug("Duplicate change event with identifier {}", changeIdentifier);
+ log.info("Duplicate change event with identifier {}", changeIdentifier);
return false;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 1435e076a..e5a2d090d 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -216,7 +217,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
@Override
protected void createMetrics() {
- super.createMetrics();
+ super.messagesRead =
this.getMetricContext().counter(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
RuntimeMetrics.GOBBLIN_KAFKA_HIGH_LEVEL_CONSUMER_MESSAGES_READ);
this.killsInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED);
this.resumesInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.flowsLaunched =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_FLOWS_LAUNCHED);