This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0244ac8956b099741ea52034ff214a9e1579dc59
Author: Lakshmi Gururaja Rao <[email protected]>
AuthorDate: Tue Jul 24 14:13:53 2018 -0700

    [FLINK-9899] Add more ShardConsumer metrics
    
    This closes #6409.
---
 docs/monitoring/metrics.md                         | 65 +++++++++++++++++++
 .../kinesis/internals/KinesisDataFetcher.java      |  9 ++-
 .../kinesis/internals/ShardConsumer.java           | 18 +++++-
 .../metrics/KinesisConsumerMetricConstants.java    |  9 +++
 .../kinesis/metrics/ShardMetricsReporter.java      | 72 ++++++++++++++++++++++
 5 files changed, 170 insertions(+), 3 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index e46549a..f1afa4e 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1424,6 +1424,71 @@ Thus, in order to infer the metric identifier:
       </td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>sleepTimeMillis</td>
+      <td>stream, shardId</td>
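+      <td>The number of milliseconds the consumer slept between consecutive getRecords calls to Kinesis. The value is only updated when the consumer actually sleeps, so it reflects the most recent non-zero sleep.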
+      A particular shard's metric can be specified by stream name and shard id.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>maxNumberOfRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The maximum number of records requested by the consumer in a single 
getRecords call to Kinesis. If ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS
+      is set to true, this value is adaptively calculated to make full use of the 2 MB/s per-shard read limit of Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>numberOfAggregatedRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The number of aggregated Kinesis records fetched by the consumer in 
a single getRecords call to Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>numberOfDeaggregatedRecordsPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The number of deaggregated Kinesis records fetched by the consumer 
in a single getRecords call to Kinesis.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>averageRecordSizeBytes</td>
+      <td>stream, shardId</td>
+      <td>The average size, in bytes, of the Kinesis records fetched by the consumer in a single getRecords call.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>runLoopTimeNanos</td>
+      <td>stream, shardId</td>
+      <td>The time taken, in nanoseconds, by one iteration of the consumer's run loop, including the getRecords call, record processing, and any sleep.
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>loopFrequencyHz</td>
+      <td>stream, shardId</td>
+      <td>The number of calls to getRecords in one second. 
+      </td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <th rowspan="1">Operator</th>
+      <td>bytesRequestedPerFetch</td>
+      <td>stream, shardId</td>
+      <td>The number of bytes requested (2 MB/s / loopFrequencyHz) in a single call to getRecords.</td>
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 13de032..0981b76 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -630,7 +630,14 @@ public class KinesisDataFetcher<T> {
                                
shardState.getStreamShardHandle().getShard().getShardId());
 
                
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE,
 shardMetrics::getMillisBehindLatest);
-
+               
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH,
 shardMetrics::getMaxNumberOfRecordsPerFetch);
+               
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH,
 shardMetrics::getNumberOfAggregatedRecords);
+               
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH,
 shardMetrics::getNumberOfDeaggregatedRecords);
+               
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES,
 shardMetrics::getAverageRecordSizeBytes);
+               
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, 
shardMetrics::getBytesPerRead);
+               
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, 
shardMetrics::getRunLoopTimeNanos);
+               
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, 
shardMetrics::getLoopFrequencyHz);
+               
streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, 
shardMetrics::getSleepTimeMillis);
                return shardMetrics;
        }
 
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 6de7278..5845eea 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -212,12 +212,16 @@ public class ShardConsumer<T> implements Runnable {
                                        // we can close this consumer thread 
once we've reached the end of the subscribed shard
                                        break;
                                } else {
-
+                                       
shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch);
                                        GetRecordsResult getRecordsResult = 
getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
 
+                                       List<Record> aggregatedRecords = 
getRecordsResult.getRecords();
+                                       int numberOfAggregatedRecords = 
aggregatedRecords.size();
+                                       
shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords);
+
                                        // each of the Kinesis records may be 
aggregated, so we must deaggregate them before proceeding
                                        List<UserRecord> fetchedRecords = 
deaggregateRecords(
-                                               getRecordsResult.getRecords(),
+                                               aggregatedRecords,
                                                
subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
                                                
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
@@ -227,11 +231,15 @@ public class ShardConsumer<T> implements Runnable {
                                                
deserializeRecordForCollectionAndUpdateState(record);
                                        }
 
+                                       int numberOfDeaggregatedRecords = 
fetchedRecords.size();
+                                       
shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords);
+
                                        nextShardItr = 
getRecordsResult.getNextShardIterator();
 
                                        long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime());
                                        long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
                                        maxNumberOfRecordsPerFetch = 
adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), 
recordBatchSizeBytes, maxNumberOfRecordsPerFetch);
+                                       
shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos);
                                        processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
                                }
                        }
@@ -256,6 +264,7 @@ public class ShardConsumer<T> implements Runnable {
                        if (sleepTimeMillis > 0) {
                                Thread.sleep(sleepTimeMillis);
                                endTimeNanos = System.nanoTime();
+                               
shardMetricsReporter.setSleepTimeMillis(sleepTimeMillis);
                        }
                }
                return endTimeNanos;
@@ -280,6 +289,11 @@ public class ShardConsumer<T> implements Runnable {
                        maxNumberOfRecordsPerFetch = (int) (bytesPerRead / 
averageRecordSizeBytes);
                        // Ensure the value is greater than 0 and not more than 
10000L
                        maxNumberOfRecordsPerFetch = Math.max(1, 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
+
+                       // Set metrics
+                       
shardMetricsReporter.setAverageRecordSizeBytes(averageRecordSizeBytes);
+                       
shardMetricsReporter.setLoopFrequencyHz(loopFrequencyHz);
+                       shardMetricsReporter.setBytesPerRead(bytesPerRead);
                }
                return maxNumberOfRecordsPerFetch;
        }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
index 1b83f16..e850d25 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java
@@ -34,4 +34,13 @@ public class KinesisConsumerMetricConstants {
        public static final String SHARD_METRICS_GROUP = "shardId";
 
        public static final String MILLIS_BEHIND_LATEST_GAUGE = 
"millisBehindLatest";
+       public static final String SLEEP_TIME_MILLIS = "sleepTimeMillis";
+       public static final String MAX_RECORDS_PER_FETCH = 
"maxNumberOfRecordsPerFetch";
+       public static final String NUM_AGGREGATED_RECORDS_PER_FETCH = 
"numberOfAggregatedRecordsPerFetch";
+       public static final String NUM_DEAGGREGATED_RECORDS_PER_FETCH = 
"numberOfDeaggregatedRecordsPerFetch";
+       public static final String AVG_RECORD_SIZE_BYTES = 
"averageRecordSizeBytes";
+       public static final String RUNTIME_LOOP_NANOS = "runLoopTimeNanos";
+       public static final String LOOP_FREQUENCY_HZ = "loopFrequencyHz";
+       public static final String BYTES_PER_READ = "bytesRequestedPerFetch";
+
 }
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
index 2b6a491..4a27b9c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java
@@ -28,6 +28,14 @@ import 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer;
 public class ShardMetricsReporter {
 
        private volatile long millisBehindLatest = -1;
+       private volatile double loopFrequencyHz = 0.0;
+       private volatile double bytesPerRead = 0.0;
+       private volatile long runLoopTimeNanos = 0L;
+       private volatile long averageRecordSizeBytes = 0L;
+       private volatile long sleepTimeMillis = 0L;
+       private volatile int numberOfAggregatedRecords = 0;
+       private volatile int numberOfDeaggregatedRecords = 0;
+       private volatile int maxNumberOfRecordsPerFetch = 0;
 
        public long getMillisBehindLatest() {
                return millisBehindLatest;
@@ -37,4 +45,68 @@ public class ShardMetricsReporter {
                this.millisBehindLatest = millisBehindLatest;
        }
 
+       public double getLoopFrequencyHz() {
+               return loopFrequencyHz;
+       }
+
+       public void setLoopFrequencyHz(double loopFrequencyHz) {
+               this.loopFrequencyHz = loopFrequencyHz;
+       }
+
+       public double getBytesPerRead() {
+               return bytesPerRead;
+       }
+
+       public void setBytesPerRead(double bytesPerRead) {
+               this.bytesPerRead = bytesPerRead;
+       }
+
+       public long getRunLoopTimeNanos() {
+               return runLoopTimeNanos;
+       }
+
+       public void setRunLoopTimeNanos(long runLoopTimeNanos) {
+               this.runLoopTimeNanos = runLoopTimeNanos;
+       }
+
+       public long getAverageRecordSizeBytes() {
+               return averageRecordSizeBytes;
+       }
+
+       public void setAverageRecordSizeBytes(long averageRecordSizeBytes) {
+               this.averageRecordSizeBytes = averageRecordSizeBytes;
+       }
+
+       public long getSleepTimeMillis() {
+               return sleepTimeMillis;
+       }
+
+       public void setSleepTimeMillis(long sleepTimeMillis) {
+               this.sleepTimeMillis = sleepTimeMillis;
+       }
+
+       public int getNumberOfAggregatedRecords() {
+               return numberOfAggregatedRecords;
+       }
+
+       public void setNumberOfAggregatedRecords(int numberOfAggregatedRecords) 
{
+               this.numberOfAggregatedRecords = numberOfAggregatedRecords;
+       }
+
+       public int getNumberOfDeaggregatedRecords() {
+               return numberOfDeaggregatedRecords;
+       }
+
+       public void setNumberOfDeaggregatedRecords(int 
numberOfDeaggregatedRecords) {
+               this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords;
+       }
+
+       public int getMaxNumberOfRecordsPerFetch() {
+               return maxNumberOfRecordsPerFetch;
+       }
+
+       public void setMaxNumberOfRecordsPerFetch(int 
maxNumberOfRecordsPerFetch) {
+               this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch;
+       }
+
 }

Reply via email to