This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 07feec8af33 Add supervisor config to release task lock after segment 
handoff (#18436)
07feec8af33 is described below

commit 07feec8af336cda0332768d23d38e5219067619f
Author: Uddeshya Singh <[email protected]>
AuthorDate: Sat Aug 30 10:27:46 2025 +0530

    Add supervisor config to release task lock after segment handoff (#18436)
    
    Changes:
    - Add supervisor io config `releaseLocksOnHandoff`
    - Add interface `TaskIntervalUnlocker`
    - Use `TaskIntervalUnlocker` in `StreamAppenderator` to release a lock if 
there are no more overlapping sinks
---
 .../embedded/indexing/IngestionSmokeTest.java      |  2 +-
 .../embedded/indexing/KafkaClusterMetricsTest.java |  2 +-
 .../embedded/indexing/KafkaDataFormatsTest.java    |  2 +-
 .../RabbitStreamIndexTaskTuningConfig.java         |  3 +-
 .../indexing/kafka/KafkaIndexTaskTuningConfig.java | 15 ++--
 .../supervisor/KafkaSupervisorTuningConfig.java    | 10 ++-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  1 +
 .../kafka/KafkaIndexTaskTuningConfigTest.java      |  6 +-
 .../simulate/EmbeddedKafkaSupervisorTest.java      | 28 ++++++-
 .../kafka/supervisor/KafkaSupervisorTest.java      |  7 ++
 .../TestModifiedKafkaIndexTaskTuningConfig.java    |  3 +-
 .../kinesis/KinesisIndexTaskTuningConfig.java      |  3 +-
 .../SeekableStreamAppenderatorConfig.java          |  6 ++
 .../seekablestream/SeekableStreamIndexTask.java    |  6 +-
 .../SeekableStreamIndexTaskTuningConfig.java       | 17 +++-
 .../common/task/TestAppenderatorsManager.java      |  7 +-
 .../SeekableStreamSupervisorSpecTest.java          |  1 +
 .../SeekableStreamSupervisorStateTest.java         |  1 +
 .../realtime/appenderator/AppenderatorConfig.java  |  8 ++
 .../realtime/appenderator/Appenderators.java       |  6 +-
 .../appenderator/AppenderatorsManager.java         |  3 +-
 .../DummyForInjectionAppenderatorsManager.java     |  3 +-
 .../appenderator/PeonAppenderatorsManager.java     |  6 +-
 .../realtime/appenderator/StreamAppenderator.java  | 33 +++++++-
 .../appenderator/TaskIntervalUnlocker.java         | 38 +++++++++
 .../UnifiedIndexerAppenderatorsManager.java        | 12 ++-
 .../appenderator/BatchAppenderatorTester.java      |  3 +-
 .../appenderator/StreamAppenderatorTest.java       | 97 ++++++++++++++++++++++
 .../appenderator/StreamAppenderatorTester.java     | 35 ++++++--
 .../appenderator/TestAppenderatorConfig.java       | 16 +++-
 30 files changed, 339 insertions(+), 41 deletions(-)

diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index 5b5f4e5083c..af9c3d21c8e 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -348,7 +348,7 @@ public class IngestionSmokeTest extends 
EmbeddedClusterTestBase
         null, null, null,
         1,
         null, null, null, null, null, null, null, null, null, null,
-        null, null, null, null, null, null, null, null, null, null
+        null, null, null, null, null, null, null, null, null, null, null
     );
   }
 
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index aa3667405bd..2cf03e99ce9 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -449,7 +449,7 @@ public class KafkaClusterMetricsTest extends 
EmbeddedClusterTestBase
         null, null, null,
         maxRowsPerSegment,
         null, null, null, null, null, null, null, null, null, null,
-        null, null, null, null, null, null, null, null, null, null
+        null, null, null, null, null, null, null, null, null, null, null
     );
   }
 }
diff --git 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
index 4a97ba8811c..c8fff7347d5 100644
--- 
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
+++ 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
@@ -643,7 +643,7 @@ public class KafkaDataFormatsTest extends 
EmbeddedClusterTestBase
         null, null, null,
         1,
         null, null, null, null, null, null, null, null, null, null,
-        null, null, null, null, null, null, null, null, null, null
+        null, null, null, null, null, null, null, null, null, null, null
     );
   }
 
diff --git 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java
 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java
