This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d4e972e1e4b Add checking for new checkpoint (#14353)
d4e972e1e4b is described below

commit d4e972e1e4b798d7e057e910deea4d192fe57952
Author: panhongan <[email protected]>
AuthorDate: Mon Sep 4 15:48:55 2023 +0800

    Add checking for new checkpoint (#14353)
    
    Check that a checkpoint is non-empty before adding it to the checkpoint sequence
    in a SeekableStreamSupervisor
---
 .../supervisor/SeekableStreamSupervisor.java       |   9 +-
 .../SeekableStreamSupervisorStateTest.java         | 263 +++++++++++++++++++++
 2 files changed, 270 insertions(+), 2 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index f79785b8237..2f6cb008b84 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -40,6 +40,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.error.DruidException;
@@ -691,8 +692,12 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           return;
         }
         final Map<PartitionIdType, SequenceOffsetType> newCheckpoint = 
checkpointTaskGroup(taskGroup, false).get();
-        taskGroup.addNewCheckpoint(newCheckpoint);
-        log.info("Handled checkpoint notice, new checkpoint is [%s] for 
taskGroup [%s]", newCheckpoint, taskGroupId);
+        if (MapUtils.isNotEmpty(newCheckpoint)) {
+          taskGroup.addNewCheckpoint(newCheckpoint);
+          log.info("Handled checkpoint notice, new checkpoint is [%s] for 
taskGroup [%s]", newCheckpoint, taskGroupId);
+        } else {
+          log.warn("New checkpoint is null for taskGroup [%s]", taskGroupId);
+        }
       }
     }
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 7e8afdf6bc5..819a6baacd8 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -19,12 +19,16 @@
 
 package org.apache.druid.indexing.seekablestream.supervisor;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.DimensionSchema;
@@ -34,6 +38,8 @@ import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.TestUtils;
 import org.apache.druid.indexing.common.task.Task;
@@ -44,6 +50,7 @@ import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskQueue;
 import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerListener;
+import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
 import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
@@ -57,6 +64,7 @@ import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFac
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
 import 
org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
@@ -98,6 +106,7 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -771,6 +780,169 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     verifyAll();
   }
 
+  @Test(timeout = 60_000L)
+  public void testCheckpointForActiveTaskGroup() throws InterruptedException, 
JsonProcessingException
+  {
+    DateTime startTime = DateTimes.nowUtc();
+    SeekableStreamSupervisorIOConfig ioConfig = new 
SeekableStreamSupervisorIOConfig(
+            STREAM,
+            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
+            1,
+            1,
+            new Period("PT1H"),
+            new Period("PT1S"),
+            new Period("PT30S"),
+            false,
+            new Period("PT30M"),
+            null,
+            null,
+            null,
+            null,
+            new IdleConfig(true, 200L),
+            null
+    ) {};
+
+    EasyMock.reset(spec);
+    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
+    
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
+    EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
+    
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
+    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new 
DruidMonitorSchedulerConfig() {
+      @Override
+      public Duration getEmissionDuration()
+      {
+        return new Period("PT2S").toStandardDuration();
+      }
+    }).anyTimes();
+    EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
+    
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+    EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
+    
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
+    
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
+
+    SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfigExt(
+            0,
+            Collections.singletonMap("0", "10"),
+            Collections.singletonMap("0", "20"),
+            "test",
+            startTime,
+            null,
+            Collections.emptySet(),
+            ioConfig
+    );
+
+    SeekableStreamIndexTaskTuningConfig taskTuningConfig = 
getTuningConfig().convertToTaskTuningConfig();
+
+    TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
+    sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));
+
+    Map<String, Object> context = new HashMap<>();
+    context.put("checkpoints", new 
ObjectMapper().writeValueAsString(sequenceOffsets));
+
+    SeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
+            "id1",
+            null,
+            getDataSchema(),
+            taskTuningConfig,
+            taskIoConfig,
+            context,
+            "0"
+    );
+
+    SeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
+            "id2",
+            null,
+            getDataSchema(),
+            taskTuningConfig,
+            taskIoConfig,
+            context,
+            "0"
+    );
+
+    final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
+    final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
+
+    Collection workItems = new ArrayList<>();
+    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
+    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
+
+    
EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
+    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
+            .andReturn(ImmutableList.of(id1, id2))
+            .anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
+    
EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
+    
EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
+
+    EasyMock.reset(indexerMetadataStorageCoordinator);
+    EasyMock.expect(
+            
indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(new
 TestSeekableStreamDataSourceMetadata(null)
+    ).anyTimes();
+    
EasyMock.expect(indexTaskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
+    
EasyMock.expect(indexTaskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)).anyTimes();
+
+    
EasyMock.expect(indexTaskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
+    
EasyMock.expect(indexTaskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
+
+    ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
+    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
+    checkpoints.put(0, partitionOffset);
+
+    
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints))
+            .anyTimes();
+    
EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), 
EasyMock.anyBoolean()))
+            .andReturn(Futures.immediateFuture(checkpoints))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.pauseAsync("id1"))
+            .andReturn(Futures.immediateFuture(partitionOffset))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.pauseAsync("id2"))
+            .andReturn(Futures.immediateFuture(partitionOffset))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, 
false))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, 
false))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.resumeAsync("id1"))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.resumeAsync("id2"))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.stopAsync("id1", false))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+    EasyMock.expect(indexTaskClient.stopAsync("id2", false))
+            .andReturn(Futures.immediateFuture(true))
+            .anyTimes();
+
+    replayAll();
+
+    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
+
+    supervisor.start();
+    supervisor.runInternal();
+
+    supervisor.checkpoint(
+            0,
+            new TestSeekableStreamDataSourceMetadata(
+                    new SeekableStreamStartSequenceNumbers<>(STREAM, 
checkpoints.get(0), ImmutableSet.of())
+            )
+    );
+
+    while (supervisor.getNoticesQueueSize() > 0) {
+      Thread.sleep(100);
+    }
+
+    verifyAll();
+
+    Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
+  }
 
   @Test
   public void testEmitBothLag() throws Exception
