mynameborat commented on a change in pull request #1553:
URL: https://github.com/apache/samza/pull/1553#discussion_r751481543
##########
File path:
samza-core/src/main/java/org/apache/samza/coordinator/staticresource/StaticResourceJobCoordinator.java
##########
@@ -234,13 +262,33 @@ private void prepareWorkerExecution(JobModel jobModel, JobCoordinatorMetadata ne
}
}
+ /**
+ * Wrapper around {@link MetadataResourceUtil} constructor so it can be stubbed during testing.
+ */
@VisibleForTesting
MetadataResourceUtil metadataResourceUtil(JobModel jobModel) {
return new MetadataResourceUtil(jobModel, this.metrics, this.config);
}
+ /**
+ * Wrapper around {@link DiagnosticsUtil#buildDiagnosticsManager} so it can be stubbed during testing.
+ */
+ @VisibleForTesting
+ Optional<DiagnosticsManager> buildDiagnosticsManager(String jobName,
+ String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) {
+ return DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config);
+ }
+
private Set<JobMetadataChange> checkForMetadataChanges(JobCoordinatorMetadata newMetadata) {
JobCoordinatorMetadata previousMetadata = this.jobCoordinatorMetadataManager.readJobCoordinatorMetadata();
return this.jobCoordinatorMetadataManager.checkForMetadataChanges(newMetadata, previousMetadata);
}
+
+ private static void quietlyStop(DiagnosticsManager diagnosticsManager) {
+ try {
+ diagnosticsManager.stop();
Review comment:
Since you are already making changes related to the diagnostics manager, I
noticed the following in its `stop()` method:
```
scheduler.shutdown();
// Allow any scheduled publishes to finish, and block for termination
scheduler.awaitTermination(terminationDuration.toMillis(),
TimeUnit.MILLISECONDS);
if (!scheduler.isTerminated()) {
LOG.warn("Unable to terminate scheduler");
scheduler.shutdownNow();
}
this.systemProducer.stop();
```
It looks like `systemProducer` is never stopped if `awaitTermination` throws
an `InterruptedException`, because the exception skips the rest of the method.
Would you mind fixing this as well, so that `systemProducer.stop()` is invoked
in a `finally` block?
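
Something along these lines is what I have in mind. This is only a rough sketch, not the actual `DiagnosticsManager` code: the class name `DiagnosticsStopSketch` and the nested `Producer` interface are hypothetical stand-ins, and the field names are assumed from the snippet above.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the diagnostics manager; names are illustrative only.
class DiagnosticsStopSketch {
  /** Hypothetical minimal interface standing in for the system producer. */
  interface Producer {
    void stop();
  }

  private final ScheduledExecutorService scheduler;
  private final Duration terminationDuration;
  private final Producer systemProducer;

  DiagnosticsStopSketch(ScheduledExecutorService scheduler, Duration terminationDuration,
      Producer systemProducer) {
    this.scheduler = scheduler;
    this.terminationDuration = terminationDuration;
    this.systemProducer = systemProducer;
  }

  void stop() throws InterruptedException {
    try {
      scheduler.shutdown();
      // Allow any scheduled publishes to finish, and block for termination
      scheduler.awaitTermination(terminationDuration.toMillis(), TimeUnit.MILLISECONDS);
      if (!scheduler.isTerminated()) {
        // Could not terminate gracefully within the timeout; force shutdown
        scheduler.shutdownNow();
      }
    } finally {
      // Runs even when awaitTermination throws InterruptedException,
      // so the producer is always stopped
      systemProducer.stop();
    }
  }
}
```

With this shape the `InterruptedException` still propagates to the caller, but only after the producer has been stopped.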