This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5db162165 [GOBBLIN-1766] Define metric to measure lag from producing to consume… (#3625)
5db162165 is described below

commit 5db16216558f23876a3a5aa77661341559e63a8d
Author: umustafi <[email protected]>
AuthorDate: Tue Jan 24 13:20:21 2023 -0800

    [GOBBLIN-1766] Define metric to measure lag from producing to consume… (#3625)
    
    * [GOBBLIN-1766] Define metric to measure lag from producing to consume change stream events
    
    * Refactor and add unit test for measuring lag
    
    * fix change identifier to use tid
    
    * address pr comments to clarify naming, make gauge threadsafe
    
    * fix unit test
    
    Co-authored-by: Urmi Mustafi <[email protected]>
---
 .../src/main/avro/GenericStoreChangeEvent.avsc     |  9 ++++++--
 .../gobblin/runtime/HighLevelConsumerTest.java     | 21 +++++++++++++++++
 .../gobblin/runtime/kafka/HighLevelConsumer.java   |  4 ++++
 .../gobblin/runtime/metrics/RuntimeMetrics.java    | 22 ++++++++++--------
 .../monitoring/DagActionStoreChangeMonitor.java    | 27 ++++++++++++++--------
 .../service/monitoring/SpecStoreChangeMonitor.java | 17 ++++++++++----
 6 files changed, 76 insertions(+), 24 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
index c2298f460..08d93826e 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
@@ -9,9 +9,14 @@
     "doc" : "Primary key for the store",
     "compliance" : "NONE"
   }, {
-    "name" : "timestamp",
+    "name" : "txId",
+    "type" : "string",
+    "doc" : "ID to uniquely identify the transaction. Used for identifying 
duplicate messages with different timestamps for the same transaction.",
+    "compliance" : "NONE"
+  }, {
+    "name" : "produceTimestampMillis",
     "type" : "long",
-    "doc" : "Time the change occurred",
+    "doc" : "Time the change was produced to topic (separate than the time of 
the update to the store)",
     "compliance" : "NONE"
   }, {
     "name": "operationType",
diff --git 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index 8a28bf270..bb0b96f90 100644
--- 
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++ 
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Properties;
 
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
@@ -155,6 +156,26 @@ public class HighLevelConsumerTest extends KafkaTestBase {
     consumer.shutDown();
   }
 
+  @Test
+  public void testCalculateProduceToConsumeLag() {
+    Properties consumerProps = new Properties();
+    consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+    
consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + 
KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+    //Generate a brand new consumer group id to ensure there are no previously 
committed offsets for this group id
+    String consumerGroupId = Joiner.on("-").join(TOPIC, "auto", 
System.currentTimeMillis());
+    consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT + 
HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+    consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY, 
"true");
+    MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC, 
ConfigUtils.propertiesToConfig(consumerProps),
+        NUM_PARTITIONS) {
+      @Override public Long calcMillisSince(Long timestamp) {
+        return 1234L - timestamp;
+      }
+    };
+    Long produceTimestamp = 1000L;
+    Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
+  }
+
   private List<byte[]> createByteArrayMessages() {
     List<byte[]> records = Lists.newArrayList();
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 7541503ee..0e68ea9aa 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -333,4 +333,8 @@ public abstract class HighLevelConsumer<K,V> extends 
AbstractIdleService {
       }
     }
   }
+
+  public Long calcMillisSince(Long timestamp) {
+    return System.currentTimeMillis() - timestamp;
+  }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index c2889ea95..1b7bb3ae5 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -36,15 +36,19 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS = 
"gobblin.jobMonitor.slaevent.rejectedevents";
   public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES =
       "gobblin.jobMonitor.kafka.messageParseFailures";
-  public static final String 
GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.successful.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.failed.added.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.deleted.specs";
-  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.unexpected.errors";
-  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.specStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.kills.invoked";
-  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStoreMonitor.message.processed";
-  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.resumes.invoked";
-  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.dagActionStore.unexpected.errors";
+  public static final String 
GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specStoreMonitor.successful.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specStoreMonitor.failed.added.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs";
+  public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specStoreMonitor.unexpected.errors";
+  public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specStoreMonitor.message.processed";
+  public static final String 
GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
+      ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".specstoreMonitor.produce.to.consume.delay";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.kills.invoked";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED= 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.message.processed";
+  public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED 
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.resumes.invoked";
+  public static final String 
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.unexpected.errors";
+  public static final String
+      GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
".dagActionStoreMonitor.produce.to.consume.delay";
 
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.unexpected.errors";
   public static final String 
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = 
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + 
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 8aecd000a..a42a01e23 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -31,6 +31,7 @@ import com.typesafe.config.ConfigValueFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.DagActionStore;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -53,6 +54,9 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
   private ContextAwareMeter resumesInvoked;
   private ContextAwareMeter unexpectedErrors;
   private ContextAwareMeter messageProcessedMeter;