index 288e28966a0..dd5bc84d523 100644
--- 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java
+++ 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java
@@ -100,7 +100,8 @@ public class RabbitStreamIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTu
         maxParseExceptions,
         maxSavedParseExceptions,
         numPersistThreads,
-        maxColumnsToMerge
+        maxColumnsToMerge,
+        null
     );
 
     this.recordBufferSize = recordBufferSize;
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index b0b358da20b..3fa046b63e6 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -53,7 +53,8 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
       @Nullable Integer maxParseExceptions,
       @Nullable Integer maxSavedParseExceptions,
       @Nullable Integer numPersistThreads,
-      @Nullable Integer maxColumnsToMerge
+      @Nullable Integer maxColumnsToMerge,
+      @Nullable Boolean releaseLocksOnHandoff
   )
   {
     super(
@@ -78,7 +79,8 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
         maxParseExceptions,
         maxSavedParseExceptions,
         numPersistThreads,
-        maxColumnsToMerge
+        maxColumnsToMerge,
+        releaseLocksOnHandoff
     );
   }
 
@@ -103,7 +105,8 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
       @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer 
maxSavedParseExceptions,
       @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
-      @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
+      @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
+      @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean 
releaseLocksOnHandoff
   )
   {
     this(
@@ -127,7 +130,8 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
         maxParseExceptions,
         maxSavedParseExceptions,
         numPersistThreads,
-        maxColumnsToMerge
+        maxColumnsToMerge,
+        releaseLocksOnHandoff
     );
   }
 
@@ -155,7 +159,8 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
         getMaxParseExceptions(),
         getMaxSavedParseExceptions(),
         getNumPersistThreads(),
-        getMaxColumnsToMerge()
+        getMaxColumnsToMerge(),
+        isReleaseLocksOnHandoff()
     );
   }
 
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index 1e0b3587409..c4a21674d30 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -66,6 +66,7 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
         null,
         null,
         null,
+        null,
         null
     );
   }
@@ -95,7 +96,8 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
       @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer 
maxSavedParseExceptions,
       @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
-      @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
+      @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
+      @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean 
releaseLocksOnHandoff
   )
   {
     super(
@@ -119,7 +121,8 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
         maxParseExceptions,
         maxSavedParseExceptions,
         numPersistThreads,
-        maxColumnsToMerge
+        maxColumnsToMerge,
+        releaseLocksOnHandoff
     );
     this.workerThreads = workerThreads;
     this.chatRetries = (chatRetries != null ? chatRetries : 
DEFAULT_CHAT_RETRIES);
@@ -233,7 +236,8 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
         getMaxParseExceptions(),
         getMaxSavedParseExceptions(),
         getNumPersistThreads(),
-        getMaxColumnsToMerge()
+        getMaxColumnsToMerge(),
+        isReleaseLocksOnHandoff()
     );
   }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index f736d01bacb..9954de88c26 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2911,6 +2911,7 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         maxParseExceptions,
         maxSavedParseExceptions,
         null,
+        null,
         null
     );
     if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 20777b320a5..132b65a15cb 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -155,7 +155,8 @@ public class KafkaIndexTaskTuningConfigTest
         null,
         null,
         2,
-        5
+        5,
+        false
     );
     KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
 
@@ -198,7 +199,8 @@ public class KafkaIndexTaskTuningConfigTest
         42,
         42,
         2,
-        -1
+        -1,
+        false
     );
 
     String serialized = mapper.writeValueAsString(base);
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index bab227dc864..058f3ffa736 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -34,21 +34,26 @@ import 
org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
 import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
 import org.apache.druid.testing.embedded.EmbeddedIndexer;
 import org.apache.druid.testing.embedded.EmbeddedOverlord;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 public class EmbeddedKafkaSupervisorTest extends EmbeddedClusterTestBase
