This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f8bfe87  SAMZA-2464: Container shuts down when task fails to remove old state checkpoint dirs (#1283)
f8bfe87 is described below

commit f8bfe875e9866d350857cf99e5c2b0ddaf7d8ac1
Author: bkonold <[email protected]>
AuthorDate: Tue Mar 3 14:22:29 2020 -0800

    SAMZA-2464: Container shuts down when task fails to remove old state checkpoint dirs (#1283)
---
 .../src/main/scala/org/apache/samza/container/TaskInstance.scala    | 6 +++++-
 .../test/scala/org/apache/samza/container/TestTaskInstance.scala    | 6 ++----
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 2a4f1d6..37aaeff 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -274,7 +274,11 @@ class TaskInstance(
 
     if (storageManager != null) {
       trace("Remove old checkpoint stores for taskName: %s" format taskName)
-      storageManager.removeOldCheckpoints(checkpointId)
+      try {
+        storageManager.removeOldCheckpoints(checkpointId)
+      } catch {
+        case e: Exception => error("Failed to remove old checkpoints for task: %s. Current checkpointId: %s" format (taskName, checkpointId), e)
+      }
     }
 
     if (inputCheckpoint != null) {
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index a54ae72..90f1b58 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -336,7 +336,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
   }
 
   @Test
-  def testCommitFailsIfErrorClearingOldCheckpoints() { // required for transactional state
+  def testCommitContinuesIfErrorClearingOldCheckpoints() { // required for transactional state
     val commitsCounter = mock[Counter]
     when(this.metrics.commits).thenReturn(commitsCounter)
 
@@ -352,10 +352,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
     } catch {
       case e: SamzaException =>
         // exception is expected, container should fail if could not get changelog offsets.
-        return
+        fail("Exception from removeOldCheckpoints should have been caught")
     }
-
-    fail("Should have failed commit if error getting newest changelog offests")
   }
 
   /**

Reply via email to