This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f5b5cb93ea Fix expiry timeout bug in LocalIntermediaryDataManager
(#12722)
f5b5cb93ea is described below
commit f5b5cb93ead1cb3bbd7a525445be2461c977c52c
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 1 16:29:22 2022 +0530
Fix expiry timeout bug in LocalIntermediaryDataManager (#12722)
The expiry timeout is compared against the current time, but the condition
is reversed.
As a result, a supervisor task's partitions are cleaned up as soon as the
task finishes, irrespective of the specified `intermediaryPartitionTimeout`
period.
After these changes, the `intermediaryPartitionTimeout` is honored.
Changes:
* Fix the condition
* Add tests to verify the new, correct behaviour
* Reduce the default expiry timeout from P1D to PT5M to retain the current
  behaviour when the default configs are used
---
.../shuffle/LocalIntermediaryDataManager.java | 9 +-
...ocalIntermediaryDataManagerAutoCleanupTest.java | 120 +++++++++++++++------
.../druid/indexing/worker/config/WorkerConfig.java | 2 +-
3 files changed, 90 insertions(+), 41 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
index 045e8dfb04..d629773bfc 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManager.java
@@ -151,7 +151,7 @@ public class LocalIntermediaryDataManager implements
IntermediaryDataManager
supervisorTaskChecker.scheduleAtFixedRate(
() -> {
try {
- deleteExpiredSuprevisorTaskPartitionsIfNotRunning();
+ deleteExpiredSupervisorTaskPartitionsIfNotRunning();
}
catch (InterruptedException e) {
LOG.error(e, "Error while cleaning up partitions for expired
supervisors");
@@ -236,14 +236,13 @@ public class LocalIntermediaryDataManager implements
IntermediaryDataManager
* Note that the overlord sends a cleanup request when a supervisorTask is
finished. The below check is to trigger
* the self-cleanup for when the cleanup request is missing.
*/
- private void deleteExpiredSuprevisorTaskPartitionsIfNotRunning() throws
InterruptedException
+ private void deleteExpiredSupervisorTaskPartitionsIfNotRunning() throws
InterruptedException
{
- final DateTime now = DateTimes.nowUtc();
final Set<String> expiredSupervisorTasks = new HashSet<>();
for (Entry<String, DateTime> entry : supervisorTaskCheckTimes.entrySet()) {
final String supervisorTaskId = entry.getKey();
final DateTime checkTime = entry.getValue();
- if (checkTime.isAfter(now)) {
+ if (checkTime.isBeforeNow()) {
expiredSupervisorTasks.add(supervisorTaskId);
}
}
@@ -318,7 +317,7 @@ public class LocalIntermediaryDataManager implements
IntermediaryDataManager
try (final Closer resourceCloser = closer) {
FileUtils.mkdirp(taskTempDir);
- // Tempary compressed file. Will be removed when taskTempDir is deleted.
+ // Temporary compressed file. Will be removed when taskTempDir is
deleted.
final File tempZippedFile = new File(taskTempDir,
segment.getId().toString());
final long unzippedSizeBytes = CompressionUtils.zip(segmentDir,
tempZippedFile);
if (unzippedSizeBytes == 0) {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
index dcb214484d..6120a88442 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/LocalIntermediaryDataManagerAutoCleanupTest.java
@@ -37,7 +37,6 @@ import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.ShardSpecLookup;
import org.joda.time.Interval;
import org.joda.time.Period;
-import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -57,33 +56,13 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
@Rule
public TemporaryFolder tempDir = new TemporaryFolder();
- private LocalIntermediaryDataManager intermediaryDataManager;
+ private TaskConfig taskConfig;
+ private IndexingServiceClient indexingServiceClient;
@Before
public void setup() throws IOException
{
- final WorkerConfig workerConfig = new WorkerConfig()
- {
- @Override
- public long getIntermediaryPartitionDiscoveryPeriodSec()
- {
- return 1;
- }
-
- @Override
- public long getIntermediaryPartitionCleanupPeriodSec()
- {
- return 2;
- }
-
- @Override
- public Period getIntermediaryPartitionTimeout()
- {
- return new Period("PT2S");
- }
-
- };
- final TaskConfig taskConfig = new TaskConfig(
+ this.taskConfig = new TaskConfig(
null,
null,
null,
@@ -98,40 +77,79 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name(),
null
);
- final IndexingServiceClient indexingServiceClient = new
NoopIndexingServiceClient()
+ this.indexingServiceClient = new NoopIndexingServiceClient()
{
@Override
public Map<String, TaskStatus> getTaskStatuses(Set<String> taskIds)
{
final Map<String, TaskStatus> result = new HashMap<>();
for (String taskId : taskIds) {
- result.put(taskId, new TaskStatus(taskId, TaskState.SUCCESS, 10));
+ TaskState state = taskId.startsWith("running_") ? TaskState.RUNNING
: TaskState.SUCCESS;
+ result.put(taskId, new TaskStatus(taskId, state, 10));
}
return result;
}
};
- intermediaryDataManager = new LocalIntermediaryDataManager(workerConfig,
taskConfig, indexingServiceClient);
- intermediaryDataManager.start();
}
- @After
- public void teardown()
+ @Test
+ public void testCompletedExpiredSupervisor() throws IOException,
InterruptedException
{
- intermediaryDataManager.stop();
+ Assert.assertTrue(
+ isCleanedUpAfter2s("supervisor_1", new Period("PT1S"))
+ );
}
@Test
- public void testCleanup() throws IOException, InterruptedException
+ public void testCompletedNotExpiredSupervisor() throws IOException,
InterruptedException
+ {
+ Assert.assertFalse(
+ isCleanedUpAfter2s("supervisor_2", new Period("PT10S"))
+ );
+ }
+
+ @Test
+ public void testRunningSupervisor() throws IOException, InterruptedException
+ {
+ Assert.assertFalse(
+ isCleanedUpAfter2s("running_supervisor_1", new Period("PT1S"))
+ );
+ }
+
+ /**
+ * Creates a LocalIntermediaryDataManager and adds a segment to it.
+ * Also checks the cleanup status after 2s.
+ *
+ * @return true if the cleanup has happened after 2s, false otherwise.
+ */
+ private boolean isCleanedUpAfter2s(String supervisorTaskId, Period
timeoutPeriod)
+ throws IOException, InterruptedException
{
- final String supervisorTaskId = "supervisorTaskId";
final String subTaskId = "subTaskId";
final Interval interval = Intervals.of("2018/2019");
final File segmentFile = generateSegmentDir("test");
final DataSegment segment = newSegment(interval);
+
+ // Set up data manager with 1s cleanup and discovery periods and the given expiry timeout
+ WorkerConfig workerConfig = new TestWorkerConfig(1, 1, timeoutPeriod);
+ LocalIntermediaryDataManager intermediaryDataManager =
+ new LocalIntermediaryDataManager(workerConfig, taskConfig,
indexingServiceClient);
intermediaryDataManager.addSegment(supervisorTaskId, subTaskId, segment,
segmentFile);
- Thread.sleep(3000);
-
Assert.assertFalse(intermediaryDataManager.findPartitionFile(supervisorTaskId,
subTaskId, interval, 0).isPresent());
+ intermediaryDataManager
+ .findPartitionFile(supervisorTaskId, subTaskId, interval, 0);
+
+ // Start the data manager and the cleanup cycle
+ intermediaryDataManager.start();
+
+ // Check the state of the partition after 2s
+ Thread.sleep(2000);
+ boolean partitionFileExists = intermediaryDataManager
+ .findPartitionFile(supervisorTaskId, subTaskId, interval, 0)
+ .isPresent();
+
+ intermediaryDataManager.stop();
+ return !partitionFileExists;
}
private File generateSegmentDir(String fileName) throws IOException
@@ -178,4 +196,36 @@ public class LocalIntermediaryDataManagerAutoCleanupTest
throw new UnsupportedOperationException();
}
}
+
+ private static class TestWorkerConfig extends WorkerConfig
+ {
+ private final long cleanupPeriodSeconds;
+ private final long discoveryPeriodSeconds;
+ private final Period timeoutPeriod;
+
+ private TestWorkerConfig(long cleanupPeriodSeconds, long
discoveryPeriodSeconds, Period timeoutPeriod)
+ {
+ this.cleanupPeriodSeconds = cleanupPeriodSeconds;
+ this.discoveryPeriodSeconds = discoveryPeriodSeconds;
+ this.timeoutPeriod = timeoutPeriod;
+ }
+
+ @Override
+ public long getIntermediaryPartitionCleanupPeriodSec()
+ {
+ return cleanupPeriodSeconds;
+ }
+
+ @Override
+ public long getIntermediaryPartitionDiscoveryPeriodSec()
+ {
+ return discoveryPeriodSeconds;
+ }
+
+ @Override
+ public Period getIntermediaryPartitionTimeout()
+ {
+ return timeoutPeriod;
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
index 607cf51e6c..d8a3701e94 100644
---
a/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
+++
b/server/src/main/java/org/apache/druid/indexing/worker/config/WorkerConfig.java
@@ -53,7 +53,7 @@ public class WorkerConfig
private long intermediaryPartitionCleanupPeriodSec = 300L;
@JsonProperty
- private Period intermediaryPartitionTimeout = new Period("P1D");
+ private Period intermediaryPartitionTimeout = new Period("PT5M");
@JsonProperty
private final long globalIngestionHeapLimitBytes =
Runtime.getRuntime().maxMemory() / 6;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]