@@ -56,12 +61,14 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer();
   private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedHistorical historical = new EmbeddedHistorical();
   private KafkaResource kafkaServer;
 
   @Override
   public EmbeddedDruidCluster createCluster()
   {
     final EmbeddedDruidCluster cluster = 
EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
+    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
 
     kafkaServer = new KafkaResource();
 
@@ -71,6 +78,7 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
            .addServer(new EmbeddedCoordinator())
            .addServer(overlord)
            .addServer(indexer)
+           .addServer(historical)
            .addServer(broker);
 
     return cluster;
@@ -82,8 +90,9 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
     final String topic = dataSource;
     kafkaServer.createTopicWithPartitions(topic, 2);
 
+    final int expectedSegments = 10;
     kafkaServer.produceRecordsToTopic(
-        generateRecordsForTopic(topic, 10, DateTimes.of("2025-06-01"))
+        generateRecordsForTopic(topic, expectedSegments, 
DateTimes.of("2025-06-01"))
     );
 
     // Submit and start a supervisor
@@ -119,6 +128,21 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
     
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
     supervisorStatus = cluster.callApi().getSupervisorStatus(supervisorId);
     Assertions.assertTrue(supervisorStatus.isSuspended());
+    indexer.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("ingest/handoff/count")
+                      .hasDimension(DruidMetrics.DATASOURCE, 
List.of(dataSource)),
+        agg -> agg.hasSumAtLeast(expectedSegments)
+    );
+    overlord.latchableEmitter().waitForEventAggregate(
+        event -> event.hasMetricName("task/action/run/time")
+                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+                      .hasDimension(DruidMetrics.TASK_ACTION_TYPE, 
"lockRelease"),
+        agg -> agg.hasCountAtLeast(expectedSegments)
+    );
+    List<LockFilterPolicy> lockFilterPolicies = List.of(new 
LockFilterPolicy(dataSource, 0, null, null));
+    Map<String, List<Interval>> lockedIntervals = cluster.callApi()
+                                                         
.onLeaderOverlord(client -> client.findLockedIntervals(lockFilterPolicies));
+    Assertions.assertEquals(0, lockedIntervals.size());
   }
 
   private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId, 
String topic)
@@ -154,7 +178,7 @@ public class EmbeddedKafkaSupervisorTest extends 
EmbeddedClusterTestBase
         null, null, null,
         1,
         null, null, null, null, null, null, null, null, null, null,
-        null, null, null, null, null, null, null, null, null, null
+        null, null, null, null, new Period("PT2S"), null, null, null, null, 
null, true
     );
   }
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index eabacd33a45..9b80d2db41a 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -357,6 +357,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
         null
     );
 
@@ -532,6 +533,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null,
             null,
             null,
+            null,
             null
         ),
         null
@@ -4672,6 +4674,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null,
             null,
             null,
+            null,
             null
         )
     );
@@ -4712,6 +4715,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
         null
     );
 
@@ -4866,6 +4870,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
             null,
             null,
             null,
+            null,
             null
         )
     );
@@ -5386,6 +5391,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         10,
         null,
+        null,
         null
     );
 
@@ -5503,6 +5509,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
+        null,
         null
     );
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index a2b7228bb46..08a083fe112 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -81,7 +81,8 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends 
KafkaIndexTaskTuning
         maxParseExceptions,
         maxSavedParseExceptions,
         numPersistThreads,
-        maxColumnsToMerge
+        maxColumnsToMerge,
+        null
     );
     this.extra = extra;
   }
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 97d63913bc1..5720ce2ae22 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -112,7 +112,8 @@ public class KinesisIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningC
         maxParseExceptions,
         maxSavedParseExceptions,
         null,
-        maxColumnsToMerge
+        maxColumnsToMerge,
+        false
     );
     this.recordBufferSize = recordBufferSize;
     this.recordBufferSizeBytes = recordBufferSizeBytes;
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
index f6490579c75..6e4065ea1c9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfig.java
@@ -192,6 +192,12 @@ public class SeekableStreamAppenderatorConfig implements 
AppenderatorConfig
     return maxColumnsToMerge;
   }
 
+  @Override
+  public boolean isReleaseLocksOnHandoff()
+  {
+    return tuningConfig.isReleaseLocksOnHandoff();
+  }
+
   @Override
   public boolean equals(Object o)
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 59dc4c433dc..24f74b8da8b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -32,6 +32,7 @@ import 
org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.actions.LockReleaseAction;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.actions.TaskLocks;
@@ -228,7 +229,10 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
         toolbox.getPolicyEnforcer(),
         rowIngestionMeters,
         parseExceptionHandler,
-        toolbox.getCentralizedTableSchemaConfig()
+        toolbox.getCentralizedTableSchemaConfig(),
+        interval -> {
+          toolbox.getTaskActionClient().submit(new 
LockReleaseAction(interval));
+        }
     );
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 124682bcb2e..ea531d77f27 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.seekablestream;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -41,6 +42,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements TuningConfi
   private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new 
Period("PT10M");
   private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT;
   private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE;
+  private static final boolean DEFAULT_RELEASE_LOCKS_ON_HANDOFF = false;
   private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 
