This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0244ac8956b099741ea52034ff214a9e1579dc59 Author: Lakshmi Gururaja Rao <[email protected]> AuthorDate: Tue Jul 24 14:13:53 2018 -0700 [FLINK-9899] Add more ShardConsumer metrics This closes #6409. --- docs/monitoring/metrics.md | 65 +++++++++++++++++++ .../kinesis/internals/KinesisDataFetcher.java | 9 ++- .../kinesis/internals/ShardConsumer.java | 18 +++++- .../metrics/KinesisConsumerMetricConstants.java | 9 +++ .../kinesis/metrics/ShardMetricsReporter.java | 72 ++++++++++++++++++++++ 5 files changed, 170 insertions(+), 3 deletions(-) diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index e46549a..f1afa4e 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -1424,6 +1424,71 @@ Thus, in order to infer the metric identifier: </td> <td>Gauge</td> </tr> + <tr> + <th rowspan="1">Operator</th> + <td>sleepTimeMillis</td> + <td>stream, shardId</td> + <td>The number of milliseconds the consumer spends sleeping before fetching records from Kinesis. + A particular shard's metric can be specified by stream name and shard id. + </td> + <td>Gauge</td> + </tr> + <tr> + <th rowspan="1">Operator</th> + <td>maxNumberOfRecordsPerFetch</td> + <td>stream, shardId</td> + <td>The maximum number of records requested by the consumer in a single getRecords call to Kinesis. If ConsumerConfigConstants.SHARD_USE_ADAPTIVE_READS + is set to true, this value is adaptively calculated to maximize use of the 2 MB/s per-shard read limit from Kinesis. + </td> + <td>Gauge</td> + </tr> + <tr> + <th rowspan="1">Operator</th> + <td>numberOfAggregatedRecordsPerFetch</td> + <td>stream, shardId</td> + <td>The number of aggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis. + </td> + <td>Gauge</td> + </tr> + <tr> + <th rowspan="1">Operator</th> + <td>numberOfDeaggregatedRecordsPerFetch</td> + <td>stream, shardId</td> + <td>The number of deaggregated Kinesis records fetched by the consumer in a single getRecords call to Kinesis. 
+ </td> + <td>Gauge</td> + </tr> + <tr> + <th rowspan="1">Operator</th> + <td>averageRecordSizeBytes</td> + <td>stream, shardId</td> + <td>The average size of a Kinesis record in bytes, fetched by the consumer in a single getRecords call. + </td> + <td>Gauge</td> + </tr> + <tr> + <th rowspan="1">Operator</th> + <td>runLoopTimeNanos</td> + <td>stream, shardId</td> + <td>The actual time taken, in nanoseconds, by the consumer in the run loop. + </td> + <td>Gauge</td> + </tr> + <tr> + <th rowspan="1">Operator</th> + <td>loopFrequencyHz</td> + <td>stream, shardId</td> + <td>The number of calls to getRecords in one second. + </td> + <td>Gauge</td> + </tr> + <tr> + <th rowspan="1">Operator</th> + <td>bytesRequestedPerFetch</td> + <td>stream, shardId</td> + <td>The bytes requested (2 MB/s / loopFrequencyHz) in a single call to getRecords.</td> + <td>Gauge</td> + </tr> </tbody> </table> diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 13de032..0981b76 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -630,7 +630,14 @@ public class KinesisDataFetcher<T> { shardState.getStreamShardHandle().getShard().getShardId()); streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MILLIS_BEHIND_LATEST_GAUGE, shardMetrics::getMillisBehindLatest); - + streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.MAX_RECORDS_PER_FETCH, shardMetrics::getMaxNumberOfRecordsPerFetch); + streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_AGGREGATED_RECORDS_PER_FETCH, 
shardMetrics::getNumberOfAggregatedRecords); + streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.NUM_DEAGGREGATED_RECORDS_PER_FETCH, shardMetrics::getNumberOfDeaggregatedRecords); + streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.AVG_RECORD_SIZE_BYTES, shardMetrics::getAverageRecordSizeBytes); + streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.BYTES_PER_READ, shardMetrics::getBytesPerRead); + streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.RUNTIME_LOOP_NANOS, shardMetrics::getRunLoopTimeNanos); + streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.LOOP_FREQUENCY_HZ, shardMetrics::getLoopFrequencyHz); + streamShardMetricGroup.gauge(KinesisConsumerMetricConstants.SLEEP_TIME_MILLIS, shardMetrics::getSleepTimeMillis); return shardMetrics; } diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 6de7278..5845eea 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -212,12 +212,16 @@ public class ShardConsumer<T> implements Runnable { // we can close this consumer thread once we've reached the end of the subscribed shard break; } else { - + shardMetricsReporter.setMaxNumberOfRecordsPerFetch(maxNumberOfRecordsPerFetch); GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch); + List<Record> aggregatedRecords = getRecordsResult.getRecords(); + int numberOfAggregatedRecords = aggregatedRecords.size(); + shardMetricsReporter.setNumberOfAggregatedRecords(numberOfAggregatedRecords); + // each of the Kinesis records may be aggregated, so we must 
deaggregate them before proceeding List<UserRecord> fetchedRecords = deaggregateRecords( - getRecordsResult.getRecords(), + aggregatedRecords, subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); @@ -227,11 +231,15 @@ public class ShardConsumer<T> implements Runnable { deserializeRecordForCollectionAndUpdateState(record); } + int numberOfDeaggregatedRecords = fetchedRecords.size(); + shardMetricsReporter.setNumberOfDeaggregatedRecords(numberOfDeaggregatedRecords); + nextShardItr = getRecordsResult.getNextShardIterator(); long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, System.nanoTime()); long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos; maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes, maxNumberOfRecordsPerFetch); + shardMetricsReporter.setRunLoopTimeNanos(runLoopTimeNanos); processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop } } @@ -256,6 +264,7 @@ public class ShardConsumer<T> implements Runnable { if (sleepTimeMillis > 0) { Thread.sleep(sleepTimeMillis); endTimeNanos = System.nanoTime(); + shardMetricsReporter.setSleepTimeMillis(sleepTimeMillis); } } return endTimeNanos; @@ -280,6 +289,11 @@ public class ShardConsumer<T> implements Runnable { maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes); // Ensure the value is greater than 0 and not more than 10000L maxNumberOfRecordsPerFetch = Math.max(1, Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX)); + + // Set metrics + shardMetricsReporter.setAverageRecordSizeBytes(averageRecordSizeBytes); + shardMetricsReporter.setLoopFrequencyHz(loopFrequencyHz); + shardMetricsReporter.setBytesPerRead(bytesPerRead); } return maxNumberOfRecordsPerFetch; } diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java index 1b83f16..e850d25 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/KinesisConsumerMetricConstants.java @@ -34,4 +34,13 @@ public class KinesisConsumerMetricConstants { public static final String SHARD_METRICS_GROUP = "shardId"; public static final String MILLIS_BEHIND_LATEST_GAUGE = "millisBehindLatest"; + public static final String SLEEP_TIME_MILLIS = "sleepTimeMillis"; + public static final String MAX_RECORDS_PER_FETCH = "maxNumberOfRecordsPerFetch"; + public static final String NUM_AGGREGATED_RECORDS_PER_FETCH = "numberOfAggregatedRecordsPerFetch"; + public static final String NUM_DEAGGREGATED_RECORDS_PER_FETCH = "numberOfDeaggregatedRecordsPerFetch"; + public static final String AVG_RECORD_SIZE_BYTES = "averageRecordSizeBytes"; + public static final String RUNTIME_LOOP_NANOS = "runLoopTimeNanos"; + public static final String LOOP_FREQUENCY_HZ = "loopFrequencyHz"; + public static final String BYTES_PER_READ = "bytesRequestedPerFetch"; + } diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java index 2b6a491..4a27b9c 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java +++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java @@ -28,6 +28,14 @@ import org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer; public class ShardMetricsReporter { private volatile long millisBehindLatest = -1; + private volatile double loopFrequencyHz = 0.0; + private volatile double bytesPerRead = 0.0; + private volatile long runLoopTimeNanos = 0L; + private volatile long averageRecordSizeBytes = 0L; + private volatile long sleepTimeMillis = 0L; + private volatile int numberOfAggregatedRecords = 0; + private volatile int numberOfDeaggregatedRecords = 0; + private volatile int maxNumberOfRecordsPerFetch = 0; public long getMillisBehindLatest() { return millisBehindLatest; @@ -37,4 +45,68 @@ public class ShardMetricsReporter { this.millisBehindLatest = millisBehindLatest; } + public double getLoopFrequencyHz() { + return loopFrequencyHz; + } + + public void setLoopFrequencyHz(double loopFrequencyHz) { + this.loopFrequencyHz = loopFrequencyHz; + } + + public double getBytesPerRead() { + return bytesPerRead; + } + + public void setBytesPerRead(double bytesPerRead) { + this.bytesPerRead = bytesPerRead; + } + + public long getRunLoopTimeNanos() { + return runLoopTimeNanos; + } + + public void setRunLoopTimeNanos(long runLoopTimeNanos) { + this.runLoopTimeNanos = runLoopTimeNanos; + } + + public long getAverageRecordSizeBytes() { + return averageRecordSizeBytes; + } + + public void setAverageRecordSizeBytes(long averageRecordSizeBytes) { + this.averageRecordSizeBytes = averageRecordSizeBytes; + } + + public long getSleepTimeMillis() { + return sleepTimeMillis; + } + + public void setSleepTimeMillis(long sleepTimeMillis) { + this.sleepTimeMillis = sleepTimeMillis; + } + + public int getNumberOfAggregatedRecords() { + return numberOfAggregatedRecords; + } + + public void setNumberOfAggregatedRecords(int numberOfAggregatedRecords) { + this.numberOfAggregatedRecords = 
numberOfAggregatedRecords; + } + + public int getNumberOfDeaggregatedRecords() { + return numberOfDeaggregatedRecords; + } + + public void setNumberOfDeaggregatedRecords(int numberOfDeaggregatedRecords) { + this.numberOfDeaggregatedRecords = numberOfDeaggregatedRecords; + } + + public int getMaxNumberOfRecordsPerFetch() { + return maxNumberOfRecordsPerFetch; + } + + public void setMaxNumberOfRecordsPerFetch(int maxNumberOfRecordsPerFetch) { + this.maxNumberOfRecordsPerFetch = maxNumberOfRecordsPerFetch; + } + }
