lakshmi-manasa-g commented on a change in pull request #1455:
URL: https://github.com/apache/samza/pull/1455#discussion_r553003257



##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
         <td>single-barrier-rebalancing-time</td>
         <td><a href="#average-time">Average time</a> taken for all the 
processors to get the latest version of the job model after single processor 
change (without the occurence of a barrier timeout)</td>
     </tr>
+    <tr>
+        <th colspan="2" class="section" 
id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span
 style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following 
metrics are applicable when Application Master High Availability is 
enabled</span></th>
+    </tr>
+    <tr>
+        <td>application-attempt-count</td>
+        <td>Denotes the application attempt count within the scope of a single 
deployment</td>
+    </tr>
+    <tr>
+        <td>config-changed</td>
+        <td>Denotes configuration changed across attempts within the scope of 
a single deployment</td>
+    </tr>
+    <tr>
+        <td>job-model-changed</td>
+        <td>Denotes job model changed across attempts within the scope of a 
single deployment</td>
+    </tr>
+    <tr>
+        <td>metadata-read-failed-count</td>
+        <td>Read failure count from the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-write-failed-count</td>
+        <td>Write failure count to the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-generation-failed-count</td>
+        <td>Number of times the metadata generation failed</td>
+    </tr>
+    <tr>
+        <td>new-deployment</td>
+        <td>Denotes a new deployment due to changes in metadata either 
attributed to config or job model</td>
+    </tr>
+    </tbody>
+    <tr>
+        <th colspan="2" class="section" 
id="container-heartbeat-monitor-metrics">org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics<br><span
 style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following 
metrics are applicable when Application Master High Availability is 
enabled</span></th>
+    </tr>
+    <tr>
+        <td>heartbeat-discovery-time-ms</td>
+        <td>Time taken in millis for the container to discover new AM in the 
event AM changes for heartbeat establishment</td>
+    </tr>
+    <tr>
+        <td>heartbeat-established-failure-count</td>
+        <td>Number of times failed to establish heartbeat in the event of AM 
changes</td>
+    </tr>
+    <tr>
+        <td>heartbeat-established-with-new-am-count</td>
+        <td>Number of times heartbeat is established with the new AM in the 
event of AM changes</td>
+    </tr>
+    <tr>
+        <td>heartbeat-expired-count</td>
+        <td>Number of times heartbeat expired with the active AM</td>

Review comment:
       Are these per-container metrics or job-level metrics?

##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
         <td>single-barrier-rebalancing-time</td>
         <td><a href="#average-time">Average time</a> taken for all the 
processors to get the latest version of the job model after single processor 
change (without the occurence of a barrier timeout)</td>
     </tr>
+    <tr>
+        <th colspan="2" class="section" 
id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span
 style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following 
metrics are applicable when Application Master High Availability is 
enabled</span></th>
+    </tr>
+    <tr>
+        <td>application-attempt-count</td>
+        <td>Denotes the application attempt count within the scope of a single 
deployment</td>
+    </tr>
+    <tr>
+        <td>config-changed</td>
+        <td>Denotes configuration changed across attempts within the scope of 
a single deployment</td>
+    </tr>
+    <tr>
+        <td>job-model-changed</td>
+        <td>Denotes job model changed across attempts within the scope of a 
single deployment</td>
+    </tr>
+    <tr>
+        <td>metadata-read-failed-count</td>
+        <td>Read failure count from the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-write-failed-count</td>
+        <td>Write failure count to the underlying metadata store</td>

Review comment:
       These reads and writes are specifically the ones in the AM HA code 
path, and not all reads/writes to the metadata store, right?

##########
File path: 
samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -55,31 +66,42 @@
   private final long retryCount;
 
   private ContainerHeartbeatClient containerHeartbeatClient;
+  private ContainerHeartbeatMetrics metrics;
+  private Map<String, MetricsReporter> reporters;
   private String coordinatorUrl;
   private boolean started = false;
 
   public ContainerHeartbeatMonitor(Runnable onContainerExpired, String 
coordinatorUrl, String containerExecutionId,
-      MetadataStore coordinatorStreamStore, boolean 
isApplicationMasterHighAvailabilityEnabled, long retryCount,
-      long sleepDurationForReconnectWithAM) {
+      MetadataStore coordinatorStreamStore, Config config) {
     this(onContainerExpired, new ContainerHeartbeatClient(coordinatorUrl, 
containerExecutionId),
         Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY), 
coordinatorUrl, containerExecutionId,
-        coordinatorStreamStore, isApplicationMasterHighAvailabilityEnabled, 
retryCount, sleepDurationForReconnectWithAM);
+        coordinatorStreamStore, config);
   }
 
   @VisibleForTesting
   ContainerHeartbeatMonitor(Runnable onContainerExpired, 
ContainerHeartbeatClient containerHeartbeatClient,
       ScheduledExecutorService scheduler, String coordinatorUrl, String 
containerExecutionId,
-      MetadataStore coordinatorStreamStore, boolean 
isApplicationMasterHighAvailabilityEnabled,
-      long retryCount, long sleepDurationForReconnectWithAM) {
+      MetadataStore coordinatorStreamStore, Config config) {
     this.onContainerExpired = onContainerExpired;
     this.containerHeartbeatClient = containerHeartbeatClient;
     this.scheduler = scheduler;
     this.coordinatorUrl = coordinatorUrl;
     this.containerExecutionId = containerExecutionId;
     this.coordinatorStreamStore = coordinatorStreamStore;
-    this.isApplicationMasterHighAvailabilityEnabled = 
isApplicationMasterHighAvailabilityEnabled;
-    this.retryCount = retryCount;
-    this.sleepDurationForReconnectWithAM = sleepDurationForReconnectWithAM;
+
+    JobConfig jobConfig = new JobConfig(config);
+    this.isApplicationMasterHighAvailabilityEnabled = 
jobConfig.getApplicationMasterHighAvailabilityEnabled();
+    this.retryCount = jobConfig.getContainerHeartbeatRetryCount();
+    this.sleepDurationForReconnectWithAM = 
jobConfig.getContainerHeartbeatRetrySleepDurationMs();
+
+    initializeMetrics(config);
+  }
+
+  private void initializeMetrics(Config config) {

Review comment:
       nit: please put private methods below the public ones.

##########
File path: 
samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -100,7 +123,7 @@ public void start() {
           } catch (Exception e) {
             // On exception in re-establish connection with new AM, force exit.
             LOG.error("Exception trying to connect with new AM", e);
-            forceExit("failure in establishing cconnection with new AM", 0);
+            forceExit("failure in establishing cconnection with new AM", 
SHUTDOWN_TIMOUT_MS);

Review comment:
       Thank you for fixing this.
   When an exception is caught from `checkAndEstablishConnectionWithNewAM`, we 
should also increment `getHeartbeatEstablishedFailureCount`, right?
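
   A minimal sketch of the idea, not the actual Samza code: 
`HeartbeatMetricsSketch` and `ReconnectSketch.reconnect` are hypothetical 
stand-ins, and plain `AtomicLong` fields replace the real Samza counters.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ContainerHeartbeatMetrics; the real class uses Samza counters.
class HeartbeatMetricsSketch {
  final AtomicLong establishedFailureCount = new AtomicLong();
  final AtomicLong establishedWithNewAmCount = new AtomicLong();
}

class ReconnectSketch {
  // Hypothetical stand-in for the code around checkAndEstablishConnectionWithNewAM.
  // Returns true when the connection attempt succeeds, false when the caller would force exit.
  static boolean reconnect(Runnable establish, HeartbeatMetricsSketch metrics) {
    try {
      establish.run();
      metrics.establishedWithNewAmCount.incrementAndGet();
      return true;
    } catch (Exception e) {
      // Count the failure before forcing exit so the exception path shows up in metrics.
      metrics.establishedFailureCount.incrementAndGet();
      return false;
    }
  }
}
```

   With this shape, the serde-exception test could assert on the failure 
count in addition to the expired count.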

##########
File path: 
samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
##########
@@ -177,22 +213,26 @@ public void testConnectToNewAMFailed() throws 
InterruptedException {
 
   @Test
   public void testConnectToNewAMSerdeException() throws InterruptedException {
-    String containerExecutionId = "0";
     String newCoordinatorUrl = "http://some-host-2.prod.linkedin.com";
-    this.containerHeartbeatMonitor =
-        spy(new ContainerHeartbeatMonitor(this.onExpired, 
this.containerHeartbeatClient, this.scheduler, COORDINATOR_URL,
-            containerExecutionId, coordinatorStreamStore, true, 5, 10));
+    this.containerHeartbeatMonitor = spy(buildContainerHeartbeatMonitor(true));
     CoordinatorStreamValueSerde serde = new 
CoordinatorStreamValueSerde(SetConfig.TYPE);
+    ContainerHeartbeatMonitor.ContainerHeartbeatMetrics metrics = 
this.containerHeartbeatMonitor.getMetrics();
+
     
when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(FAILURE_RESPONSE);
-    
when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl,
 containerExecutionId)).thenReturn(this.containerHeartbeatClient);
-    
when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL)).thenThrow(new
 NullPointerException("serde failed"));
+    
when(this.containerHeartbeatMonitor.createContainerHeartbeatClient(newCoordinatorUrl,
 CONTAINER_EXECUTION_ID))
+        .thenReturn(this.containerHeartbeatClient);
+    
when(this.coordinatorStreamStore.get(CoordinationConstants.YARN_COORDINATOR_URL))
+        .thenThrow(new NullPointerException("serde failed"));
 
     this.containerHeartbeatMonitor.start();
     // wait for the executor to finish the heartbeat check task
-    boolean fixedRateTaskCompleted = 
this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    boolean fixedRateTaskCompleted = 
this.schedulerFixedRateExecutionLatch.await(10, TimeUnit.SECONDS);
+
     assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    assertEquals("Heartbeat expired count should be 1", 1, 
metrics.getHeartbeatExpiredCount().getCount());

Review comment:
       This test should check that `getHeartbeatEstablishedFailureCount` is 1.

##########
File path: 
samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -91,6 +113,7 @@ public void start() {
     scheduler.scheduleAtFixedRate(() -> {
       ContainerHeartbeatResponse response = 
containerHeartbeatClient.requestHeartbeat();
       if (!response.isAlive()) {
+        metrics.incrementHeartbeatExpiredCount();

Review comment:
       This metric is emitted even when AM HA is not enabled. Do we want to 
send the expired count, but not the established counts, when the feature is off?

##########
File path: docs/learn/documentation/versioned/container/metrics-table.html
##########
@@ -977,6 +983,57 @@ <h1>Samza Metrics Reference</h1>
         <td>single-barrier-rebalancing-time</td>
         <td><a href="#average-time">Average time</a> taken for all the 
processors to get the latest version of the job model after single processor 
change (without the occurence of a barrier timeout)</td>
     </tr>
+    <tr>
+        <th colspan="2" class="section" 
id="job-coordinator-metadata-manager-metrics">org.apache.samza.coordinator.JobCoordinatorMetadataManager.JobCoordinatorMetadataManagerMetrics<br><span
 style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following 
metrics are applicable when Application Master High Availability is 
enabled</span></th>
+    </tr>
+    <tr>
+        <td>application-attempt-count</td>
+        <td>Denotes the application attempt count within the scope of a single 
deployment</td>
+    </tr>
+    <tr>
+        <td>config-changed</td>
+        <td>Denotes configuration changed across attempts within the scope of 
a single deployment</td>
+    </tr>
+    <tr>
+        <td>job-model-changed</td>
+        <td>Denotes job model changed across attempts within the scope of a 
single deployment</td>
+    </tr>
+    <tr>
+        <td>metadata-read-failed-count</td>
+        <td>Read failure count from the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-write-failed-count</td>
+        <td>Write failure count to the underlying metadata store</td>
+    </tr>
+    <tr>
+        <td>metadata-generation-failed-count</td>
+        <td>Number of times the metadata generation failed</td>
+    </tr>
+    <tr>
+        <td>new-deployment</td>
+        <td>Denotes a new deployment due to changes in metadata either 
attributed to config or job model</td>
+    </tr>
+    </tbody>
+    <tr>
+        <th colspan="2" class="section" 
id="container-heartbeat-monitor-metrics">org.apache.samza.container.ContainerHeartbeatMonitor.ContainerHeartbeatMetrics<br><span
 style="font-weight: normal;margin-left:40px;"><b>Note</b>: The following 
metrics are applicable when Application Master High Availability is 
enabled</span></th>
+    </tr>
+    <tr>
+        <td>heartbeat-discovery-time-ms</td>
+        <td>Time taken in millis for the container to discover new AM in the 
event AM changes for heartbeat establishment</td>

Review comment:
       We might want to clarify that this time is set even when the heartbeat 
returned is false.

##########
File path: 
samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
##########
@@ -150,8 +183,17 @@ ContainerHeartbeatClient 
createContainerHeartbeatClient(String coordinatorUrl, S
     return new ContainerHeartbeatClient(coordinatorUrl, containerExecutionId);
   }
 
+  @VisibleForTesting
+  ContainerHeartbeatMetrics getMetrics() {
+    return metrics;
+  }
+
   private void forceExit(String message, int timeout) {
     scheduler.schedule(() -> {
+      if (started) {
+        reporters.values().forEach(MetricsReporter::stop);

Review comment:
       iiuc, this is executed only when the `container.shutdown` invoked in 
`onContainerExpired.run` does not complete within the timeout (in ms), right? 
I feel we should also shut down the metrics reporters in the path where 
`onContainerExpired.run` is executed.
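
   One possible shape for this, as a rough sketch under assumptions: 
`ReporterSketch` is a hypothetical stand-in for `MetricsReporter` (only 
`stop()` is modeled), and `ShutdownSketch`, `stopReporters`, `onExpired` and 
the no-argument `forceExit` are illustrative names, not the monitor's actual 
methods.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for MetricsReporter; only stop() is modeled here.
interface ReporterSketch {
  void stop();
}

class ShutdownSketch {
  private final Map<String, ReporterSketch> reporters;
  private final AtomicBoolean reportersStopped = new AtomicBoolean(false);

  ShutdownSketch(Map<String, ReporterSketch> reporters) {
    this.reporters = reporters;
  }

  // Idempotent: whichever path runs first stops the reporters, later calls are no-ops.
  void stopReporters() {
    if (reportersStopped.compareAndSet(false, true)) {
      reporters.values().forEach(ReporterSketch::stop);
    }
  }

  // Normal expiry path: run the callback, then stop reporters even if it throws.
  void onExpired(Runnable onContainerExpired) {
    try {
      onContainerExpired.run();
    } finally {
      stopReporters();
    }
  }

  // Timeout path: the real code would also terminate the process after this.
  void forceExit() {
    stopReporters();
  }
}
```

   The guard flag is there so the reporters are stopped once even if both 
paths end up running.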