Duration.ofMinutes(15).toMillis();
 
   private final AppendableIndexSpec appendableIndexSpec;
@@ -68,6 +70,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements TuningConfi
 
   private final int numPersistThreads;
   private final int maxColumnsToMerge;
+  private final boolean releaseLocksOnHandoff;
 
   public SeekableStreamIndexTaskTuningConfig(
       @Nullable AppendableIndexSpec appendableIndexSpec,
@@ -91,7 +94,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements TuningConfi
       @Nullable Integer maxParseExceptions,
       @Nullable Integer maxSavedParseExceptions,
       @Nullable Integer numPersistThreads,
-      @Nullable Integer maxColumnsToMerge
+      @Nullable Integer maxColumnsToMerge,
+      @Nullable Boolean releaseLocksOnHandoff
   )
   {
     this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
@@ -146,6 +150,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements TuningConfi
       this.numPersistThreads = Math.max(numPersistThreads, 
AppenderatorConfig.DEFAULT_NUM_PERSIST_THREADS);
     }
     this.maxColumnsToMerge = maxColumnsToMerge == null ? 
DEFAULT_MAX_COLUMNS_TO_MERGE : maxColumnsToMerge;
+    this.releaseLocksOnHandoff = Configs.valueOrDefault(releaseLocksOnHandoff, 
DEFAULT_RELEASE_LOCKS_ON_HANDOFF);
   }
 
   @Override
@@ -293,6 +298,12 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements TuningConfi
     return maxColumnsToMerge;
   }
 
+  @JsonProperty
+  public boolean isReleaseLocksOnHandoff()
+  {
+    return releaseLocksOnHandoff;
+  }
+
   public abstract SeekableStreamIndexTaskTuningConfig 
withBasePersistDirectory(File dir);
 
   @Override
@@ -319,6 +330,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements TuningConfi
            maxSavedParseExceptions == that.maxSavedParseExceptions &&
            numPersistThreads == that.numPersistThreads &&
            maxColumnsToMerge == that.maxColumnsToMerge &&
+           releaseLocksOnHandoff == that.releaseLocksOnHandoff &&
            Objects.equals(partitionsSpec, that.partitionsSpec) &&
            Objects.equals(intermediatePersistPeriod, 
that.intermediatePersistPeriod) &&
            Objects.equals(basePersistDirectory, that.basePersistDirectory) &&
@@ -352,7 +364,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements TuningConfi
         maxParseExceptions,
         maxSavedParseExceptions,
         numPersistThreads,
-        maxColumnsToMerge
+        maxColumnsToMerge,
+        releaseLocksOnHandoff
     );
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
index 1a501abba70..e859eb9700d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java
@@ -45,6 +45,7 @@ import 
org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
 import org.apache.druid.segment.realtime.appenderator.Appenderators;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
 import org.apache.druid.segment.realtime.appenderator.TaskDirectory;
+import org.apache.druid.segment.realtime.appenderator.TaskIntervalUnlocker;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.joda.time.Interval;
 
