prateekm commented on a change in pull request #1437:
URL: https://github.com/apache/samza/pull/1437#discussion_r528027101



##########
File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
##########
@@ -441,15 +441,24 @@ public void afterStop() {
 
     @Override
     public void afterFailure(Throwable t) {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+      // we need to close associated coordinator metadata store, although the processor failed
+      processors.forEach(sp -> {
+        if (sp.getLeft().equals(processor)) {
+          if (sp.getRight() != null) {
+            sp.getRight().close();
+          }
+          processors.remove(sp);

Review comment:
       Is it safe to modify (remove) the map here while iterating over it (foreach)?
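
To make the concern concrete: the type of `processors` is not visible in this hunk, so whether removing inside `forEach` is safe depends on it. Fail-fast collections such as `HashSet` or `ArrayList` typically throw `ConcurrentModificationException` in this situation, while concurrent collections such as `ConcurrentHashMap.newKeySet()` have weakly consistent iterators that tolerate it. A minimal sketch that avoids the question by removing through the iterator is shown below. `ProcessorEntry` and `SafeRemoval` are hypothetical stand-ins for illustration only, not Samza classes.

```java
import java.util.Iterator;
import java.util.Set;

// Hypothetical stand-in for the (processor, metadata store) pair in the diff.
final class ProcessorEntry {
  final String processorId;
  boolean storeClosed = false;

  ProcessorEntry(String processorId) {
    this.processorId = processorId;
  }

  // Stands in for closing the associated metadata store.
  void closeStore() {
    storeClosed = true;
  }
}

final class SafeRemoval {
  // Closes the store of every entry matching processorId and removes the entry
  // via Iterator.remove(), which is valid for collections whose iterators
  // support removal (HashSet, ArrayList, ConcurrentHashMap key sets).
  static int closeAndRemove(Set<ProcessorEntry> entries, String processorId) {
    int removed = 0;
    for (Iterator<ProcessorEntry> it = entries.iterator(); it.hasNext();) {
      ProcessorEntry entry = it.next();
      if (entry.processorId.equals(processorId)) {
        entry.closeStore();
        it.remove();
        removed++;
      }
    }
    return removed;
  }
}
```

The close and the removal stay in one pass, and the code no longer relies on the iteration semantics of the underlying collection.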

##########
File path: samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
##########
@@ -441,15 +441,24 @@ public void afterStop() {
 
     @Override
     public void afterFailure(Throwable t) {
-      processors.removeIf(pair -> pair.getLeft().equals(processor));
-
+      // we need to close associated coordinator metadata store, although the processor failed
+      processors.forEach(sp -> {

Review comment:
       Won't this cause the current processor to not be stopped?
   
   Do we also need to stop the metadata store in afterStop()?

##########
File path: samza-core/src/main/java/org/apache/samza/coordinator/MetadataResourceUtil.java
##########
@@ -55,6 +55,7 @@ public MetadataResourceUtil(JobModel jobModel, MetricsRegistry metricsRegistry,
   public void createResources() {
     if (checkpointManager != null) {
       checkpointManager.createResources();
+      checkpointManager.stop();

Review comment:
       Do we know for sure that this method is only called once? If so, do we need to create the checkpoint manager during construction, or can we create it and close it within createResources?
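
One way to read the second option is to scope the manager's lifetime to the call itself. The sketch below assumes the manager can be obtained from a factory at call time. `ResourceManager` and `ScopedResourceCreation` are hypothetical names for illustration, not the Samza `CheckpointManager` API.

```java
import java.util.function.Supplier;

// Hypothetical minimal interface standing in for a checkpoint manager.
interface ResourceManager {
  void createResources();

  void stop();
}

final class ScopedResourceCreation {
  // Creates the manager, uses it, and always stops it, so no instance
  // outlives this call even if createResources() throws.
  static void createResources(Supplier<ResourceManager> factory) {
    ResourceManager manager = factory.get();
    if (manager == null) {
      return; // nothing configured, nothing to create or stop
    }
    try {
      manager.createResources();
    } finally {
      manager.stop();
    }
  }
}
```

With this shape, calling the method more than once creates and stops a fresh manager each time instead of reusing one that has already been stopped.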





