jon-wei commented on a change in pull request #7212: Support Kafka supervisor adopting running tasks between versions URL: https://github.com/apache/incubator-druid/pull/7212#discussion_r274225465
########## File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java ########## @@ -2834,6 +2822,286 @@ public void testGetCurrentTotalStats() Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1")); } + @Test + public void testDoNotKillCompatibleTasks() + throws Exception + { + // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks + int numReplicas = 2; + supervisor = getTestableSupervisorCustomIsTaskCurrent( + numReplicas, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), + false, + true + ); + + addSomeEvents(1); + + Task task = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 0L, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>( + "topic", + ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE) + ), + null, + null + ); + + List<Task> existingTasks = ImmutableList.of(task); + + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); + EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) + .anyTimes(); + EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) + .anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true); + + TreeMap<Integer, Map<Integer, Long>> checkpoints1 = new TreeMap<>(); + checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); + + EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(numReplicas); + + replayAll(); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testKillIncompatibleTasks() + throws Exception + { + // This supervisor always returns false for isTaskCurrent -> it should kill its tasks + int numReplicas = 2; + supervisor = getTestableSupervisorCustomIsTaskCurrent( + numReplicas, + 1, + true, + "PT1H", + new Period("P1D"), + new Period("P1D"), + false, + false + ); + + addSomeEvents(1); + + Task task = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of(0, 0L, 2, 0L), + ImmutableSet.of() + ), + new SeekableStreamEndSequenceNumbers<>("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + List<Task> existingTasks = ImmutableList.of(task); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasks()).andReturn(existingTasks).anyTimes(); + EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(task)).anyTimes(); + EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(Status.NOT_STARTED)) + .anyTimes(); + EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) + .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) + .anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + EasyMock.expect(taskClient.stopAsync("id1", false)).andReturn(Futures.immediateFuture(true)); + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + EasyMock.expect(taskQueue.add(EasyMock.anyObject(Task.class))).andReturn(true).times(2); + + replayAll(); + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + } + + @Test + public void testIsTaskCurrent() Review comment: Ah, nevermind, the existing serde and isTaskCurrent tests are enough. I was thinking of the wrong place for inserting the modified config (the taskStorage on the overlord should always return tasks with the new class definition) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org