@@ -75,7 +76,8 @@ public class TestAppenderatorsManager implements 
AppenderatorsManager
       PolicyEnforcer policyEnforcer,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      TaskIntervalUnlocker taskIntervalUnlocker
   )
   {
     realtimeAppenderator = Appenderators.createRealtime(
@@ -98,7 +100,8 @@ public class TestAppenderatorsManager implements 
AppenderatorsManager
         policyEnforcer,
         rowIngestionMeters,
         parseExceptionHandler,
-        centralizedDatasourceSchemaConfig
+        centralizedDatasourceSchemaConfig,
+        taskIntervalUnlocker
     );
     return realtimeAppenderator;
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index a663c7c19aa..f0ba8966626 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -519,6 +519,7 @@ public class SeekableStreamSupervisorSpecTest extends 
EasyMockSupport
             null,
             null,
             null,
+            null,
             null
         )
         {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index feba656ee63..54bf3c84bce 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -2837,6 +2837,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
             null,
             null,
             null,
+            null,
             null
         )
         {
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
index 7e5f1b3f25f..a88bc864c94 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
@@ -38,6 +38,14 @@ public interface AppenderatorConfig extends TuningConfig
 
   boolean isSkipBytesInMemoryOverheadCheck();
 
+  /**
+   * @return true if locks should be released after segments have been handed 
off to Historicals.
+   */
+  default boolean isReleaseLocksOnHandoff()
+  {
+    return false;
+  }
+
   default int getNumPersistThreads()
   {
     return DEFAULT_NUM_PERSIST_THREADS;
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
index 360ea444c6d..096db844e4d 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java
@@ -68,7 +68,8 @@ public class Appenderators
       PolicyEnforcer policyEnforcer,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      TaskIntervalUnlocker taskIntervalUnlocker
   )
   {
     return new StreamAppenderator(
@@ -99,7 +100,8 @@ public class Appenderators
         cache,
         rowIngestionMeters,
         parseExceptionHandler,
-        centralizedDatasourceSchemaConfig
+        centralizedDatasourceSchemaConfig,
+        taskIntervalUnlocker
     );
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
index 97f8c07c78e..dea08ecddbf 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java
@@ -89,7 +89,8 @@ public interface AppenderatorsManager
       PolicyEnforcer policyEnforcer,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      TaskIntervalUnlocker taskIntervalUnlocker
   );
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
index fcf541aed41..70a820f9886 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java
@@ -79,7 +79,8 @@ public class DummyForInjectionAppenderatorsManager implements 
AppenderatorsManag
       PolicyEnforcer policyEnforcer,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      TaskIntervalUnlocker taskIntervalUnlocker
   )
   {
     throw new UOE(ERROR_MSG);
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
index 1f894bff3cc..518e3f74ad9 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java
@@ -85,7 +85,8 @@ public class PeonAppenderatorsManager implements 
AppenderatorsManager
       PolicyEnforcer policyEnforcer,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      TaskIntervalUnlocker taskIntervalUnlocker
   )
   {
     if (realtimeAppenderator != null) {
@@ -113,7 +114,8 @@ public class PeonAppenderatorsManager implements 
AppenderatorsManager
           policyEnforcer,
           rowIngestionMeters,
           parseExceptionHandler,
-          centralizedDatasourceSchemaConfig
+          centralizedDatasourceSchemaConfig,
+          taskIntervalUnlocker
       );
     }
     return realtimeAppenderator;
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
index 0d41f396f0e..8a64225964c 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
@@ -202,6 +202,7 @@ public class StreamAppenderator implements Appenderator
   private final SegmentLoaderConfig segmentLoaderConfig;
   private ScheduledExecutorService exec;
   private final FingerprintGenerator fingerprintGenerator;
