MabelYC commented on a change in pull request #1437:
URL: https://github.com/apache/samza/pull/1437#discussion_r528043358



##########
File path: 
samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
##########
@@ -55,6 +55,7 @@ public MetadataResourceUtil(JobModel jobModel, 
MetricsRegistry metricsRegistry,
   public void createResources() {
     if (checkpointManager != null) {
       checkpointManager.createResources();
+      checkpointManager.stop();

Review comment:
       I also thought we could create an admin client only to create resources. 
But i think it is not that good that then we would create two kafka 
adminClients for the same system in the same manager, and also create another 
thread. I think it would be better if we only create the clients we really need 
to create.
   I double checked the method is only called once. And advantages not to 
create it during construction?

##########
File path: 
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
##########
@@ -441,15 +441,24 @@ public void afterStop() {
 
     @Override
     public void afterFailure(Throwable t) {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+      // we need to close associated coordinator metadata store, although the 
processor failed
+      processors.forEach(sp -> {
+        if (sp.getLeft().equals(processor)) {
+          if (sp.getRight() != null) {
+            sp.getRight().close();
+          }
+          processors.remove(sp);

Review comment:
       Thanks for pointing this. Changed to remove it after finishing iterating.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to