himanshug commented on a change in pull request #4550: Prioritized locking
URL: https://github.com/apache/incubator-druid/pull/4550#discussion_r331314968
 
 

 ##########
 File path: 
indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java
 ##########
 @@ -207,4 +258,206 @@ public void testSyncFromStorage() throws 
EntryExistsException
 
     Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
   }
+
+  @Test
+  public void testDoInCriticalSectionWithSharedLock() throws Exception
+  {
+    final Interval interval = Intervals.of("2017-01-01/2017-01-02");
+    final Task task = NoopTask.create();
+    lockbox.add(task);
+    Assert.assertTrue(lockbox.tryLock(TaskLockType.SHARED, task, 
interval).isOk());
+
+    Assert.assertFalse(
+        lockbox.doInCriticalSection(
+            task,
+            Collections.singletonList(interval),
+            CriticalAction.<Boolean>builder().onValidLocks(() -> 
true).onInvalidLocks(() -> false).build()
+        )
+    );
+  }
+
+  @Test
+  public void testDoInCriticalSectionWithExclusiveLock() throws Exception
+  {
+    final Interval interval = Intervals.of("2017-01-01/2017-01-02");
+    final Task task = NoopTask.create();
+    lockbox.add(task);
+    final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, 
interval).getTaskLock();
+    Assert.assertNotNull(lock);
+
+    Assert.assertTrue(
+        lockbox.doInCriticalSection(
+            task,
+            Collections.singletonList(interval),
+            CriticalAction.<Boolean>builder().onValidLocks(() -> 
true).onInvalidLocks(() -> false).build()
+        )
+    );
+  }
+
+  @Test
+  public void testDoInCriticalSectionWithSmallerInterval() throws Exception
+  {
+    final Interval interval = Intervals.of("2017-01-01/2017-02-01");
+    final Interval smallInterval = Intervals.of("2017-01-10/2017-01-11");
+    final Task task = NoopTask.create();
+    lockbox.add(task);
+    final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, task, 
interval).getTaskLock();
+    Assert.assertNotNull(lock);
+
+    Assert.assertTrue(
+        lockbox.doInCriticalSection(
+            task,
+            Collections.singletonList(smallInterval),
+            CriticalAction.<Boolean>builder().onValidLocks(() -> 
true).onInvalidLocks(() -> false).build()
+        )
+    );
+  }
+
+  @Test
+  public void testPreemptionAndDoInCriticalSection() throws Exception
+  {
+    final Interval interval = Intervals.of("2017-01-01/2017-01-02");
+    for (int i = 0; i < 5; i++) {
+      final Task task = NoopTask.create();
+      lockbox.add(task);
+      taskStorage.insert(task, TaskStatus.running(task.getId()));
+      Assert.assertTrue(lockbox.tryLock(TaskLockType.SHARED, task, 
interval).isOk());
+    }
+
+    final Task highPriorityTask = NoopTask.create(100);
+    lockbox.add(highPriorityTask);
+    taskStorage.insert(highPriorityTask, 
TaskStatus.running(highPriorityTask.getId()));
+    final TaskLock lock = lockbox.tryLock(TaskLockType.EXCLUSIVE, 
highPriorityTask, interval).getTaskLock();
+    Assert.assertNotNull(lock);
+
+    Assert.assertTrue(
+        lockbox.doInCriticalSection(
+            highPriorityTask,
+            Collections.singletonList(interval),
+            CriticalAction.<Boolean>builder().onValidLocks(() -> 
true).onInvalidLocks(() -> false).build()
+        )
+    );
+  }
+
+  @Test
+  public void testDoInCriticalSectionWithRevokedLock() throws Exception
+  {
+    final Interval interval = Intervals.of("2017-01-01/2017-01-02");
+    final Task lowPriorityTask = NoopTask.create("task1", 0);
+    final Task highPriorityTask = NoopTask.create("task2", 10);
+    lockbox.add(lowPriorityTask);
+    lockbox.add(highPriorityTask);
+    taskStorage.insert(lowPriorityTask, 
TaskStatus.running(lowPriorityTask.getId()));
+    taskStorage.insert(highPriorityTask, 
TaskStatus.running(highPriorityTask.getId()));
+
+    final TaskLock lowPriorityLock = lockbox.tryLock(TaskLockType.EXCLUSIVE, 
lowPriorityTask, interval).getTaskLock();
+    Assert.assertNotNull(lowPriorityLock);
+    Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, 
highPriorityTask, interval).isOk());
+    
Assert.assertTrue(Iterables.getOnlyElement(lockbox.findLocksForTask(lowPriorityTask)).isRevoked());
+
+    Assert.assertFalse(
+        lockbox.doInCriticalSection(
+            lowPriorityTask,
+            Collections.singletonList(interval),
+            CriticalAction.<Boolean>builder().onValidLocks(() -> 
true).onInvalidLocks(() -> false).build()
+        )
+    );
+  }
+
+  @Test(timeout = 5000L)
+  public void testAcquireLockAfterRevoked() throws EntryExistsException, 
InterruptedException
+  {
+    final Interval interval = Intervals.of("2017-01-01/2017-01-02");
+    final Task lowPriorityTask = NoopTask.create("task1", 0);
+    final Task highPriorityTask = NoopTask.create("task2", 10);
+    lockbox.add(lowPriorityTask);
+    lockbox.add(highPriorityTask);
+    taskStorage.insert(lowPriorityTask, 
TaskStatus.running(lowPriorityTask.getId()));
+    taskStorage.insert(highPriorityTask, 
TaskStatus.running(highPriorityTask.getId()));
+
+    final TaskLock lowPriorityLock = lockbox.lock(TaskLockType.EXCLUSIVE, 
lowPriorityTask, interval).getTaskLock();
+    Assert.assertNotNull(lowPriorityLock);
+    Assert.assertTrue(lockbox.tryLock(TaskLockType.EXCLUSIVE, 
highPriorityTask, interval).isOk());
+    
Assert.assertTrue(Iterables.getOnlyElement(lockbox.findLocksForTask(lowPriorityTask)).isRevoked());
+
+    lockbox.unlock(highPriorityTask, interval);
+
+    // Acquire again
+    final LockResult lockResult = lockbox.lock(TaskLockType.EXCLUSIVE, 
lowPriorityTask, interval);
+    Assert.assertFalse(lockResult.isOk());
 
 Review comment:
   I think the current behavior makes sense for the following case:
   1. Low-priority indexing task1 took the lock and started running, creating 
segments.
   2. High-priority indexing task2 took the lock (revoking task1's lock), 
created segments, and published them.
   3. At this point the segments created by task1 might be outdated, and 
hence task1 should not be able to make progress.
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to