This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 800cf63 feat: add more custom metrics
new 7a05f76 Merge pull request #17064 from [BEAM-12164]: add metrics for partition reads, queries, and latencies for Spanner Change Streams Connector
800cf63 is described below
commit 800cf63913cef8468fd52a05f3ad7582098fdfec
Author: Hengfeng Li <[email protected]>
AuthorDate: Fri Mar 4 12:02:05 2022 +1100
feat: add more custom metrics
---
.../spanner/changestreams/ChangeStreamMetrics.java | 67 +++++++++++++++++++++-
.../changestreams/action/ActionFactory.java | 7 ++-
.../action/QueryChangeStreamAction.java | 9 ++-
.../dofn/PostProcessingMetricsDoFn.java | 2 +
.../dofn/ReadChangeStreamPartitionDoFn.java | 4 +-
.../action/QueryChangeStreamActionTest.java | 6 +-
.../dofn/ReadChangeStreamPartitionDoFnTest.java | 3 +-
7 files changed, 91 insertions(+), 7 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
index 1b9fb7c..f5dfaf0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
@@ -75,6 +75,10 @@ public class ChangeStreamMetrics implements Serializable {
public static final Distribution PARTITION_SCHEDULED_TO_RUNNING_MS =
Metrics.distribution(ChangeStreamMetrics.class,
"partition_scheduled_to_running_ms");
+ /** Counter for the active partition reads during the execution of the
Connector. */
+ public static final Counter ACTIVE_PARTITION_READ_COUNT =
+ Metrics.counter(ChangeStreamMetrics.class,
"active_partition_read_count");
+
// -------------------
// Data record metrics
@@ -84,6 +88,25 @@ public class ChangeStreamMetrics implements Serializable {
public static final Counter DATA_RECORD_COUNT =
Metrics.counter(ChangeStreamMetrics.class, "data_record_count");
+ /** Counter for the total number of queries issued during the execution of
the Connector. */
+ public static final Counter QUERY_COUNT =
+ Metrics.counter(ChangeStreamMetrics.class, "query_count");
+
+ /** Counter for record latencies [0, 1000) ms during the execution of the
Connector. */
+ public static final Counter
DATA_RECORD_COMMITTED_TO_EMITTED_0MS_TO_1000MS_COUNT =
+ Metrics.counter(
+ ChangeStreamMetrics.class,
"data_record_committed_to_emitted_0ms_to_1000ms_count");
+
+ /** Counter for record latencies [1000, 3000) ms during the execution of the
Connector. */
+ public static final Counter
DATA_RECORD_COMMITTED_TO_EMITTED_1000MS_TO_3000MS_COUNT =
+ Metrics.counter(
+ ChangeStreamMetrics.class,
"data_record_committed_to_emitted_1000ms_to_3000ms_count");
+
+ /** Counter for record latencies equal or above 3000ms during the execution
of the Connector. */
+ public static final Counter
DATA_RECORD_COMMITTED_TO_EMITTED_3000MS_TO_INF_COUNT =
+ Metrics.counter(
+ ChangeStreamMetrics.class,
"data_record_committed_to_emitted_3000ms_to_inf_count");
+
// -------------------
// Hearbeat record metrics
@@ -106,8 +129,12 @@ public class ChangeStreamMetrics implements Serializable {
*/
public ChangeStreamMetrics() {
enabledMetrics = new HashSet<>();
- enabledMetrics.add(PARTITION_RECORD_COUNT.getName());
enabledMetrics.add(DATA_RECORD_COUNT.getName());
+ enabledMetrics.add(ACTIVE_PARTITION_READ_COUNT.getName());
+ enabledMetrics.add(QUERY_COUNT.getName());
+
enabledMetrics.add(DATA_RECORD_COMMITTED_TO_EMITTED_0MS_TO_1000MS_COUNT.getName());
+
enabledMetrics.add(DATA_RECORD_COMMITTED_TO_EMITTED_1000MS_TO_3000MS_COUNT.getName());
+
enabledMetrics.add(DATA_RECORD_COMMITTED_TO_EMITTED_3000MS_TO_INF_COUNT.getName());
}
/**
@@ -159,11 +186,32 @@ public class ChangeStreamMetrics implements Serializable {
update(PARTITION_SCHEDULED_TO_RUNNING_MS, duration.getMillis());
}
+ /**
+ * Increments the {@link ChangeStreamMetrics#ACTIVE_PARTITION_READ_COUNT} by
1 if the metric is
+ * enabled.
+ */
+ public void incActivePartitionReadCounter() {
+ inc(ACTIVE_PARTITION_READ_COUNT);
+ }
+
+ /**
+ * Decrements the {@link ChangeStreamMetrics#ACTIVE_PARTITION_READ_COUNT} by
1 if the metric is
+ * enabled.
+ */
+ public void decActivePartitionReadCounter() {
+ dec(ACTIVE_PARTITION_READ_COUNT);
+ }
+
/** Increments the {@link ChangeStreamMetrics#DATA_RECORD_COUNT} by 1 if the
metric is enabled. */
public void incDataRecordCounter() {
inc(DATA_RECORD_COUNT);
}
+ /** Increments the {@link ChangeStreamMetrics#QUERY_COUNT} by 1 if the
metric is enabled. */
+ public void incQueryCounter() {
+ inc(QUERY_COUNT);
+ }
+
/**
* Increments the {@link ChangeStreamMetrics#HEARTBEAT_RECORD_COUNT} by 1 if
the metric is
* enabled.
@@ -178,9 +226,26 @@ public class ChangeStreamMetrics implements Serializable {
}
}
+ private void dec(Counter counter) {
+ if (enabledMetrics.contains(counter.getName())) {
+ counter.dec();
+ }
+ }
+
private void update(Distribution distribution, long value) {
if (enabledMetrics.contains(distribution.getName())) {
distribution.update(value);
}
}
+
+ public void updateDataRecordCommittedToEmitted(Duration duration) {
+ final long millis = duration.getMillis();
+ if (millis < 1000) {
+ inc(DATA_RECORD_COMMITTED_TO_EMITTED_0MS_TO_1000MS_COUNT);
+ } else if (millis < 3000) {
+ inc(DATA_RECORD_COMMITTED_TO_EMITTED_1000MS_TO_3000MS_COUNT);
+ } else {
+ inc(DATA_RECORD_COMMITTED_TO_EMITTED_3000MS_TO_INF_COUNT);
+ }
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index 5277c36..cf77834 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -107,6 +107,7 @@ public class ActionFactory implements Serializable {
*
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord}s
* @param childPartitionsRecordAction action class to process {@link
*
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s
+ * @param metrics metrics gathering class
* @return single instance of the {@link QueryChangeStreamAction}
*/
public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -116,7 +117,8 @@ public class ActionFactory implements Serializable {
PartitionMetadataMapper partitionMetadataMapper,
DataChangeRecordAction dataChangeRecordAction,
HeartbeatRecordAction heartbeatRecordAction,
- ChildPartitionsRecordAction childPartitionsRecordAction) {
+ ChildPartitionsRecordAction childPartitionsRecordAction,
+ ChangeStreamMetrics metrics) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
@@ -126,7 +128,8 @@ public class ActionFactory implements Serializable {
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
- childPartitionsRecordAction);
+ childPartitionsRecordAction,
+ metrics);
}
return queryChangeStreamActionInstance;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 0a053aa..1114cb5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -29,6 +29,7 @@ import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.List;
import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
@@ -78,6 +79,7 @@ public class QueryChangeStreamAction {
private final DataChangeRecordAction dataChangeRecordAction;
private final HeartbeatRecordAction heartbeatRecordAction;
private final ChildPartitionsRecordAction childPartitionsRecordAction;
+ private final ChangeStreamMetrics metrics;
/**
* Constructs an action class for performing a change stream query for a
given partition.
@@ -91,6 +93,7 @@ public class QueryChangeStreamAction {
* @param dataChangeRecordAction action class to process {@link
DataChangeRecord}s
* @param heartbeatRecordAction action class to process {@link
HeartbeatRecord}s
* @param childPartitionsRecordAction action class to process {@link
ChildPartitionsRecord}s
+ * @param metrics metrics gathering class
*/
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
@@ -99,7 +102,8 @@ public class QueryChangeStreamAction {
PartitionMetadataMapper partitionMetadataMapper,
DataChangeRecordAction dataChangeRecordAction,
HeartbeatRecordAction heartbeatRecordAction,
- ChildPartitionsRecordAction childPartitionsRecordAction) {
+ ChildPartitionsRecordAction childPartitionsRecordAction,
+ ChangeStreamMetrics metrics) {
this.changeStreamDao = changeStreamDao;
this.partitionMetadataDao = partitionMetadataDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -107,6 +111,7 @@ public class QueryChangeStreamAction {
this.dataChangeRecordAction = dataChangeRecordAction;
this.heartbeatRecordAction = heartbeatRecordAction;
this.childPartitionsRecordAction = childPartitionsRecordAction;
+ this.metrics = metrics;
}
/**
@@ -193,6 +198,7 @@ public class QueryChangeStreamAction {
changeStreamDao.changeStreamQuery(
token, startTimestamp, endTimestamp,
partition.getHeartbeatMillis())) {
+ metrics.incQueryCounter();
while (resultSet.next()) {
final List<ChangeStreamRecord> records =
changeStreamRecordMapper.toChangeStreamRecords(
@@ -267,6 +273,7 @@ public class QueryChangeStreamAction {
if (tracker.tryClaim(endTimestamp)) {
LOG.debug("[" + token + "] Finishing partition");
partitionMetadataDao.updateToFinished(token);
+ metrics.decActivePartitionReadCounter();
LOG.info("[" + token + "] Partition finished");
}
return ProcessContinuation.stop();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
index 7c365e1..377f405 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
@@ -90,6 +90,8 @@ public class PostProcessingMetricsDoFn extends DoFn<DataChangeRecord, DataChange
emittedTimestamp.toSqlTimestamp().getTime());
final long commitedToEmittedMillis = committedToEmitted.getMillis();
+ metrics.updateDataRecordCommittedToEmitted(committedToEmitted);
+
if (commitedToEmittedMillis > COMMITTED_TO_EMITTED_THRESHOLD_MS) {
LOG.debug(
"Data record took "
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index e3f066a..d26e79e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -143,6 +143,7 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
partitionRunningAt.toSqlTimestamp().getTime()));
}
+ metrics.incActivePartitionReadCounter();
return TimestampRange.of(startTimestamp, endTimestamp);
}
@@ -179,7 +180,8 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
- childPartitionsRecordAction);
+ childPartitionsRecordAction,
+ metrics);
}
/**
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 17d0a83..c59721b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -29,6 +29,7 @@ import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Struct;
import java.util.Arrays;
import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
import
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
@@ -62,6 +63,7 @@ public class QueryChangeStreamActionTest {
private ChangeStreamDao changeStreamDao;
private PartitionMetadataDao partitionMetadataDao;
private PartitionMetadata partition;
+ private ChangeStreamMetrics metrics;
private TimestampRange restriction;
private RestrictionTracker<TimestampRange, Timestamp> restrictionTracker;
private OutputReceiver<DataChangeRecord> outputReceiver;
@@ -83,6 +85,7 @@ public class QueryChangeStreamActionTest {
dataChangeRecordAction = mock(DataChangeRecordAction.class);
heartbeatRecordAction = mock(HeartbeatRecordAction.class);
childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
+ metrics = mock(ChangeStreamMetrics.class);
action =
new QueryChangeStreamAction(
@@ -92,7 +95,8 @@ public class QueryChangeStreamActionTest {
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
- childPartitionsRecordAction);
+ childPartitionsRecordAction,
+ metrics);
final Struct row = mock(Struct.class);
partition =
PartitionMetadata.newBuilder()
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 1afe0dc..6aa5acb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -125,7 +125,8 @@ public class ReadChangeStreamPartitionDoFnTest {
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
- childPartitionsRecordAction))
+ childPartitionsRecordAction,
+ metrics))
.thenReturn(queryChangeStreamAction);
doFn.setup();