This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 6e1301b SAMZA-2259: Change boolean gauges to integer
6e1301b is described below
commit 6e1301bd0c282500a436011335e5d58a5314802c
Author: Daniel Chen <[email protected]>
AuthorDate: Mon Jun 24 10:00:30 2019 -0700
SAMZA-2259: Change boolean gauges to integer
Change boolean gauges to integer gauges (1 for true, 0 for false), since
integer values are more widely supported by metrics reporters.
prateekm, please take a look.
Author: Daniel Chen <[email protected]>
Reviewers: Prateek Maheshwari <[email protected]>
Closes #1088 from dxichen/samza-2259
---
docs/learn/documentation/versioned/container/metrics-table.html | 4 ++--
docs/learn/documentation/versioned/operations/monitoring.md | 2 +-
.../src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java | 8 ++++----
.../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 4 ++--
.../main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java | 4 ++--
5 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html
b/docs/learn/documentation/versioned/container/metrics-table.html
index f35d597..c895fca 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -555,7 +555,7 @@
</tr>
<tr>
<td>no-more-messages-SystemStreamPartition [<span
class="system">system</span>, <span class="stream">stream</span>, <span
class="partition">partition</span>]</td>
- <td>Indicates if kafka consumer is at head for particular
partition</td>
+ <td>Indicates if kafka consumer is at head for particular partition. 1
if it is caught up, 0 otherwise.</td>
</tr>
<tr>
<td>blocking-poll-count-SystemStreamPartition [<span
class="system">system</span>, <span class="stream">stream</span>, <span
class="partition">partition</span>]</td>
@@ -938,7 +938,7 @@
</tr>
<tr>
<td>is-leader</td>
- <td>Denotes if the processor is a leader or not</td>
+ <td>Denotes if the processor is a leader or not. 1 if it is a leader,
0 otherwise.</td>
</tr>
<tr>
<td>barrier-creation</td>
diff --git a/docs/learn/documentation/versioned/operations/monitoring.md
b/docs/learn/documentation/versioned/operations/monitoring.md
index bbad335..c11fbe0 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -478,7 +478,7 @@ All \<system\>, \<stream\>, \<partition\>, \<store-name\>,
\<topic\>, are popula
| | \<system\>-<host\>-<port\>-skipped-fetch-requests | Number of times the
fetchMessage method is called but no topic/partitions needed new messages. |
| | \<system\>-<host\>-<port\>-topic-partitions | Number of broker's
topic partitions which are being consumed. |
| | poll-count | Number of polls the KafkaSystemConsumer performed to get
new messages. |
-| | no-more-messages-SystemStreamPartition [\<system\>, \<stream\>,
\<partition\>] | Indicates if the Kafka consumer is at the head for particular
partition. |
+| | no-more-messages-SystemStreamPartition [\<system\>, \<stream\>,
\<partition\>] | Indicates if the Kafka consumer is at the head for particular
partition. 1 if it is caught up, 0 otherwise. |
| | blocking-poll-count-SystemStreamPartition [\<system\>, \<stream\>,
\<partition\>] | Number of times a blocking poll is executed (polling until we
get at least one message, or until we catch up to the head of the stream) (per
partition). |
| | blocking-poll-timeout-count-SystemStreamPartition [\<system\>,
\<stream\>, \<partition\>] | Number of times a blocking poll has timed out
(polling until we get at least one message within a timeout period) (per
partition). |
| | buffered-message-count-SystemStreamPartition [\<system\>, \<stream\>,
\<partition\>] | Current number of messages in queue (per partition). |
diff --git
a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 79f340f..8b792b4 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -244,7 +244,7 @@ public abstract class BlockingEnvelopeMap implements
SystemConsumer {
public class BlockingEnvelopeMapMetrics {
private final String group;
private final MetricsRegistry metricsRegistry;
- private final ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>
noMoreMessageGaugeMap;
+ private final ConcurrentHashMap<SystemStreamPartition, Gauge<Integer>>
noMoreMessageGaugeMap;
private final ConcurrentHashMap<SystemStreamPartition, Counter>
blockingPollCountMap;
private final ConcurrentHashMap<SystemStreamPartition, Counter>
blockingPollTimeoutCountMap;
private final Counter pollCount;
@@ -252,14 +252,14 @@ public abstract class BlockingEnvelopeMap implements
SystemConsumer {
public BlockingEnvelopeMapMetrics(String group, MetricsRegistry
metricsRegistry) {
this.group = group;
this.metricsRegistry = metricsRegistry;
- this.noMoreMessageGaugeMap = new
ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
+ this.noMoreMessageGaugeMap = new
ConcurrentHashMap<SystemStreamPartition, Gauge<Integer>>();
this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition,
Counter>();
this.blockingPollTimeoutCountMap = new
ConcurrentHashMap<SystemStreamPartition, Counter>();
this.pollCount = metricsRegistry.newCounter(group, "poll-count");
}
public void initMetrics(SystemStreamPartition systemStreamPartition) {
- this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition,
metricsRegistry.<Boolean>newGauge(group, "no-more-messages-" +
systemStreamPartition, false));
+ this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition,
metricsRegistry.newGauge(group, "no-more-messages-" + systemStreamPartition,
0));
this.blockingPollCountMap.putIfAbsent(systemStreamPartition,
metricsRegistry.newCounter(group, "blocking-poll-count-" +
systemStreamPartition));
this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition,
metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" +
systemStreamPartition));
@@ -268,7 +268,7 @@ public abstract class BlockingEnvelopeMap implements
SystemConsumer {
}
public void setNoMoreMessages(SystemStreamPartition systemStreamPartition,
boolean noMoreMessages) {
-
this.noMoreMessageGaugeMap.get(systemStreamPartition).set(noMoreMessages);
+ this.noMoreMessageGaugeMap.get(systemStreamPartition).set(noMoreMessages
? 1 : 0);
}
public void incBlockingPoll(SystemStreamPartition systemStreamPartition) {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index b0b9836..bcf0f11 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -171,7 +171,7 @@ public class ZkJobCoordinator implements JobCoordinator {
// Notify the metrics about abandoning the leadership. Moving it up the
chain in the shutdown sequence so that
// in case of unclean shutdown, we get notified about lack of leader and
we can set up some alerts around the absence of leader.
- metrics.isLeader.set(false);
+ metrics.isLeader.set(0);
try {
// todo: what does it mean for coordinator listener to be null? why
not have it part of constructor?
@@ -424,7 +424,7 @@ public class ZkJobCoordinator implements JobCoordinator {
@Override
public void onBecomingLeader() {
LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader");
- metrics.isLeader.set(true);
+ metrics.isLeader.set(1);
zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils));
if (!new StorageConfig(config).hasDurableStores()) {
// 1. Stop if there's a existing StreamPartitionCountMonitor running.
diff --git
a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
index 3d00897..e39e058 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
@@ -34,7 +34,7 @@ public class ZkJobCoordinatorMetrics extends MetricsBase {
/**
* Denotes if the processor is a leader or not
*/
- public final Gauge<Boolean> isLeader;
+ public final Gauge<Integer> isLeader;
/**
* Number of times a barrier was created by the leader
@@ -60,7 +60,7 @@ public class ZkJobCoordinatorMetrics extends MetricsBase {
public ZkJobCoordinatorMetrics(MetricsRegistry metricsRegistry) {
super(metricsRegistry);
this.metricsRegistry = metricsRegistry;
- this.isLeader = newGauge("is-leader", false);
+ this.isLeader = newGauge("is-leader", 0);
this.barrierCreation = newCounter("barrier-creation");
this.barrierStateChange = newCounter("barrier-state-change");
this.barrierError = newCounter("barrier-error");