+  private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from 
all partitions in one gauge
+
+  private volatile Long produceToConsumeDelayValue = -1L;
 
   protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, 
String>() {
     @Override
@@ -98,18 +102,20 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     String key = (String) message.getKey();
     DagActionStoreChangeEvent value = (DagActionStoreChangeEvent) 
message.getValue();
 
-    Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+    String tid = value.getChangeEventIdentifier().getTxId();
+    Long produceTimestamp = 
value.getChangeEventIdentifier().getProduceTimestampMillis();
     String operation = 
value.getChangeEventIdentifier().getOperationType().name();
     String flowGroup = value.getFlowGroup();
     String flowName = value.getFlowName();
     String flowExecutionId = value.getFlowExecutionId();
 
-    log.debug("Processing Dag Action message for flow group: {} name: {} 
executionId: {} timestamp {} operation {}",
-        flowGroup, flowName, flowExecutionId, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
+    log.debug("Processing Dag Action message for flow group: {} name: {} 
executionId: {} tid: {} operation: {} lag: {}",
+        flowGroup, flowName, flowExecutionId, tid, operation, 
produceToConsumeDelayValue);
 
-    String changeIdentifier = timestamp + key;
+    String changeIdentifier = tid + key;
     if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, 
dagActionsSeenCache, operation,
-        timestamp.toString())) {
+        produceTimestamp.toString())) {
       return;
     }
 
@@ -119,15 +125,17 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
       try {
         dagAction = dagActionStore.getDagAction(flowGroup, flowName, 
flowExecutionId).getDagActionValue();
       } catch (IOException e) {
-        log.warn("Encountered IOException trying to retrieve dagAction for 
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, 
flowName, flowExecutionId, e);
+        log.error("Encountered IOException trying to retrieve dagAction for 
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, 
flowName, flowExecutionId, e);
         this.unexpectedErrors.mark();
+        return;
       } catch (SpecNotFoundException e) {
-        log.warn("DagAction not found for flow group: {} name: {} executionId: 
{} Exception: {}", flowGroup, flowName,
+        log.error("DagAction not found for flow group: {} name: {} 
executionId: {} Exception: {}", flowGroup, flowName,
             flowExecutionId, e);
         this.unexpectedErrors.mark();
+        return;
       } catch (SQLException throwables) {
-        log.warn("Encountered SQLException trying to retrieve dagAction for 
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, 
flowName, flowExecutionId, throwables);
-        throwables.printStackTrace();
+        log.error("Encountered SQLException trying to retrieve dagAction for 
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup, 
flowName, flowExecutionId, throwables);
+        return;
       }
     }
 
@@ -176,6 +184,7 @@ public class DagActionStoreChangeMonitor extends 
HighLevelConsumer {
     this.resumesInvoked = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+    this.produceToConsumeDelayMillis = 
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
 () -> produceToConsumeDelayValue);
   }
 
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 1e834c76d..4f7ac6e86 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -33,6 +33,7 @@ import com.typesafe.config.ConfigValueFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -58,6 +59,9 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer 
{
   private ContextAwareMeter failedAddedSpecs;
   private ContextAwareMeter deletedSpecs;
   private ContextAwareMeter unexpectedErrors;
+  private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from 
all partitions in one gauge
+
+  private volatile Long produceToConsumeDelayValue = -1L;
 
   protected CacheLoader<String, String> cacheLoader = new CacheLoader<String, 
String>() {
     @Override
@@ -102,14 +106,17 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
     String key = (String) message.getKey();
     GenericStoreChangeEvent value = (GenericStoreChangeEvent) 
message.getValue();
 
-    Long timestamp = value.getTimestamp();
+    String tid = value.getTxId();
+    Long produceTimestamp = value.getProduceTimestampMillis();
     String operation = value.getOperationType().name();
 
-    log.debug("Processing message where specUri is {} timestamp: {} operation: 
{}", key, timestamp, operation);
+    produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
+    log.debug("Processing message where specUri is {} tid: {} operation: {} 
delay: {}", key, tid, operation,
+        produceToConsumeDelayValue);
 
-    String changeIdentifier = timestamp + key;
+    String changeIdentifier = tid + key;
     if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier, 
specChangesSeenCache, operation,
-        timestamp.toString())) {
+        produceTimestamp.toString())) {
       return;
     }
 
@@ -158,6 +165,7 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
     } catch (Exception e) {
       log.warn("Ran into unexpected error processing SpecStore changes. 
Reexamine scheduler. Error: {}", e);
       this.unexpectedErrors.mark();
+      return;
     }
 
     specChangesSeenCache.put(changeIdentifier, changeIdentifier);
@@ -171,5 +179,6 @@ public class SpecStoreChangeMonitor extends 
HighLevelConsumer {
     this.deletedSpecs = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
     this.unexpectedErrors = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
     this.messageProcessedMeter = 
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+    this.produceToConsumeDelayMillis = 
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
 () -> produceToConsumeDelayValue);
   }
 }
\ No newline at end of file

Reply via email to