+  private final TaskIntervalUnlocker taskIntervalUnlocker;
 
   /**
    * This constructor allows the caller to provide its own 
SinkQuerySegmentWalker.
@@ -227,7 +228,8 @@ public class StreamAppenderator implements Appenderator
       Cache cache,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      TaskIntervalUnlocker taskIntervalUnlocker
   )
   {
     this.segmentLoaderConfig = segmentLoaderConfig;
@@ -255,6 +257,7 @@ public class StreamAppenderator implements Appenderator
         Execs.makeThreadFactory("StreamAppenderSegmentRemoval-%s")
     );
     this.fingerprintGenerator = new FingerprintGenerator(objectMapper);
+    this.taskIntervalUnlocker = taskIntervalUnlocker;
   }
 
   @VisibleForTesting
@@ -1525,6 +1528,10 @@ public class StreamAppenderator implements Appenderator
                 removeDirectory(computePersistDir(identifier));
               }
 
+              if (tuningConfig.isReleaseLocksOnHandoff()) {
+                unlockIntervalIfApplicable(sink);
+              }
+
               log.info("Dropped segment[%s].", identifier);
             };
 
@@ -1559,6 +1566,30 @@ public class StreamAppenderator implements Appenderator
     );
   }
 
+  /**
+   * Unlock the interval if there are more active sinks writing for this 
interval.
+   * The interval will be unlocked if there is no other sink writing to any 
overlapping intervals.
+   */
+  private void unlockIntervalIfApplicable(Sink abandonedSink)
+  {
+    Interval abandonedInterval = abandonedSink.getInterval();
+    boolean isIntervalActive = sinks.entrySet().stream()
+                                    .anyMatch(entry -> {
+                                      return 
entry.getValue().getInterval().overlaps(abandonedInterval);
+                                    });
+    if (isIntervalActive) {
+      log.info("Interval[%s] is still being appended to by other sinks.", 
abandonedInterval);
+    } else {
+      log.info("Unlocking interval[%s] as there are no more active sinks for 
it.", abandonedInterval);
+      try {
+        taskIntervalUnlocker.releaseLock(abandonedInterval);
+      }
+      catch (IOException e) {
+        log.makeAlert(e, "Failed to unlock interval[%s]", 
abandonedInterval).emit();
+      }
+    }
+  }
+
   private Committed readCommit() throws IOException
   {
     final File commitFile = computeCommitFile();
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java
new file mode 100644
index 00000000000..00c23a997ff
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TaskIntervalUnlocker.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.realtime.appenderator;
+
+import org.joda.time.Interval;
+
+import java.io.IOException;
+
+/**
+ * Used to release task locks after segments have been handed off, typically 
with long-running tasks
+ * to avoid holding locks for longer than necessary. This interface is used 
instead of {@code TaskActionClient}
+ * to prevent a cyclic dependency with druid-indexing-service module.
+ */
+@FunctionalInterface
+public interface TaskIntervalUnlocker
+{
+  /**
+   * Releases the lock for the exact interval for a task.
+   */
+  void releaseLock(Interval interval) throws IOException;
+}
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
index 9a4c20e9a07..9eafd853d3f 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java
@@ -170,7 +170,8 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
       PolicyEnforcer policyEnforcer,
       RowIngestionMeters rowIngestionMeters,
       ParseExceptionHandler parseExceptionHandler,
-      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
+      CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
+      TaskIntervalUnlocker taskIntervalUnlocker
   )
   {
     synchronized (this) {
@@ -194,7 +195,8 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
           cache,
           rowIngestionMeters,
           parseExceptionHandler,
-          centralizedDatasourceSchemaConfig
+          centralizedDatasourceSchemaConfig,
+          taskIntervalUnlocker
       )
       {
         @Override
@@ -507,6 +509,12 @@ public class UnifiedIndexerAppenderatorsManager implements 
AppenderatorsManager
     {
       return baseConfig.getNumPersistThreads();
     }
+
+    @Override
+    public boolean isReleaseLocksOnHandoff()
+    {
+      return baseConfig.isReleaseLocksOnHandoff();
+    }
   }
 
   private IndexMerger wrapIndexMerger(IndexMerger baseMerger)
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
index 19e75a712f3..6ac64dc0570 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorTester.java
@@ -174,7 +174,8 @@ public class BatchAppenderatorTester implements 
AutoCloseable
         0L,
         OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
-        basePersistDirectory == null ? createNewBasePersistDirectory() : 
basePersistDirectory
+        basePersistDirectory == null ? createNewBasePersistDirectory() : 
basePersistDirectory,
+        null
     );
     metrics = new SegmentGenerationMetrics();
 
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index f7e2bb0b57e..22074f4f78f 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -63,6 +63,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.hamcrest.CoreMatchers;
 import org.hamcrest.MatcherAssert;
+import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -2449,6 +2450,102 @@ public class StreamAppenderatorTest extends 
InitializedNullHandlingTest
     }
   }
 
