lakshmi-manasa-g commented on a change in pull request #1455:
URL: https://github.com/apache/samza/pull/1455#discussion_r553003257
##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
<td>single-barrier-rebalancing-time</td>
<td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurence of a barrier timeout)</td>
</tr>
+ <tr>
+ <th colspan="2" class="section" id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+ </tr>
+ <tr>
+ <td>application-attempt-count</td>
+ <td>Denotes the application attempt count within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>config-changed</td>
+ <td>Denotes configuration changed across attempts within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>job-model-changed</td>
+ <td>Denotes job model changed across attempts within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>metadata-read-failed-count</td>
+ <td>Read failure count from the underlying metadata store</td>
+ </tr>
+ <tr>
+ <td>metadata-write-failed-count</td>
+ <td>Write failure count to the underlying metadata store</td>
+ </tr>
+ <tr>
+ <td>metadata-generation-failed-count</td>
+ <td>Number of times the metadata generation failed</td>
+ </tr>
+ <tr>
+ <td>new-deployment</td>
+ <td>Denotes a new deployment due to changes in metadata either attributed to config or job model</td>
+ </tr>
+ </tbody>
+ <tr>
+ <th colspan="2" class="section" id="container-heartbeat-monitor-metrics">org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+ </tr>
+ <tr>
+ <td>heartbeat-discovery-time-ms</td>
+ <td>Time taken in millis for the container to discover new AM in the event AM changes for heartbeat establishment</td>
+ </tr>
+ <tr>
+ <td>heartbeat-established-failure-count</td>
+ <td>Number of times failed to establish heartbeat in the event of AM changes</td>
+ </tr>
+ <tr>
+ <td>heartbeat-established-with-new-am-count</td>
+ <td>Number of times heartbeat is established with the new AM in the event of AM changes</td>
+ </tr>
+ <tr>
+ <td>heartbeat-expired-count</td>
+ <td>Number of times heartbeat expired with the active AM</td>
Review comment:
Are these per-container or job-level metrics?
##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
<td>single-barrier-rebalancing-time</td>
<td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurence of a barrier timeout)</td>
</tr>
+ <tr>
+ <th colspan="2" class="section" id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+ </tr>
+ <tr>
+ <td>application-attempt-count</td>
+ <td>Denotes the application attempt count within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>config-changed</td>
+ <td>Denotes configuration changed across attempts within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>job-model-changed</td>
+ <td>Denotes job model changed across attempts within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>metadata-read-failed-count</td>
+ <td>Read failure count from the underlying metadata store</td>
+ </tr>
+ <tr>
+ <td>metadata-write-failed-count</td>
+ <td>Write failure count to the underlying metadata store</td>
Review comment:
The reads and writes counted here are specifically those in the AM HA code, not all reads/writes to the metadata store, right?
##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -55,31 +66,42 @@
private final long retryCount;
private ContainerHeartbeatClient containerHeartbeatClient;
+ private ContainerHeartbeatMetrics metrics;
+ private Map<String, MetricsReporter> reporters;
private String coordinatorUrl;
private boolean started = false;
public ContainerHeartbeatMonitor(Runnable onContainerExpired, String coordinatorUrl, String containerExecutionId,
- MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled, long retryCount,
- long sleepDurationForReconnectWithAM) {
+ MetadataStore coordinatorStreamStore, Config config) {
this(onContainerExpired, new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId),
Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY),
coordinatorUrl, containerExecutionId,
- coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled, retryCount, sleepDurationForReconnectWithAM);
+ coordinatorStreamStore, config);
}
@VisibleForTesting
ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
ScheduledExecutorService scheduler, String coordinatorUrl, String containerExecutionId,
- MetadataStore coordinatorStreamStore, boolean isApplicationMasterHighAvailabilityEnabled,
- long retryCount, long sleepDurationForReconnectWithAM) {
+ MetadataStore coordinatorStreamStore, Config config) {
this.onContainerExpired = onContainerExpired;
this.containerHeartbeatClient = containerHeartbeatClient;
this.scheduler = scheduler;
this.coordinatorUrl = coordinatorUrl;
this.containerExecutionId = containerExecutionId;
this.coordinatorStreamStore = coordinatorStreamStore;
- this.isApplicationMasterHighAvailabilityEnabled = isApplicationMasterHighAvailabilityEnabled;
- this.retryCount = retryCount;
- this.sleepDurationForReconnectWithAM = sleepDurationForReconnectWithAM;
+
+ JobConfig jobConfig = new JobConfig(config);
+ this.isApplicationMasterHighAvailabilityEnabled = jobConfig.getApplicationMasterHighAvailabilityEnabled();
+ this.retryCount = jobConfig.getContainerHeartbeatRetryCount();
+ this.sleepDurationForReconnectWithAM = jobConfig.getContainerHeartbeatRetrySleepDurationMs();
+
+ initializeMetrics(config);
+ }
+
+ private void initializeMetrics(Config config) {
Review comment:
nit: please put private methods below the public ones.
##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -100,7 +123,7 @@ public void start() {
} catch (Exception e) {
// On exception in re-establish connection with new AM, force exit.
LOG.error("Exception trying to connect with new AM", e);
- forceExit("failure in establishing cconnection with new AM", 0);
+ forceExit("failure in establishing cconnection with new AM", SHUTDOWN_TIMOUT_MS);
Review comment:
Thank you for fixing this.
In case of an exception caught in `checkAndEstablishConnectionWithNewAM`, we should increment `getHeartbeatEstablishedFailureCount`, right?
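A minimal sketch of what the suggestion could look like, assuming the catch block in `start()` is where the failure should be counted. `ReconnectFailureSketch`, `handleHeartbeatExpired`, and the `AtomicLong` counter are hypothetical stand-ins: in the real class the counter would presumably be the Samza counter behind `getHeartbeatEstablishedFailureCount()`, and `forceExit` would also take the shutdown timeout.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical, simplified stand-in for ContainerHeartbeatMonitor that models only
 * the "re-establish connection with new AM" failure handling in start().
 */
class ReconnectFailureSketch {
  // Stand-in for the counter behind getHeartbeatEstablishedFailureCount().
  final AtomicLong heartbeatEstablishedFailureCount = new AtomicLong();
  // Last message passed to forceExit; null if forceExit was never called.
  String forceExitMessage;

  /**
   * Called after the heartbeat with the active AM has expired.
   *
   * @param checkAndEstablishConnectionWithNewAM stand-in for the real reconnect logic
   */
  void handleHeartbeatExpired(Runnable checkAndEstablishConnectionWithNewAM) {
    try {
      checkAndEstablishConnectionWithNewAM.run();
    } catch (Exception e) {
      // Count the failure before forcing the exit, so the exception path is
      // reflected in heartbeat-established-failure-count as well.
      heartbeatEstablishedFailureCount.incrementAndGet();
      forceExit("failure in establishing connection with new AM");
    }
  }

  private void forceExit(String message) {
    // The real monitor schedules a halt after a shutdown timeout; here we only record it.
    forceExitMessage = message;
  }
}
```

With a reconnect step that throws (as the serde test does with `NullPointerException("serde failed")`), the counter ends at 1 and `forceExitMessage` is set; a reconnect step that completes normally leaves both untouched.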
##########
File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -177,22 +213,26 @@ public void testConnectToNewAMFailed() throws InterruptedException {
@Test
public void testConnectToNewAMSerdeException() throws InterruptedException {
- String containerExecutionId = "0";
String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
- this.containerHeartbeatMonitor =
- spy(new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
- containerExecutionId, coordinatorStreamStore, true, 5, 10));
+ this.containerHeartbeatMonitor = spy(buildContainerHeartbeatMonitor(true));
CoordinatorStreamValueSerde serde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+ ContainerHeartbeatMonitor.ContainerHeartbeatMetrics metrics = this.containerHeartbeatMonitor.getMetrics();
+
when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
- when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, containerExecutionId)).thenReturn(this.containerHeartbeatClient);
- when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new NullPointerException("serde failed"));
+ when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl, CONTAINER_EXECUTION_ID))
+ .thenReturn(this.containerHeartbeatClient);
+ when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL))
+ .thenThrow(new NullPointerException("serde failed"));
this.containerHeartbeatMonitor.start();
// wait for the executor to finish the heartbeat check task
- boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+ boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(10, TimeUnit.SECONDS);
+
assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+ assertEquals("Heartbeat expired count should be 1", 1, metrics.getHeartbeatExpiredCount().getCount());
Review comment:
This should check that `getHeartbeatEstablishedFailureCount` is 1.
##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -91,6 +113,7 @@ public void start() {
scheduler.scheduleAtFixedRate(() -> {
ContainerHeartbeatResponse response = containerHeartbeatClient.requestHeartbeat();
if (!response.isAlive()) {
+ metrics.incrementHeartbeatExpiredCount();
Review comment:
This metric is emitted even when AM HA is not enabled. Do we want to emit the expired count, but not the established counts, when the feature is off?
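If the answer is to keep the heartbeat metrics consistent with the documented note (applicable only when AM HA is enabled), one option is to guard the increment on the same flag the monitor already reads from `JobConfig`. This is a hypothetical sketch: the class and method names are illustrative, and an `AtomicLong` stands in for the Samza counter.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: only count expired heartbeats when AM HA is enabled. */
class ExpiredHeartbeatSketch {
  private final boolean isApplicationMasterHighAvailabilityEnabled;
  // Stand-in for the counter behind getHeartbeatExpiredCount().
  final AtomicLong heartbeatExpiredCount = new AtomicLong();

  ExpiredHeartbeatSketch(boolean isApplicationMasterHighAvailabilityEnabled) {
    this.isApplicationMasterHighAvailabilityEnabled = isApplicationMasterHighAvailabilityEnabled;
  }

  /** Processes one heartbeat response; returns true if the container must react to expiry. */
  boolean onHeartbeatResponse(boolean isAlive) {
    if (isAlive) {
      return false;
    }
    if (isApplicationMasterHighAvailabilityEnabled) {
      // Emit the metric only when the AM HA feature (and hence its documented metrics) is on.
      heartbeatExpiredCount.incrementAndGet();
    }
    // Expiry handling itself is unchanged; only the metric emission is gated.
    return true;
  }
}
```

The other option is to keep emitting the expired count unconditionally and drop the AM HA note for that metric in `metrics-table.html`.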
##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
<td>single-barrier-rebalancing-time</td>
<td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurence of a barrier timeout)</td>
</tr>
+ <tr>
+ <th colspan="2" class="section" id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+ </tr>
+ <tr>
+ <td>application-attempt-count</td>
+ <td>Denotes the application attempt count within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>config-changed</td>
+ <td>Denotes configuration changed across attempts within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>job-model-changed</td>
+ <td>Denotes job model changed across attempts within the scope of a single deployment</td>
+ </tr>
+ <tr>
+ <td>metadata-read-failed-count</td>
+ <td>Read failure count from the underlying metadata store</td>
+ </tr>
+ <tr>
+ <td>metadata-write-failed-count</td>
+ <td>Write failure count to the underlying metadata store</td>
+ </tr>
+ <tr>
+ <td>metadata-generation-failed-count</td>
+ <td>Number of times the metadata generation failed</td>
+ </tr>
+ <tr>
+ <td>new-deployment</td>
+ <td>Denotes a new deployment due to changes in metadata either attributed to config or job model</td>
+ </tr>
+ </tbody>
+ <tr>
+ <th colspan="2" class="section" id="container-heartbeat-monitor-metrics">org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics<br><span style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following metrics are applicable when Application Master High Availability is enabled</span></th>
+ </tr>
+ <tr>
+ <td>heartbeat-discovery-time-ms</td>
+ <td>Time taken in millis for the container to discover new AM in the event AM changes for heartbeat establishment</td>
Review comment:
We might want to clarify that this time is recorded even when the returned heartbeat is false.
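To make the behavior concrete: if the discovery time is recorded in a `finally` block (an assumption about how the PR measures it), the value is set whether or not the heartbeat with the new AM comes back alive, which is what the doc entry should say. `DiscoveryTimeSketch`, `discoverNewAm`, and the injected clock are hypothetical; an `AtomicLong` stands in for the Samza gauge.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical sketch of how heartbeat-discovery-time-ms could be recorded. */
class DiscoveryTimeSketch {
  // Stand-in for the gauge behind heartbeat-discovery-time-ms; -1 means "never recorded".
  final AtomicLong heartbeatDiscoveryTimeMs = new AtomicLong(-1);
  private final LongSupplier clockMs;

  DiscoveryTimeSketch(LongSupplier clockMs) {
    this.clockMs = clockMs;
  }

  /**
   * Tries to establish a heartbeat with the newly discovered AM.
   * The elapsed time is recorded in all cases, including when the heartbeat is not alive.
   */
  boolean discoverNewAm(BooleanSupplier heartbeatWithNewAm) {
    long startMs = clockMs.getAsLong();
    try {
      return heartbeatWithNewAm.getAsBoolean();
    } finally {
      heartbeatDiscoveryTimeMs.set(clockMs.getAsLong() - startMs);
    }
  }
}
```

In production the clock would be something like `System::currentTimeMillis`; injecting it keeps the sketch testable with a fake clock.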
##########
File path: samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -150,8 +183,17 @@ ContainerHeartbeatClient createContainerHeartbeatClient(String coordinatorUrl, S
return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
}
+ @VisibleForTesting
+ ContainerHeartbeatMetrics getMetrics() {
+ return metrics;
+ }
+
private void forceExit(String message, int timeout) {
scheduler.schedule(() -> {
+ if (started) {
+ reporters.values().forEach(MetricsReporter::stop);
Review comment:
IIUC, this is executed when the `container.shutdown` invoked in `onContainerExpired.run` does not complete within `timeout` ms, right? I feel we should also shut down the metrics reporters in the path where `onContainerExpired.run` is executed.
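One way to address this, sketched under assumptions: route both exits through a single idempotent helper so the reporters are stopped after `onContainerExpired.run()` returns and, if that never completes in time, by the `forceExit` task, but never twice. `Reporter` is a hypothetical stand-in for `org.apache.samza.metrics.MetricsReporter` (only `stop()` is modeled), and the other names are illustrative.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for org.apache.samza.metrics.MetricsReporter; only stop() is modeled. */
interface Reporter {
  void stop();
}

/** Hypothetical sketch: stop metrics reporters on both the normal and the forced exit path. */
class ReporterShutdownSketch {
  private final Map<String, Reporter> reporters;
  private final Runnable onContainerExpired;
  // Guards against stopping the reporters twice when both paths run.
  private final AtomicBoolean reportersStopped = new AtomicBoolean(false);

  ReporterShutdownSketch(Map<String, Reporter> reporters, Runnable onContainerExpired) {
    this.reporters = reporters;
    this.onContainerExpired = onContainerExpired;
  }

  /** Normal path: shut the container down, then stop the reporters. */
  void expireContainer() {
    try {
      onContainerExpired.run();
    } finally {
      stopReporters();
    }
  }

  /** Forced path, e.g. the task scheduled by forceExit after the shutdown timeout. */
  void forceExit() {
    stopReporters();
    // The real monitor would halt the JVM here.
  }

  private void stopReporters() {
    if (reportersStopped.compareAndSet(false, true)) {
      reporters.values().forEach(Reporter::stop);
    }
  }
}
```

The `compareAndSet` guard matters because the `forceExit` task runs on the scheduler thread and can race with the normal shutdown path.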
----------------------------------------------------------------
This is an automated message from the Apache Git Service.