@@ -2208,4 +2380,95 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
       }
     }
   }
+
+  private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
+  {
+    private final String taskType;
+    private final TaskLocation location;
+    private final String dataSource;
+
+    TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, 
TaskLocation location)
+    {
+      super(task.getId(), result);
+      this.taskType = task.getType();
+      this.location = location;
+      this.dataSource = task.getDataSource();
+    }
+
+    @Override
+    public TaskLocation getLocation()
+    {
+      return location;
+    }
+
+    @Override
+    public String getTaskType()
+    {
+      return taskType;
+    }
+
+    @Override
+    public String getDataSource()
+    {
+      return dataSource;
+    }
+  }
+
+  private static class TestSeekableStreamDataSourceMetadata extends 
SeekableStreamDataSourceMetadata<String, String>
+  {
+
+    @JsonCreator
+    public TestSeekableStreamDataSourceMetadata(
+            @JsonProperty("partitions") SeekableStreamSequenceNumbers<String, 
String> partitions
+    )
+    {
+      super(partitions);
+    }
+
+    @Override
+    public DataSourceMetadata asStartMetadata()
+    {
+      final SeekableStreamSequenceNumbers<String, String> sequenceNumbers = 
getSeekableStreamSequenceNumbers();
+      if (sequenceNumbers instanceof SeekableStreamEndSequenceNumbers) {
+        return createConcreteDataSourceMetaData(
+                ((SeekableStreamEndSequenceNumbers<String, String>) 
sequenceNumbers).asStartPartitions(true)
+        );
+      } else {
+        return this;
+      }
+    }
+
+    @Override
+    protected SeekableStreamDataSourceMetadata<String, String> 
createConcreteDataSourceMetaData(
+            SeekableStreamSequenceNumbers<String, String> 
seekableStreamSequenceNumbers
+    )
+    {
+      return new 
TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
+    }
+  }
+
+  private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt(
+          int groupId,
+          Map<String, String> startPartitions,
+          Map<String, String> endPartitions,
+          String baseSequenceName,
+          DateTime minimumMessageTime,
+          DateTime maximumMessageTime,
+          Set<String> exclusiveStartSequenceNumberPartitions,
+          SeekableStreamSupervisorIOConfig ioConfig
+  )
+  {
+    return new SeekableStreamIndexTaskIOConfig<String, String>(
+            groupId,
+            baseSequenceName,
+            new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, 
exclusiveStartSequenceNumberPartitions),
+            new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
+            true,
+            minimumMessageTime,
+            maximumMessageTime,
+            ioConfig.getInputFormat()
+    )
+    {
+    };
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to