+
+  @Test
+  public void test_dropSegment_unlocksInterval() throws Exception
+  {
+    final List<Interval> unlockedIntervals = Collections.synchronizedList(new 
ArrayList<>());
+    final TaskIntervalUnlocker intervalUnlocker = unlockedIntervals::add;
+
+    try (final StreamAppenderatorTester tester = new 
StreamAppenderatorTester.Builder()
+        .basePersistDirectory(temporaryFolder.newFolder())
+        .maxRowsInMemory(2)
+        .releaseLocksOnHandoff(true)
+        .taskIntervalUnlocker(intervalUnlocker)
+        .build()) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+
+      final SegmentIdWithShardSpec segmentId1 = 
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+      final SegmentIdWithShardSpec segmentId2 = 
si("2000-01-01T01:00/2000-01-01T02:00", "version1", 0);
+
+      final InputRow row1 = new MapBasedInputRow(
+          DateTimes.of("2000"),
+          List.of("dim1"),
+          Map.of("dim1", "bar", "met1", 1)
+      );
+
+      final InputRow row2 = new MapBasedInputRow(
+          DateTimes.of("2000-01-01T02:30"),
+          List.of("dim1"),
+          Map.of("dim1", "baz", "met1", 1)
+      );
+
+      appenderator.add(segmentId1, row1, 
Suppliers.ofInstance(Committers.nil()), false);
+      appenderator.add(segmentId2, row2, 
Suppliers.ofInstance(Committers.nil()), false);
+
+      Assert.assertEquals(2, appenderator.getSegments().size());
+
+      appenderator.drop(segmentId1).get();
+
+      synchronized (unlockedIntervals) {
+        Assert.assertEquals(1, unlockedIntervals.size());
+        Assert.assertEquals(segmentId1.getInterval(), 
unlockedIntervals.get(0));
+      }
+
+      Assert.assertEquals(1, appenderator.getSegments().size());
+      Assert.assertTrue(appenderator.getSegments().contains(segmentId2));
+    }
+  }
+
+  @Test
+  public void test_dropSegment_skipsUnlockInterval_ifOverlappingSinkIsActive() 
throws Exception
+  {
+    final List<Interval> unlockedIntervals = Collections.synchronizedList(new 
ArrayList<>());
+    final TaskIntervalUnlocker intervalUnlocker = unlockedIntervals::add;
+
+    try (final StreamAppenderatorTester tester = new 
StreamAppenderatorTester.Builder()
+        .basePersistDirectory(temporaryFolder.newFolder())
+        .maxRowsInMemory(2)
+        .releaseLocksOnHandoff(true)
+        .taskIntervalUnlocker(intervalUnlocker)
+        .build()) {
+      final Appenderator appenderator = tester.getAppenderator();
+
+      appenderator.startJob();
+
+      final SegmentIdWithShardSpec segmentId1 = 
si("2000-01-01T00:00/2000-01-01T01:00", "version1", 0);
+      final SegmentIdWithShardSpec segmentId2 = 
si("2000-01-01T00:30/2000-01-01T01:30", "version2", 0);
+
+      final InputRow row1 = new MapBasedInputRow(
+          DateTimes.of("2000"),
+          List.of("dim1"),
+          Map.of("dim1", "bar", "met1", 1)
+      );
+
+      final InputRow row2 = new MapBasedInputRow(
+          DateTimes.of("2000-01-01T02:30"),
+          List.of("dim1"),
+          Map.of("dim1", "baz", "met1", 1)
+      );
+
+      appenderator.add(segmentId1, row1, 
Suppliers.ofInstance(Committers.nil()), false);
+      appenderator.add(segmentId2, row2, 
Suppliers.ofInstance(Committers.nil()), false);
+
+      Assert.assertEquals(2, appenderator.getSegments().size());
+
+      appenderator.drop(segmentId1).get();
+
+      synchronized (unlockedIntervals) {
+        Assert.assertEquals(0, unlockedIntervals.size());
+      }
+
+      Assert.assertEquals(1, appenderator.getSegments().size());
+      Assert.assertTrue(appenderator.getSegments().contains(segmentId2));
+    }
+  }
+
   private static SegmentIdWithShardSpec si(String interval, String version, 
int partitionNum)
   {
     return new SegmentIdWithShardSpec(
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index bdde04a2a2f..d4d0c97535a 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -113,7 +113,9 @@ public class StreamAppenderatorTester implements 
AutoCloseable
       final DataSegmentAnnouncer announcer,
       final CentralizedDatasourceSchemaConfig 
centralizedDatasourceSchemaConfig,
       final ServiceEmitter serviceEmitter,
-      final PolicyEnforcer policyEnforcer
+      final PolicyEnforcer policyEnforcer,
+      final boolean releaseLocksOnHandoff,
+      final TaskIntervalUnlocker taskIntervalUnlocker
   )
   {
     objectMapper = new DefaultObjectMapper();
@@ -159,7 +161,8 @@ public class StreamAppenderatorTester implements 
AutoCloseable
         0L,
         OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
-        basePersistDirectory
+        basePersistDirectory,
+        releaseLocksOnHandoff
     );
 
     metrics = new SegmentGenerationMetrics();
@@ -250,7 +253,8 @@ public class StreamAppenderatorTester implements 
AutoCloseable
           policyEnforcer,
           rowIngestionMeters,
           new ParseExceptionHandler(rowIngestionMeters, false, 
Integer.MAX_VALUE, 0),
-          centralizedDatasourceSchemaConfig
+          centralizedDatasourceSchemaConfig,
+          taskIntervalUnlocker
       );
     } else {
       SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
@@ -292,7 +296,8 @@ public class StreamAppenderatorTester implements 
AutoCloseable
           NoopPolicyEnforcer.instance(),
           rowIngestionMeters,
           new ParseExceptionHandler(rowIngestionMeters, false, 
Integer.MAX_VALUE, 0),
-          centralizedDatasourceSchemaConfig
+          centralizedDatasourceSchemaConfig,
+          taskIntervalUnlocker
       );
     }
   }
