This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f5b5cb93ea Fix expiry timeout bug in LocalIntermediaryDataManager (#12722)
f5b5cb93ea is described below

commit f5b5cb93ead1cb3bbd7a525445be2461c977c52c
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 1 16:29:22 2022 +0530

    Fix expiry timeout bug in LocalIntermediaryDataManager (#12722)
    
    The expiry timeout is compared against the current time, but the condition is reversed.
    This means that as soon as a supervisor task finishes, its partitions are cleaned up,
    irrespective of the specified `intermediaryPartitionTimeout` period.
    
    After these changes, the `intermediaryPartitionTimeout` will be honored.
    
    Changes
    * Fix the condition
    * Add tests to verify the new correct behaviour
    * Reduce the default expiry timeout from P1D to PT5M
       so that, with default configs, partitions are still cleaned up on roughly
       the same schedule as before (the default cleanup period is 300s).
---
 .../shuffle/LocalIntermediaryDataManager.java      |   9 +-
 ...ocalIntermediaryDataManagerAutoCleanupTest.java | 120 +++++++++++++++------
 .../druid/indexing/worker/config/WorkerConfig.java |   2 +-
 3 files changed, 90 insertions(+), 41 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
index 045e8dfb04..d629773bfc 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
@@ -151,7 +151,7 @@ public class LocalIntermediaryDataManager implements 
IntermediaryDataManager
     supervisorTaskChecker.scheduleAtFixedRate(
         () -> {
           try {
-            deleteExpiredSuprevisorTaskPartitionsIfNotRunning();
+            deleteExpiredSupervisorTaskPartitionsIfNotRunning();
           }
           catch (InterruptedException e) {
             LOG.error(e, "Error while cleaning up partitions for expired 
supervisors");
@@ -236,14 +236,13 @@ public class LocalIntermediaryDataManager implements 
IntermediaryDataManager
    * Note that the overlord sends a cleanup request when a supervisorTask is 
finished. The below check is to trigger
    * the self-cleanup for when the cleanup request is missing.
    */
-  private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws 
InterruptedException
+  private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws 
InterruptedException
   {
-    final DateTime now = DateTimes.nowUtc();
     final Set<String> expiredSupervisorTasks = new HashSet<>();
     for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
       final String supervisorTaskId = entry.getKey();
       final DateTime checkTime = entry.getValue();
-      if (checkTime.isAfter(now)) {
+      if (checkTime.isBeforeNow()) {
         expiredSupervisorTasks.add(supervisorTaskId);
       }
     }
@@ -318,7 +317,7 @@ public class LocalIntermediaryDataManager implements 
IntermediaryDataManager
     try (final Closer resourceCloser = closer) {
       FileUtils.mkdirp(taskTempDir);
 
-      // Tempary compressed file. Will be removed when taskTempDir is deleted.
+      // Temporary compressed file. Will be removed when taskTempDir is 
deleted.
       final File tempZippedFile = new File(taskTempDir, 
segment.getId().toString());
       final long unzippedSizeBytes = CompressionUtils.zip(segmentDir, 
tempZippedFile);
       if (unzippedSizeBytes == 0) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
index dcb214484d..6120a88442 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
@@ -37,7 +37,6 @@ import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.ShardSpecLookup;
 import org.joda.time.Interval;
 import org.joda.time.Period;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
@@ -57,33 +56,13 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
   @Rule
   public TemporaryFolder tempDir = new TemporaryFolder();
 
-  private LocalIntermediaryDataManager intermediaryDataManager;
+  private TaskConfig taskConfig;
+  private IndexingServiceClient indexingServiceClient;
 
   @Before
   public void setup() throws IOException
   {
-    final WorkerConfig workerConfig = new WorkerConfig()
-    {
-      @Override
-      public long getIntermediaryPartitionDiscoveryPeriodSec()
-      {
-        return 1;
-      }
-
-      @Override
-      public long getIntermediaryPartitionCleanupPeriodSec()
-      {
-        return 2;
-      }
-
-      @Override
-      public Period getIntermediaryPartitionTimeout()
-      {
-        return new Period("PT2S");
-      }
-
-    };
-    final TaskConfig taskConfig = new TaskConfig(
+    this.taskConfig = new TaskConfig(
         null,
         null,
         null,
@@ -98,40 +77,79 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
         TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
         null
     );
-    final IndexingServiceClient indexingServiceClient = new 
NoopIndexingServiceClient()
+    this.indexingServiceClient = new NoopIndexingServiceClient()
     {
       @Override
       public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
       {
         final Map<String, TaskStatus> result = new HashMap<>();
         for (String taskId : taskIds) {
-          result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10));
+          TaskState state = taskId.startsWith("running_") ? TaskState.RUNNING 
: TaskState.SUCCESS;
+          result.put(taskId, new TaskStatus(taskId, state, 10));
         }
         return result;
       }
     };
-    intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig, 
taskConfig, indexingServiceClient);
-    intermediaryDataManager.start();
   }
 
-  @After
-  public void teardown()
+  @Test
+  public void testCompletedExpiredSupervisor() throws IOException, 
InterruptedException
   {
-    intermediaryDataManager.stop();
+    Assert.assertTrue(
+        isCleanedUpAfter2s("supervisor_1", new Period("PT1S"))
+    );
   }
 
   @Test
-  public void testCleanup() throws IOException, InterruptedException
+  public void testCompletedNotExpiredSupervisor() throws IOException, 
InterruptedException
+  {
+    Assert.assertFalse(
+        isCleanedUpAfter2s("supervisor_2", new Period("PT10S"))
+    );
+  }
+
+  @Test
+  public void testRunningSupervisor() throws IOException, InterruptedException
+  {
+    Assert.assertFalse(
+        isCleanedUpAfter2s("running_supervisor_1", new Period("PT1S"))
+    );
+  }
+
+  /**
+   * Creates a LocalIntermediaryDataManager and adds a segment to it.
+   * Also checks the cleanup status after 2s.
+   *
+   * @return true if the cleanup has happened after 2s, false otherwise.
+   */
+  private boolean isCleanedUpAfter2s(String supervisorTaskId, Period 
timeoutPeriod)
+      throws IOException, InterruptedException
   {
-    final String supervisorTaskId = "supervisorTaskId";
     final String subTaskId = "subTaskId";
     final Interval interval = Intervals.of("2018/2019");
     final File segmentFile = generateSegmentDir("test");
     final DataSegment segment = newSegment(interval);
+
+    // Setup data manager with expiry timeout 1s
+    WorkerConfig workerConfig = new TestWorkerConfig(1, 1, timeoutPeriod);
+    LocalIntermediaryDataManager intermediaryDataManager =
+        new LocalIntermediaryDataManager(workerConfig, taskConfig, 
indexingServiceClient);
     intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment, 
segmentFile);
 
-    Thread.sleep(3000);
-    
Assert.assertFalse(intermediaryDataManager.findPartitionFile(supervisorTaskId, 
subTaskId, interval, 0).isPresent());
+    intermediaryDataManager
+        .findPartitionFile(supervisorTaskId, subTaskId, interval, 0);
+
+    // Start the data manager and the cleanup cycle
+    intermediaryDataManager.start();
+
+    // Check the state of the partition after 2s
+    Thread.sleep(2000);
+    boolean partitionFileExists = intermediaryDataManager
+        .findPartitionFile(supervisorTaskId, subTaskId, interval, 0)
+        .isPresent();
+
+    intermediaryDataManager.stop();
+    return !partitionFileExists;
   }
 
   private File generateSegmentDir(String fileName) throws IOException
@@ -178,4 +196,36 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
       throw new UnsupportedOperationException();
     }
   }
+
+  private static class TestWorkerConfig extends WorkerConfig
+  {
+    private final long cleanupPeriodSeconds;
+    private final long discoveryPeriodSeconds;
+    private final Period timeoutPeriod;
+
+    private TestWorkerConfig(long cleanupPeriodSeconds, long 
discoveryPeriodSeconds, Period timeoutPeriod)
+    {
+      this.cleanupPeriodSeconds = cleanupPeriodSeconds;
+      this.discoveryPeriodSeconds = discoveryPeriodSeconds;
+      this.timeoutPeriod = timeoutPeriod;
+    }
+
+    @Override
+    public long getIntermediaryPartitionCleanupPeriodSec()
+    {
+      return cleanupPeriodSeconds;
+    }
+
+    @Override
+    public long getIntermediaryPartitionDiscoveryPeriodSec()
+    {
+      return discoveryPeriodSeconds;
+    }
+
+    @Override
+    public Period getIntermediaryPartitionTimeout()
+    {
+      return timeoutPeriod;
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
 
b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index 607cf51e6c..d8a3701e94 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -53,7 +53,7 @@ public class WorkerConfig
   private long intermediaryPartitionCleanupPeriodSec = 300L;
 
   @JsonProperty
-  private Period intermediaryPartitionTimeout = new Period("P1D");
+  private Period intermediaryPartitionTimeout = new Period("PT5M");
 
   @JsonProperty
   private final long globalIngestionHeapLimitBytes = 
Runtime.getRuntime().maxMemory() / 6;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to