This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5db162165 [GOBBLIN-1766] Define metric to measure lag from producing
to consume change stream events (#3625)
5db162165 is described below
commit 5db16216558f23876a3a5aa77661341559e63a8d
Author: umustafi <[email protected]>
AuthorDate: Tue Jan 24 13:20:21 2023 -0800
[GOBBLIN-1766] Define metric to measure lag from producing to consume
change stream events (#3625)
* [GOBBLIN-1766] Define metric to measure lag from producing to consume
change stream events
* Refactor and add unit test for measuring lag
* Fix change identifier to use txId (transaction ID) instead of the timestamp
* Address PR comments: clarify naming and make the gauge thread-safe
* Fix unit test
Co-authored-by: Urmi Mustafi <[email protected]>
---
.../src/main/avro/GenericStoreChangeEvent.avsc | 9 ++++++--
.../gobblin/runtime/HighLevelConsumerTest.java | 21 +++++++++++++++++
.../gobblin/runtime/kafka/HighLevelConsumer.java | 4 ++++
.../gobblin/runtime/metrics/RuntimeMetrics.java | 22 ++++++++++--------
.../monitoring/DagActionStoreChangeMonitor.java | 27 ++++++++++++++--------
.../service/monitoring/SpecStoreChangeMonitor.java | 17 ++++++++++----
6 files changed, 76 insertions(+), 24 deletions(-)
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
index c2298f460..08d93826e 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GenericStoreChangeEvent.avsc
@@ -9,9 +9,14 @@
"doc" : "Primary key for the store",
"compliance" : "NONE"
}, {
- "name" : "timestamp",
+ "name" : "txId",
+ "type" : "string",
+ "doc" : "ID to uniquely identify the transaction. Used for identifying
duplicate messages with different timestamps for the same transaction.",
+ "compliance" : "NONE"
+ }, {
+ "name" : "produceTimestampMillis",
"type" : "long",
- "doc" : "Time the change occurred",
+ "doc" : "Time the change was produced to topic (separate than the time of
the update to the store)",
"compliance" : "NONE"
}, {
"name": "operationType",
diff --git
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
index 8a28bf270..bb0b96f90 100644
---
a/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
+++
b/gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/runtime/HighLevelConsumerTest.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Properties;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
@@ -155,6 +156,26 @@ public class HighLevelConsumerTest extends KafkaTestBase {
consumer.shutDown();
}
+ @Test
+ public void testCalculateProduceToConsumeLag() {
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty(ConfigurationKeys.KAFKA_BROKERS, _kafkaBrokers);
+
consumerProps.setProperty(Kafka09ConsumerClient.GOBBLIN_CONFIG_VALUE_DESERIALIZER_CLASS_KEY,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT +
KAFKA_AUTO_OFFSET_RESET_KEY, "earliest");
+ //Generate a brand new consumer group id to ensure there are no previously
committed offsets for this group id
+ String consumerGroupId = Joiner.on("-").join(TOPIC, "auto",
System.currentTimeMillis());
+ consumerProps.setProperty(SOURCE_KAFKA_CONSUMERCONFIG_KEY_WITH_DOT +
HighLevelConsumer.GROUP_ID_KEY, consumerGroupId);
+ consumerProps.setProperty(HighLevelConsumer.ENABLE_AUTO_COMMIT_KEY,
"true");
+ MockedHighLevelConsumer consumer = new MockedHighLevelConsumer(TOPIC,
ConfigUtils.propertiesToConfig(consumerProps),
+ NUM_PARTITIONS) {
+ @Override public Long calcMillisSince(Long timestamp) {
+ return 1234L - timestamp;
+ }
+ };
+ Long produceTimestamp = 1000L;
+ Assert.assertTrue(consumer.calcMillisSince(produceTimestamp).equals(234L));
+ }
+
private List<byte[]> createByteArrayMessages() {
List<byte[]> records = Lists.newArrayList();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
index 7541503ee..0e68ea9aa 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/kafka/HighLevelConsumer.java
@@ -333,4 +333,8 @@ public abstract class HighLevelConsumer<K,V> extends
AbstractIdleService {
}
}
}
+
+ public Long calcMillisSince(Long timestamp) {
+ return System.currentTimeMillis() - timestamp;
+ }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index c2889ea95..1b7bb3ae5 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -36,15 +36,19 @@ public class RuntimeMetrics {
public static final String GOBBLIN_JOB_MONITOR_SLAEVENT_REJECTEDEVENTS =
"gobblin.jobMonitor.slaevent.rejectedevents";
public static final String GOBBLIN_JOB_MONITOR_KAFKA_MESSAGE_PARSE_FAILURES =
"gobblin.jobMonitor.kafka.messageParseFailures";
- public static final String
GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.successful.added.specs";
- public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.failed.added.specs";
- public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.deleted.specs";
- public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.unexpected.errors";
- public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.specStoreMonitor.message.processed";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.kills.invoked";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStoreMonitor.message.processed";
- public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.resumes.invoked";
- public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.dagActionStore.unexpected.errors";
+ public static final String
GOBBLIN_SPEC_STORE_MONITOR_SUCCESSFULLY_ADDED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.successful.added.specs";
+ public static final String GOBBLIN_SPEC_STORE_MONITOR_FAILED_ADDED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.failed.added.specs";
+ public static final String GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".specStoreMonitor.deleted.specs";
+ public static final String GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.unexpected.errors";
+ public static final String GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specStoreMonitor.message.processed";
+ public static final String
GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".specstoreMonitor.produce.to.consume.delay";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_KILLS_INVOKED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.kills.invoked";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED=
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.message.processed";
+ public static final String GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED
= ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.resumes.invoked";
+ public static final String
GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.unexpected.errors";
+ public static final String
+ GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
".dagActionStoreMonitor.produce.to.consume.delay";
public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_UNEXPECTED_ERRORS =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.unexpected.errors";
public static final String
GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED =
ServiceMetricNames.GOBBLIN_SERVICE_PREFIX +
"gobblin.mysql.quota.manager.quotaRequests.exceeded";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
index 8aecd000a..a42a01e23 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/DagActionStoreChangeMonitor.java
@@ -31,6 +31,7 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.DagActionStore;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
@@ -53,6 +54,9 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
private ContextAwareMeter resumesInvoked;
private ContextAwareMeter unexpectedErrors;
private ContextAwareMeter messageProcessedMeter;
+ private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
+
+ private volatile Long produceToConsumeDelayValue = -1L;
protected CacheLoader<String, String> cacheLoader = new CacheLoader<String,
String>() {
@Override
@@ -98,18 +102,20 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
String key = (String) message.getKey();
DagActionStoreChangeEvent value = (DagActionStoreChangeEvent)
message.getValue();
- Long timestamp = value.getChangeEventIdentifier().getTimestamp();
+ String tid = value.getChangeEventIdentifier().getTxId();
+ Long produceTimestamp =
value.getChangeEventIdentifier().getProduceTimestampMillis();
String operation =
value.getChangeEventIdentifier().getOperationType().name();
String flowGroup = value.getFlowGroup();
String flowName = value.getFlowName();
String flowExecutionId = value.getFlowExecutionId();
- log.debug("Processing Dag Action message for flow group: {} name: {}
executionId: {} timestamp {} operation {}",
- flowGroup, flowName, flowExecutionId, timestamp, operation);
+ produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
+ log.debug("Processing Dag Action message for flow group: {} name: {}
executionId: {} tid: {} operation: {} lag: {}",
+ flowGroup, flowName, flowExecutionId, tid, operation,
produceToConsumeDelayValue);
- String changeIdentifier = timestamp + key;
+ String changeIdentifier = tid + key;
if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
dagActionsSeenCache, operation,
- timestamp.toString())) {
+ produceTimestamp.toString())) {
return;
}
@@ -119,15 +125,17 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
try {
dagAction = dagActionStore.getDagAction(flowGroup, flowName,
flowExecutionId).getDagActionValue();
} catch (IOException e) {
- log.warn("Encountered IOException trying to retrieve dagAction for
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup,
flowName, flowExecutionId, e);
+ log.error("Encountered IOException trying to retrieve dagAction for
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup,
flowName, flowExecutionId, e);
this.unexpectedErrors.mark();
+ return;
} catch (SpecNotFoundException e) {
- log.warn("DagAction not found for flow group: {} name: {} executionId:
{} Exception: {}", flowGroup, flowName,
+ log.error("DagAction not found for flow group: {} name: {}
executionId: {} Exception: {}", flowGroup, flowName,
flowExecutionId, e);
this.unexpectedErrors.mark();
+ return;
} catch (SQLException throwables) {
- log.warn("Encountered SQLException trying to retrieve dagAction for
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup,
flowName, flowExecutionId, throwables);
- throwables.printStackTrace();
+ log.error("Encountered SQLException trying to retrieve dagAction for
flow group: {} name: {} executionId: {}. " + "Exception: {}", flowGroup,
flowName, flowExecutionId, throwables);
+ return;
}
}
@@ -176,6 +184,7 @@ public class DagActionStoreChangeMonitor extends
HighLevelConsumer {
this.resumesInvoked =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_RESUMES_INVOKED);
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_MONITOR_MESSAGE_PROCESSED);
+ this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_DAG_ACTION_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 1e834c76d..4f7ac6e86 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -33,6 +33,7 @@ import com.typesafe.config.ConfigValueFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -58,6 +59,9 @@ public class SpecStoreChangeMonitor extends HighLevelConsumer
{
private ContextAwareMeter failedAddedSpecs;
private ContextAwareMeter deletedSpecs;
private ContextAwareMeter unexpectedErrors;
+ private ContextAwareGauge produceToConsumeDelayMillis; // Reports delay from
all partitions in one gauge
+
+ private volatile Long produceToConsumeDelayValue = -1L;
protected CacheLoader<String, String> cacheLoader = new CacheLoader<String,
String>() {
@Override
@@ -102,14 +106,17 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
String key = (String) message.getKey();
GenericStoreChangeEvent value = (GenericStoreChangeEvent)
message.getValue();
- Long timestamp = value.getTimestamp();
+ String tid = value.getTxId();
+ Long produceTimestamp = value.getProduceTimestampMillis();
String operation = value.getOperationType().name();
- log.debug("Processing message where specUri is {} timestamp: {} operation:
{}", key, timestamp, operation);
+ produceToConsumeDelayValue = calcMillisSince(produceTimestamp);
+ log.debug("Processing message where specUri is {} tid: {} operation: {}
delay: {}", key, tid, operation,
+ produceToConsumeDelayValue);
- String changeIdentifier = timestamp + key;
+ String changeIdentifier = tid + key;
if (!ChangeMonitorUtils.shouldProcessMessage(changeIdentifier,
specChangesSeenCache, operation,
- timestamp.toString())) {
+ produceTimestamp.toString())) {
return;
}
@@ -158,6 +165,7 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
} catch (Exception e) {
log.warn("Ran into unexpected error processing SpecStore changes.
Reexamine scheduler. Error: {}", e);
this.unexpectedErrors.mark();
+ return;
}
specChangesSeenCache.put(changeIdentifier, changeIdentifier);
@@ -171,5 +179,6 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
this.deletedSpecs =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_DELETED_SPECS);
this.unexpectedErrors =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MONITOR_UNEXPECTED_ERRORS);
this.messageProcessedMeter =
this.getMetricContext().contextAwareMeter(RuntimeMetrics.GOBBLIN_SPEC_STORE_MESSAGE_PROCESSED);
+ this.produceToConsumeDelayMillis =
this.getMetricContext().newContextAwareGauge(RuntimeMetrics.GOBBLIN_SPEC_STORE_PRODUCE_TO_CONSUME_DELAY_MILLIS,
() -> produceToConsumeDelayValue);
}
}
\ No newline at end of file