@@ -357,6 +362,8 @@ public class StreamAppenderatorTester implements 
AutoCloseable
     private int delayInMilli = 0;
     private ServiceEmitter serviceEmitter;
     private PolicyEnforcer policyEnforcer = NoopPolicyEnforcer.instance();
+    private boolean releaseLocksOnHandoff;
+    private TaskIntervalUnlocker taskIntervalUnlocker = interval -> {};
 
     public Builder maxRowsInMemory(final int maxRowsInMemory)
     {
@@ -412,6 +419,18 @@ public class StreamAppenderatorTester implements 
AutoCloseable
       return this;
     }
 
+    public Builder releaseLocksOnHandoff(boolean releaseLocksOnHandoff)
+    {
+      this.releaseLocksOnHandoff = releaseLocksOnHandoff;
+      return this;
+    }
+
+    public Builder taskIntervalUnlocker(TaskIntervalUnlocker 
taskIntervalUnlocker)
+    {
+      this.taskIntervalUnlocker = taskIntervalUnlocker;
+      return this;
+    }
+
     public StreamAppenderatorTester build()
     {
       return new StreamAppenderatorTester(
@@ -425,7 +444,9 @@ public class StreamAppenderatorTester implements 
AutoCloseable
           new NoopDataSegmentAnnouncer(),
           CentralizedDatasourceSchemaConfig.create(),
           serviceEmitter,
-          policyEnforcer
+          policyEnforcer,
+          releaseLocksOnHandoff,
+          taskIntervalUnlocker
       );
     }
 
@@ -445,7 +466,9 @@ public class StreamAppenderatorTester implements 
AutoCloseable
           dataSegmentAnnouncer,
           config,
           serviceEmitter,
-          policyEnforcer
+          policyEnforcer,
+          releaseLocksOnHandoff,
+          taskIntervalUnlocker
       );
     }
   }
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestAppenderatorConfig.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestAppenderatorConfig.java
index 5dba99de500..58d00b899ae 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestAppenderatorConfig.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TestAppenderatorConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment.realtime.appenderator;
 
+import org.apache.druid.common.config.Configs;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -45,6 +46,7 @@ public class TestAppenderatorConfig implements 
AppenderatorConfig
   private final IndexSpec indexSpecForIntermediatePersists;
   @Nullable
   private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
+  private final boolean releaseLocksOnHandoff;
 
   public TestAppenderatorConfig(
       AppendableIndexSpec appendableIndexSpec,
@@ -57,7 +59,8 @@ public class TestAppenderatorConfig implements 
AppenderatorConfig
       Long pushTimeout,
       @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
       Integer maxColumnsToMerge,
-      File basePersistDirectory
+      File basePersistDirectory,
+      @Nullable Boolean releaseLocksOnHandoff
   )
   {
     this.appendableIndexSpec = appendableIndexSpec;
@@ -74,6 +77,7 @@ public class TestAppenderatorConfig implements 
AppenderatorConfig
 
     this.partitionsSpec = null;
     this.indexSpecForIntermediatePersists = this.indexSpec;
+    this.releaseLocksOnHandoff = Configs.valueOrDefault(releaseLocksOnHandoff, 
false);
   }
 
   @Override
@@ -162,6 +166,12 @@ public class TestAppenderatorConfig implements 
AppenderatorConfig
     return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make 
much sense for batch jobs
   }
 
+  @Override
+  public boolean isReleaseLocksOnHandoff()
+  {
+    return releaseLocksOnHandoff;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -180,6 +190,7 @@ public class TestAppenderatorConfig implements 
AppenderatorConfig
            maxPendingPersists == that.maxPendingPersists &&
            reportParseExceptions == that.reportParseExceptions &&
            pushTimeout == that.pushTimeout &&
+           releaseLocksOnHandoff == that.releaseLocksOnHandoff &&
            Objects.equals(partitionsSpec, that.partitionsSpec) &&
            Objects.equals(indexSpec, that.indexSpec) &&
            Objects.equals(indexSpecForIntermediatePersists, 
that.indexSpecForIntermediatePersists) &&
@@ -203,7 +214,8 @@ public class TestAppenderatorConfig implements 
AppenderatorConfig
         maxPendingPersists,
         reportParseExceptions,
         pushTimeout,
-        segmentWriteOutMediumFactory
+        segmentWriteOutMediumFactory,
+        releaseLocksOnHandoff
     );
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to