This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 800cf63  feat: add more custom metrics
     new 7a05f76  Merge pull request #17064 from [BEAM-12164]: add metrics for 
partition reads, queries, and latencies for Spanner Change Streams Connector
800cf63 is described below

commit 800cf63913cef8468fd52a05f3ad7582098fdfec
Author: Hengfeng Li <[email protected]>
AuthorDate: Fri Mar 4 12:02:05 2022 +1100

    feat: add more custom metrics
---
 .../spanner/changestreams/ChangeStreamMetrics.java | 67 +++++++++++++++++++++-
 .../changestreams/action/ActionFactory.java        |  7 ++-
 .../action/QueryChangeStreamAction.java            |  9 ++-
 .../dofn/PostProcessingMetricsDoFn.java            |  2 +
 .../dofn/ReadChangeStreamPartitionDoFn.java        |  4 +-
 .../action/QueryChangeStreamActionTest.java        |  6 +-
 .../dofn/ReadChangeStreamPartitionDoFnTest.java    |  3 +-
 7 files changed, 91 insertions(+), 7 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
index 1b9fb7c..f5dfaf0 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/ChangeStreamMetrics.java
@@ -75,6 +75,10 @@ public class ChangeStreamMetrics implements Serializable {
   public static final Distribution PARTITION_SCHEDULED_TO_RUNNING_MS =
       Metrics.distribution(ChangeStreamMetrics.class, 
"partition_scheduled_to_running_ms");
 
+  /** Counter for the active partition reads during the execution of the 
Connector. */
+  public static final Counter ACTIVE_PARTITION_READ_COUNT =
+      Metrics.counter(ChangeStreamMetrics.class, 
"active_partition_read_count");
+
   // -------------------
   // Data record metrics
 
@@ -84,6 +88,25 @@ public class ChangeStreamMetrics implements Serializable {
   public static final Counter DATA_RECORD_COUNT =
       Metrics.counter(ChangeStreamMetrics.class, "data_record_count");
 
+  /** Counter for the total number of queries issued during the execution of 
the Connector. */
+  public static final Counter QUERY_COUNT =
+      Metrics.counter(ChangeStreamMetrics.class, "query_count");
+
+  /** Counter for record latencies [0, 1000) ms during the execution of the 
Connector. */
+  public static final Counter 
DATA_RECORD_COMMITTED_TO_EMITTED_0MS_TO_1000MS_COUNT =
+      Metrics.counter(
+          ChangeStreamMetrics.class, 
"data_record_committed_to_emitted_0ms_to_1000ms_count");
+
+  /** Counter for record latencies [1000, 3000) ms during the execution of the 
Connector. */
+  public static final Counter 
DATA_RECORD_COMMITTED_TO_EMITTED_1000MS_TO_3000MS_COUNT =
+      Metrics.counter(
+          ChangeStreamMetrics.class, 
"data_record_committed_to_emitted_1000ms_to_3000ms_count");
+
+  /** Counter for record latencies equal or above 3000ms during the execution 
of the Connector. */
+  public static final Counter 
DATA_RECORD_COMMITTED_TO_EMITTED_3000MS_TO_INF_COUNT =
+      Metrics.counter(
+          ChangeStreamMetrics.class, 
"data_record_committed_to_emitted_3000ms_to_inf_count");
+
   // -------------------
   // Hearbeat record metrics
 
@@ -106,8 +129,12 @@ public class ChangeStreamMetrics implements Serializable {
    */
   public ChangeStreamMetrics() {
     enabledMetrics = new HashSet<>();
-    enabledMetrics.add(PARTITION_RECORD_COUNT.getName());
     enabledMetrics.add(DATA_RECORD_COUNT.getName());
+    enabledMetrics.add(ACTIVE_PARTITION_READ_COUNT.getName());
+    enabledMetrics.add(QUERY_COUNT.getName());
+    
enabledMetrics.add(DATA_RECORD_COMMITTED_TO_EMITTED_0MS_TO_1000MS_COUNT.getName());
+    
enabledMetrics.add(DATA_RECORD_COMMITTED_TO_EMITTED_1000MS_TO_3000MS_COUNT.getName());
+    
enabledMetrics.add(DATA_RECORD_COMMITTED_TO_EMITTED_3000MS_TO_INF_COUNT.getName());
   }
 
   /**
@@ -159,11 +186,32 @@ public class ChangeStreamMetrics implements Serializable {
     update(PARTITION_SCHEDULED_TO_RUNNING_MS, duration.getMillis());
   }
 
+  /**
+   * Increments the {@link ChangeStreamMetrics#ACTIVE_PARTITION_READ_COUNT} by 
1 if the metric is
+   * enabled.
+   */
+  public void incActivePartitionReadCounter() {
+    inc(ACTIVE_PARTITION_READ_COUNT);
+  }
+
+  /**
+   * Decrements the {@link ChangeStreamMetrics#ACTIVE_PARTITION_READ_COUNT} by 
1 if the metric is
+   * enabled.
+   */
+  public void decActivePartitionReadCounter() {
+    dec(ACTIVE_PARTITION_READ_COUNT);
+  }
+
   /** Increments the {@link ChangeStreamMetrics#DATA_RECORD_COUNT} by 1 if the 
metric is enabled. */
   public void incDataRecordCounter() {
     inc(DATA_RECORD_COUNT);
   }
 
+  /** Increments the {@link ChangeStreamMetrics#QUERY_COUNT} by 1 if the 
metric is enabled. */
+  public void incQueryCounter() {
+    inc(QUERY_COUNT);
+  }
+
   /**
    * Increments the {@link ChangeStreamMetrics#HEARTBEAT_RECORD_COUNT} by 1 if 
the metric is
    * enabled.
@@ -178,9 +226,26 @@ public class ChangeStreamMetrics implements Serializable {
     }
   }
 
+  private void dec(Counter counter) {
+    if (enabledMetrics.contains(counter.getName())) {
+      counter.dec();
+    }
+  }
+
   private void update(Distribution distribution, long value) {
     if (enabledMetrics.contains(distribution.getName())) {
       distribution.update(value);
     }
   }
+
+  public void updateDataRecordCommittedToEmitted(Duration duration) {
+    final long millis = duration.getMillis();
+    if (millis < 1000) {
+      inc(DATA_RECORD_COMMITTED_TO_EMITTED_0MS_TO_1000MS_COUNT);
+    } else if (millis < 3000) {
+      inc(DATA_RECORD_COMMITTED_TO_EMITTED_1000MS_TO_3000MS_COUNT);
+    } else {
+      inc(DATA_RECORD_COMMITTED_TO_EMITTED_3000MS_TO_INF_COUNT);
+    }
+  }
 }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
index 5277c36..cf77834 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java
@@ -107,6 +107,7 @@ public class ActionFactory implements Serializable {
    *     
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord}s
    * @param childPartitionsRecordAction action class to process {@link
    *     
org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord}s
+   * @param metrics metrics gathering class
    * @return single instance of the {@link QueryChangeStreamAction}
    */
   public synchronized QueryChangeStreamAction queryChangeStreamAction(
@@ -116,7 +117,8 @@ public class ActionFactory implements Serializable {
       PartitionMetadataMapper partitionMetadataMapper,
       DataChangeRecordAction dataChangeRecordAction,
       HeartbeatRecordAction heartbeatRecordAction,
-      ChildPartitionsRecordAction childPartitionsRecordAction) {
+      ChildPartitionsRecordAction childPartitionsRecordAction,
+      ChangeStreamMetrics metrics) {
     if (queryChangeStreamActionInstance == null) {
       queryChangeStreamActionInstance =
           new QueryChangeStreamAction(
@@ -126,7 +128,8 @@ public class ActionFactory implements Serializable {
               partitionMetadataMapper,
               dataChangeRecordAction,
               heartbeatRecordAction,
-              childPartitionsRecordAction);
+              childPartitionsRecordAction,
+              metrics);
     }
     return queryChangeStreamActionInstance;
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 0a053aa..1114cb5 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -29,6 +29,7 @@ import io.opencensus.trace.Tracer;
 import io.opencensus.trace.Tracing;
 import java.util.List;
 import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
@@ -78,6 +79,7 @@ public class QueryChangeStreamAction {
   private final DataChangeRecordAction dataChangeRecordAction;
   private final HeartbeatRecordAction heartbeatRecordAction;
   private final ChildPartitionsRecordAction childPartitionsRecordAction;
+  private final ChangeStreamMetrics metrics;
 
   /**
    * Constructs an action class for performing a change stream query for a 
given partition.
@@ -91,6 +93,7 @@ public class QueryChangeStreamAction {
    * @param dataChangeRecordAction action class to process {@link 
DataChangeRecord}s
    * @param heartbeatRecordAction action class to process {@link 
HeartbeatRecord}s
    * @param childPartitionsRecordAction action class to process {@link 
ChildPartitionsRecord}s
+   * @param metrics metrics gathering class
    */
   QueryChangeStreamAction(
       ChangeStreamDao changeStreamDao,
@@ -99,7 +102,8 @@ public class QueryChangeStreamAction {
       PartitionMetadataMapper partitionMetadataMapper,
       DataChangeRecordAction dataChangeRecordAction,
       HeartbeatRecordAction heartbeatRecordAction,
-      ChildPartitionsRecordAction childPartitionsRecordAction) {
+      ChildPartitionsRecordAction childPartitionsRecordAction,
+      ChangeStreamMetrics metrics) {
     this.changeStreamDao = changeStreamDao;
     this.partitionMetadataDao = partitionMetadataDao;
     this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -107,6 +111,7 @@ public class QueryChangeStreamAction {
     this.dataChangeRecordAction = dataChangeRecordAction;
     this.heartbeatRecordAction = heartbeatRecordAction;
     this.childPartitionsRecordAction = childPartitionsRecordAction;
+    this.metrics = metrics;
   }
 
   /**
@@ -193,6 +198,7 @@ public class QueryChangeStreamAction {
           changeStreamDao.changeStreamQuery(
               token, startTimestamp, endTimestamp, 
partition.getHeartbeatMillis())) {
 
+        metrics.incQueryCounter();
         while (resultSet.next()) {
           final List<ChangeStreamRecord> records =
               changeStreamRecordMapper.toChangeStreamRecords(
@@ -267,6 +273,7 @@ public class QueryChangeStreamAction {
     if (tracker.tryClaim(endTimestamp)) {
       LOG.debug("[" + token + "] Finishing partition");
       partitionMetadataDao.updateToFinished(token);
+      metrics.decActivePartitionReadCounter();
       LOG.info("[" + token + "] Partition finished");
     }
     return ProcessContinuation.stop();
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
index 7c365e1..377f405 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/PostProcessingMetricsDoFn.java
@@ -90,6 +90,8 @@ public class PostProcessingMetricsDoFn extends 
DoFn<DataChangeRecord, DataChange
             emittedTimestamp.toSqlTimestamp().getTime());
     final long commitedToEmittedMillis = committedToEmitted.getMillis();
 
+    metrics.updateDataRecordCommittedToEmitted(committedToEmitted);
+
     if (commitedToEmittedMillis > COMMITTED_TO_EMITTED_THRESHOLD_MS) {
       LOG.debug(
           "Data record took "
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
index e3f066a..d26e79e 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -143,6 +143,7 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
               partitionRunningAt.toSqlTimestamp().getTime()));
     }
 
+    metrics.incActivePartitionReadCounter();
     return TimestampRange.of(startTimestamp, endTimestamp);
   }
 
@@ -179,7 +180,8 @@ public class ReadChangeStreamPartitionDoFn extends 
DoFn<PartitionMetadata, DataC
             partitionMetadataMapper,
             dataChangeRecordAction,
             heartbeatRecordAction,
-            childPartitionsRecordAction);
+            childPartitionsRecordAction,
+            metrics);
   }
 
   /**
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
index 17d0a83..c59721b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -29,6 +29,7 @@ import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.Struct;
 import java.util.Arrays;
 import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
 import 
org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
@@ -62,6 +63,7 @@ public class QueryChangeStreamActionTest {
   private ChangeStreamDao changeStreamDao;
   private PartitionMetadataDao partitionMetadataDao;
   private PartitionMetadata partition;
+  private ChangeStreamMetrics metrics;
   private TimestampRange restriction;
   private RestrictionTracker<TimestampRange, Timestamp> restrictionTracker;
   private OutputReceiver<DataChangeRecord> outputReceiver;
@@ -83,6 +85,7 @@ public class QueryChangeStreamActionTest {
     dataChangeRecordAction = mock(DataChangeRecordAction.class);
     heartbeatRecordAction = mock(HeartbeatRecordAction.class);
     childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
+    metrics = mock(ChangeStreamMetrics.class);
 
     action =
         new QueryChangeStreamAction(
@@ -92,7 +95,8 @@ public class QueryChangeStreamActionTest {
             partitionMetadataMapper,
             dataChangeRecordAction,
             heartbeatRecordAction,
-            childPartitionsRecordAction);
+            childPartitionsRecordAction,
+            metrics);
     final Struct row = mock(Struct.class);
     partition =
         PartitionMetadata.newBuilder()
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
index 1afe0dc..6aa5acb 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -125,7 +125,8 @@ public class ReadChangeStreamPartitionDoFnTest {
             partitionMetadataMapper,
             dataChangeRecordAction,
             heartbeatRecordAction,
-            childPartitionsRecordAction))
+            childPartitionsRecordAction,
+            metrics))
         .thenReturn(queryChangeStreamAction);
 
     doFn.setup();

Reply via email to