jon-wei commented on a change in pull request #7212: Support Kafka supervisor 
adopting running tasks between versions 
URL: https://github.com/apache/incubator-druid/pull/7212#discussion_r274225465
 
 

 ##########
 File path: 
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 ##########
 @@ -2834,6 +2822,286 @@ public void testGetCurrentTotalStats()
     Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", 
"val2")), stats.get("1"));
   }
 
+  @Test
+  public void testDoNotKillCompatibleTasks()
+      throws Exception
+  {
+    // This supervisor always returns true for isTaskCurrent -> it should not 
kill its tasks
+    int numReplicas = 2;
+    supervisor = getTestableSupervisorCustomIsTaskCurrent(
+        numReplicas,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        true
+    );
+
+    addSomeEvents(1);
+
+    Task task = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)
+        ),
+        null,
+        null
+    );
+
+    List<Task> existingTasks = ImmutableList.of(task);
+
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+    
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+    
EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true);
+
+    TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>();
+    checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L));
+
+    EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints1))
+            .times(numReplicas);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testKillIncompatibleTasks()
+      throws Exception
+  {
+    // This supervisor always returns false for isTaskCurrent -> it should 
kill its tasks
+    int numReplicas = 2;
+    supervisor = getTestableSupervisorCustomIsTaskCurrent(
+        numReplicas,
+        1,
+        true,
+        "PT1H",
+        new Period("P1D"),
+        new Period("P1D"),
+        false,
+        false
+    );
+
+    addSomeEvents(1);
+
+    Task task = createKafkaIndexTask(
+        "id1",
+        DATASOURCE,
+        0,
+        new SeekableStreamStartSequenceNumbers<>(
+            "topic",
+            ImmutableMap.of(0, 0L, 2, 0L),
+            ImmutableSet.of()
+        ),
+        new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, 
Long.MAX_VALUE, 2, Long.MAX_VALUE)),
+        null,
+        null
+    );
+
+    List<Task> existingTasks = ImmutableList.of(task);
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes();
+    
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes();
+    EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(Status.NOT_STARTED))
+            .anyTimes();
+    EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString()))
+            .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
+            .anyTimes();
+    
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
+        new KafkaDataSourceMetadata(
+            null
+        )
+    ).anyTimes();
+    EasyMock.expect(taskClient.stopAsync("id1", 
false)).andReturn(Futures.immediateFuture(true));
+    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), 
EasyMock.anyObject(Executor.class));
+    
EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2);
+
+    replayAll();
+    supervisor.start();
+    supervisor.runInternal();
+    verifyAll();
+  }
+
+  @Test
+  public void testIsTaskCurrent()
 
 Review comment:
   Ah, never mind, the existing serde and isTaskCurrent tests are enough. I was 
thinking of the wrong place for inserting the modified config (the taskStorage 
on the overlord should always return tasks with the new class definition).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to