This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 00ce18a  Additional Kinesis resharding fixes (#8870)
00ce18a is described below

commit 00ce18a0ea1b984a07479489d43d9901575d3d28
Author: Jonathan Wei <[email protected]>
AuthorDate: Thu Nov 28 12:59:01 2019 -0800

    Additional Kinesis resharding fixes (#8870)
    
    * Additional Kinesis resharding fixes
    
    * Address PR comments
    
    * Remove unused method
    
    * Adjust SegmentTransactionalInsertAction null handling
    
    * Check for unchanged metadata on empty publish
    
    * Add logs for empty publish
    
    * Fix javadoc
    
    * Clear offset when invalid endOffsets are seen
    
    * Fix LGTM alert
    
    * Fix build
    
    * Add resharding note to Kinesis docs
    
    * Checkstyle
    
    * Spelling
    
    * Address PR comments
    
    * Checkstyle
---
 .../extensions-core/kinesis-ingestion.md           |  11 +
 .../indexing/kafka/supervisor/KafkaSupervisor.java |   5 -
 .../kafka/supervisor/KafkaSupervisorTest.java      |   7 +-
 .../kinesis/supervisor/KinesisSupervisor.java      |  73 +++--
 .../kinesis/supervisor/KinesisSupervisorTest.java  |  70 +++--
 .../actions/SegmentTransactionalInsertAction.java  |  50 +++-
 .../apache/druid/indexing/overlord/TaskQueue.java  |  27 ++
 .../indexing/seekablestream/SequenceMetadata.java  |  91 +++++-
 .../supervisor/SeekableStreamSupervisor.java       | 317 ++++++++++++++-------
 .../TestIndexerMetadataStorageCoordinator.java     |  10 +
 .../IndexerMetadataStorageCoordinator.java         |  27 ++
 .../IndexerSQLMetadataStorageCoordinator.java      |  68 +++++
 .../appenderator/BaseAppenderatorDriver.java       |  20 +-
 .../TransactionalSegmentPublisher.java             |   9 +
 website/.spelling                                  |   2 +
 15 files changed, 609 insertions(+), 178 deletions(-)

diff --git a/docs/development/extensions-core/kinesis-ingestion.md 
b/docs/development/extensions-core/kinesis-ingestion.md
index aec4699..9ede4f3 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -440,3 +440,14 @@ compatible with Apache projects.
 
 To enable this feature, add the `amazon-kinesis-client` (tested on version 
`1.9.2`) jar file 
([link](https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-client/1.9.2))
 under `dist/druid/extensions/druid-kinesis-indexing-service/`.
 Then when submitting a supervisor-spec, set `deaggregate` to true.
+
+## Resharding
+
+When changing the shard count for a Kinesis stream, there will be a window of 
time around the resharding operation with early shutdown of Kinesis ingestion 
tasks.
+This occurs because the supervisor will update the shard -> task group 
mappings as shards are closed and fully read, to ensure that tasks are not 
running 
+with an assignment of closed shards that have been fully read and to ensure a 
balanced distribution of active shards across tasks. 
+
+This window with early task shutdowns will conclude when:
+- All closed shards have been fully read and the Kinesis ingestion tasks have 
published the data from those shards, committing the "closed" state to metadata 
storage
+- Any remaining tasks that had inactive shards in the assignment have been 
shut down (these tasks would have been created before the closed shards were 
completely drained)
+
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index 98ba169..99c8105 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -154,14 +154,9 @@ public class KafkaSupervisor extends 
SeekableStreamSupervisor<Integer, Long>
     );
   }
 
-
   @Override
   protected int getTaskGroupIdForPartition(Integer partitionId)
   {
-    // record partitionIds so that supervisor knows when a partition is 
discovered.
-    if (!partitionIds.contains(partitionId)) {
-      partitionIds.add(partitionId);
-    }
     return partitionId % spec.getIoConfig().getTaskCount();
   }
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 47b2d11..a50356f 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -1274,12 +1274,12 @@ public class KafkaSupervisorTest extends EasyMockSupport
             .andReturn(Futures.immediateFuture(DateTimes.nowUtc()))
             .times(2);
     EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
-            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L, 1, 20L, 
2, 30L)))
-            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L, 1, 15L, 
2, 35L)));
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L, 2, 
30L)))
+            .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L, 2, 
35L)));
     EasyMock.expect(
         taskClient.setEndOffsetsAsync(
             EasyMock.contains("sequenceName-0"),
-            EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
+            EasyMock.eq(ImmutableMap.of(0, 10L, 2, 35L)),
             EasyMock.eq(true)
         )
     ).andReturn(Futures.immediateFuture(true)).times(2);
@@ -1312,7 +1312,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
 
       Assert.assertEquals(topic, 
taskConfig.getStartSequenceNumbers().getStream());
       Assert.assertEquals(10L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0));
-      Assert.assertEquals(20L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1));
       Assert.assertEquals(35L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2));
     }
   }
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index f7454a4..f150067 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -24,8 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.Task;
@@ -66,7 +64,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
 /**
@@ -81,8 +78,6 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class KinesisSupervisor extends SeekableStreamSupervisor<String, String>
 {
-  private static final HashFunction HASH_FUNCTION = Hashing.sha1();
-
   private static final EmittingLogger log = new 
EmittingLogger(KinesisSupervisor.class);
 
   public static final TypeReference<TreeMap<Integer, Map<String, String>>> 
CHECKPOINTS_TYPE_REF =
@@ -232,39 +227,32 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
   @Override
   protected int getTaskGroupIdForPartition(String partitionId)
   {
-    if (!partitionIds.contains(partitionId)) {
-      partitionIds.add(partitionId);
-    }
-
     return getTaskGroupIdForPartitionWithProvidedList(partitionId, 
partitionIds);
   }
 
   private int getTaskGroupIdForPartitionWithProvidedList(String partitionId, 
List<String> availablePartitions)
   {
+    int index = availablePartitions.indexOf(partitionId);
+    if (index < 0) {
+      return index;
+    }
     return availablePartitions.indexOf(partitionId) % 
spec.getIoConfig().getTaskCount();
   }
 
   @Override
-  protected Map<Integer, ConcurrentHashMap<String, String>> 
recomputePartitionGroupsForExpiration(
-      Set<String> availablePartitions
-  )
+  protected Map<Integer, Set<String>> 
recomputePartitionGroupsForExpiration(Set<String> availablePartitions)
   {
     List<String> availablePartitionsList = new 
ArrayList<>(availablePartitions);
 
-    Map<Integer, ConcurrentHashMap<String, String>> newPartitionGroups = new 
HashMap<>();
-
-    for (ConcurrentHashMap<String, String> oldGroup : 
partitionGroups.values()) {
-      for (Map.Entry<String, String> partitionOffsetMapping : 
oldGroup.entrySet()) {
-        String partitionId = partitionOffsetMapping.getKey();
-        if (availablePartitions.contains(partitionId)) {
-          int newTaskGroupId = 
getTaskGroupIdForPartitionWithProvidedList(partitionId, 
availablePartitionsList);
-          ConcurrentHashMap<String, String> partitionMap = 
newPartitionGroups.computeIfAbsent(
-              newTaskGroupId,
-              k -> new ConcurrentHashMap<>()
-          );
-          partitionMap.put(partitionId, partitionOffsetMapping.getValue());
-        }
-      }
+    Map<Integer, Set<String>> newPartitionGroups = new HashMap<>();
+
+    for (String availablePartition : availablePartitions) {
+      int newTaskGroupId = 
getTaskGroupIdForPartitionWithProvidedList(availablePartition, 
availablePartitionsList);
+      Set<String> newGroup = newPartitionGroups.computeIfAbsent(
+          newTaskGroupId,
+          k -> new HashSet<>()
+      );
+      newGroup.add(availablePartition);
     }
 
     return newPartitionGroups;
@@ -398,6 +386,32 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
   {
     log.info("Marking expired shards in metadata: " + expiredPartitionIds);
 
+    return createDataSourceMetadataWithClosedOrExpiredPartitions(
+        currentMetadata,
+        expiredPartitionIds,
+        KinesisSequenceNumber.EXPIRED_MARKER
+    );
+  }
+
+  @Override
+  protected SeekableStreamDataSourceMetadata<String, String> 
createDataSourceMetadataWithClosedPartitions(
+      SeekableStreamDataSourceMetadata<String, String> currentMetadata, 
Set<String> closedPartitionIds
+  )
+  {
+    log.info("Marking closed shards in metadata: " + closedPartitionIds);
+    return createDataSourceMetadataWithClosedOrExpiredPartitions(
+        currentMetadata,
+        closedPartitionIds,
+        KinesisSequenceNumber.END_OF_SHARD_MARKER
+    );
+  }
+
+  private SeekableStreamDataSourceMetadata<String, String> 
createDataSourceMetadataWithClosedOrExpiredPartitions(
+      SeekableStreamDataSourceMetadata<String, String> currentMetadata,
+      Set<String> terminatedPartitionIds,
+      String terminationMarker
+  )
+  {
     final KinesisDataSourceMetadata dataSourceMetadata = 
(KinesisDataSourceMetadata) currentMetadata;
 
     SeekableStreamSequenceNumbers<String, String> old = 
dataSourceMetadata.getSeekableStreamSequenceNumbers();
@@ -405,10 +419,10 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
     Map<String, String> oldPartitionSequenceNumberMap = 
old.getPartitionSequenceNumberMap();
     Map<String, String> newPartitionSequenceNumberMap = new HashMap<>();
     for (Map.Entry<String, String> entry : 
oldPartitionSequenceNumberMap.entrySet()) {
-      if (!expiredPartitionIds.contains(entry.getKey())) {
+      if (!terminatedPartitionIds.contains(entry.getKey())) {
         newPartitionSequenceNumberMap.put(entry.getKey(), entry.getValue());
       } else {
-        newPartitionSequenceNumberMap.put(entry.getKey(), 
KinesisSequenceNumber.EXPIRED_MARKER);
+        newPartitionSequenceNumberMap.put(entry.getKey(), terminationMarker);
       }
     }
 
@@ -420,7 +434,7 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
       newExclusiveStartPartitions = new HashSet<>();
       oldExclusiveStartPartitions = 
((SeekableStreamStartSequenceNumbers<String, String>) 
old).getExclusivePartitions();
       for (String partitionId : oldExclusiveStartPartitions) {
-        if (!expiredPartitionIds.contains(partitionId)) {
+        if (!terminatedPartitionIds.contains(partitionId)) {
           newExclusiveStartPartitions.add(partitionId);
         }
       }
@@ -443,4 +457,5 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String>
 
     return new KinesisDataSourceMetadata(newSequences);
   }
+
 }
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 292e9dc..5ce8f36 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -104,7 +104,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 public class KinesisSupervisorTest extends EasyMockSupport
@@ -1236,24 +1238,18 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
     EasyMock.expect(taskClient.pauseAsync(EasyMock.contains("sequenceName-0")))
             .andReturn(Futures.immediateFuture(ImmutableMap.of(
                 SHARD_ID1,
-                "1",
-                SHARD_ID0,
-                "0"
+                "1"
             )))
             .andReturn(Futures.immediateFuture(ImmutableMap.of(
                 SHARD_ID1,
-                "3",
-                SHARD_ID0,
-                "1"
+                "3"
             )));
     EasyMock.expect(
         taskClient.setEndOffsetsAsync(
             EasyMock.contains("sequenceName-0"),
             EasyMock.eq(ImmutableMap.of(
                 SHARD_ID1,
-                "3",
-                SHARD_ID0,
-                "1"
+                "3"
             )),
             EasyMock.eq(true)
         )
@@ -1296,13 +1292,9 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
           "3",
           
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID1)
       );
-      Assert.assertEquals(
-          "1",
-          
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(SHARD_ID0)
-      );
       // start sequenceNumbers should be exclusive for the second batch of 
tasks
       Assert.assertEquals(
-          ImmutableSet.of(SHARD_ID0, SHARD_ID1),
+          ImmutableSet.of(SHARD_ID1),
           ((KinesisIndexTask) 
task).getIOConfig().getStartSequenceNumbers().getExclusivePartitions()
       );
     }
@@ -3354,8 +3346,11 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
   public void testGetCurrentTotalStats()
   {
     supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false);
+    supervisor.setPartitionIdsForTests(
+        ImmutableList.of(SHARD_ID0, SHARD_ID1)
+    );
     supervisor.addTaskGroupToActivelyReadingTaskGroup(
-        supervisor.getTaskGroupIdForPartition("0"),
+        supervisor.getTaskGroupIdForPartition(SHARD_ID0),
         ImmutableMap.of("0", "0"),
         Optional.absent(),
         Optional.absent(),
@@ -3364,7 +3359,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
     );
 
     supervisor.addTaskGroupToPendingCompletionTaskGroup(
-        supervisor.getTaskGroupIdForPartition("1"),
+        supervisor.getTaskGroupIdForPartition(SHARD_ID1),
         ImmutableMap.of("0", "0"),
         Optional.absent(),
         Optional.absent(),
@@ -3906,12 +3901,11 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
             .anyTimes();
     TreeMap<Integer, Map<String, String>> checkpointsGroup0 = new TreeMap<>();
     checkpointsGroup0.put(0, ImmutableMap.of(
-        SHARD_ID2, "0",
-        SHARD_ID0, KinesisSequenceNumber.END_OF_SHARD_MARKER
+        SHARD_ID1, "0"
     ));
     TreeMap<Integer, Map<String, String>> checkpointsGroup1 = new TreeMap<>();
     checkpointsGroup1.put(1, ImmutableMap.of(
-        SHARD_ID1, "0"
+        SHARD_ID2, "0"
     ));
     // there would be 2 tasks, 1 for each task group
     
EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"),
 EasyMock.anyBoolean()))
@@ -3940,7 +3934,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new SeekableStreamStartSequenceNumbers<>(
             STREAM,
             ImmutableMap.of(
-                SHARD_ID2, "0"
+                SHARD_ID1, "0"
             ),
             ImmutableSet.of()
         );
@@ -3949,7 +3943,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new SeekableStreamEndSequenceNumbers<>(
             STREAM,
             ImmutableMap.of(
-                SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+                SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
             )
         );
 
@@ -3957,7 +3951,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new SeekableStreamStartSequenceNumbers<>(
             STREAM,
             ImmutableMap.of(
-                SHARD_ID1, "0"
+                SHARD_ID2, "0"
             ),
             ImmutableSet.of()
         );
@@ -3966,7 +3960,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
         new SeekableStreamEndSequenceNumbers<>(
             STREAM,
             ImmutableMap.of(
-                SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
+                SHARD_ID2, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER
             )
         );
 
@@ -4163,11 +4157,21 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
     Assert.assertEquals(group1ExpectedStartSequenceNumbers, 
group1Config.getStartSequenceNumbers());
     Assert.assertEquals(group1ExpectedEndSequenceNumbers, 
group1Config.getEndSequenceNumbers());
 
-    Map<Integer, Map<String, String>> expectedPartitionGroups = 
ImmutableMap.of(
-        0, ImmutableMap.of(SHARD_ID1, "-1"),
-        1, ImmutableMap.of(SHARD_ID2, "-1")
+    Map<Integer, Set<String>> expectedPartitionGroups = ImmutableMap.of(
+        0, ImmutableSet.of(SHARD_ID1),
+        1, ImmutableSet.of(SHARD_ID2)
     );
     Assert.assertEquals(expectedPartitionGroups, 
supervisor.getPartitionGroups());
+
+    ConcurrentHashMap<String, String> expectedPartitionOffsets = new 
ConcurrentHashMap<>(
+        ImmutableMap.of(
+            SHARD_ID2, "-1",
+            SHARD_ID1, "-1",
+            SHARD_ID0, "-1"
+        )
+    );
+    Assert.assertEquals(expectedPartitionOffsets, 
supervisor.getPartitionOffsets());
+
   }
 
   @Test
@@ -4557,11 +4561,19 @@ public class KinesisSupervisorTest extends 
EasyMockSupport
     Assert.assertEquals(group0ExpectedStartSequenceNumbers, 
group0Config.getStartSequenceNumbers());
     Assert.assertEquals(group0ExpectedEndSequenceNumbers, 
group0Config.getEndSequenceNumbers());
 
-    Map<Integer, Map<String, String>> expectedPartitionGroups = 
ImmutableMap.of(
-        0, ImmutableMap.of(SHARD_ID2, "-1"),
-        1, ImmutableMap.of()
+    Map<Integer, Set<String>> expectedPartitionGroups = ImmutableMap.of(
+        0, ImmutableSet.of(SHARD_ID2),
+        1, ImmutableSet.of()
+    );
+    ConcurrentHashMap<String, String> expectedPartitionOffsets = new 
ConcurrentHashMap<>(
+        ImmutableMap.of(
+            SHARD_ID2, "-1",
+            SHARD_ID1, "-1",
+            SHARD_ID0, "-1"
+        )
     );
     Assert.assertEquals(expectedPartitionGroups, 
supervisor.getPartitionGroups());
+    Assert.assertEquals(expectedPartitionOffsets, 
supervisor.getPartitionOffsets());
   }
 
   private TestableKinesisSupervisor getTestableSupervisor(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
index e1428c6..02cbb92 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java
@@ -64,13 +64,15 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
   private final DataSourceMetadata startMetadata;
   @Nullable
   private final DataSourceMetadata endMetadata;
+  @Nullable
+  private final String dataSource;
 
   public static SegmentTransactionalInsertAction overwriteAction(
       @Nullable Set<DataSegment> segmentsToBeOverwritten,
       Set<DataSegment> segmentsToPublish
   )
   {
-    return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, 
segmentsToPublish, null, null);
+    return new SegmentTransactionalInsertAction(segmentsToBeOverwritten, 
segmentsToPublish, null, null, null);
   }
 
   public static SegmentTransactionalInsertAction appendAction(
@@ -79,21 +81,32 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
       @Nullable DataSourceMetadata endMetadata
   )
   {
-    return new SegmentTransactionalInsertAction(null, segments, startMetadata, 
endMetadata);
+    return new SegmentTransactionalInsertAction(null, segments, startMetadata, 
endMetadata, null);
+  }
+
+  public static SegmentTransactionalInsertAction commitMetadataOnlyAction(
+      String dataSource,
+      DataSourceMetadata startMetadata,
+      DataSourceMetadata endMetadata
+  )
+  {
+    return new SegmentTransactionalInsertAction(null, null, startMetadata, 
endMetadata, dataSource);
   }
 
   @JsonCreator
   private SegmentTransactionalInsertAction(
       @JsonProperty("segmentsToBeOverwritten") @Nullable Set<DataSegment> 
segmentsToBeOverwritten,
-      @JsonProperty("segments") Set<DataSegment> segments,
+      @JsonProperty("segments") @Nullable Set<DataSegment> segments,
       @JsonProperty("startMetadata") @Nullable DataSourceMetadata 
startMetadata,
-      @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
+      @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata,
+      @JsonProperty("dataSource") @Nullable String dataSource
   )
   {
     this.segmentsToBeOverwritten = segmentsToBeOverwritten;
-    this.segments = ImmutableSet.copyOf(segments);
+    this.segments = segments == null ? ImmutableSet.of() : 
ImmutableSet.copyOf(segments);
     this.startMetadata = startMetadata;
     this.endMetadata = endMetadata;
+    this.dataSource = dataSource;
   }
 
   @JsonProperty
@@ -123,6 +136,13 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
     return endMetadata;
   }
 
+  @JsonProperty
+  @Nullable
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
   @Override
   public TypeReference<SegmentPublishResult> getReturnTypeReference()
   {
@@ -137,6 +157,24 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
   @Override
   public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
   {
+    final SegmentPublishResult retVal;
+
+    if (segments.isEmpty()) {
+      // A stream ingestion task didn't ingest any rows and created no 
segments (e.g., all records were unparseable),
+      // but still needs to update metadata with the progress that the task 
made.
+      try {
+        retVal = 
toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly(
+            dataSource,
+            startMetadata,
+            endMetadata
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      return retVal;
+    }
+
     final Set<DataSegment> allSegments = new HashSet<>(segments);
     if (segmentsToBeOverwritten != null) {
       allSegments.addAll(segmentsToBeOverwritten);
@@ -151,7 +189,6 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
       }
     }
 
-    final SegmentPublishResult retVal;
     try {
       retVal = toolbox.getTaskLockbox().doInCriticalSection(
           task,
@@ -271,6 +308,7 @@ public class SegmentTransactionalInsertAction implements 
TaskAction<SegmentPubli
            ", segments=" + segments +
            ", startMetadata=" + startMetadata +
            ", endMetadata=" + endMetadata +
+           ", dataSource=" + dataSource +
            '}';
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index a9153b8..3d802ed 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -387,6 +387,8 @@ public class TaskQueue
    * Shuts down a task if it has not yet finished.
    *
    * @param taskId task to kill
+   * @param reasonFormat A format string indicating the shutdown reason
+   * @param args arguments for reasonFormat
    */
   public void shutdown(final String taskId, String reasonFormat, Object... 
args)
   {
@@ -407,6 +409,31 @@ public class TaskQueue
   }
 
   /**
+   * Shuts down a task, but records the task status as a success, unlike {@link 
#shutdown(String, String, Object...)}
+   *
+   * @param taskId task to shutdown
+   * @param reasonFormat A format string indicating the shutdown reason
+   * @param args arguments for reasonFormat
+   */
+  public void shutdownWithSuccess(final String taskId, String reasonFormat, 
Object... args)
+  {
+    giant.lock();
+
+    try {
+      Preconditions.checkNotNull(taskId, "taskId");
+      for (final Task task : tasks) {
+        if (task.getId().equals(taskId)) {
+          notifyStatus(task, TaskStatus.success(taskId), reasonFormat, args);
+          break;
+        }
+      }
+    }
+    finally {
+      giant.unlock();
+    }
+  }
+
+  /**
    * Notify this queue that some task has an updated status. If this update is 
valid, the status will be persisted in
    * the task storage facility. If the status is a completed status, the task 
will be unlocked and no further
    * updates will be accepted.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
index 2b23706..4b0265f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java
@@ -27,11 +27,16 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.Committer;
 import org.apache.druid.indexing.common.TaskToolbox;
 import 
org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
+import org.apache.druid.indexing.overlord.SegmentPublishResult;
 import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import 
org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import org.apache.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -42,6 +47,8 @@ import java.util.function.BiFunction;
 
 public class SequenceMetadata<PartitionIdType, SequenceOffsetType>
 {
+  private static final EmittingLogger log = new 
EmittingLogger(SequenceMetadata.class);
+
   private final int sequenceId;
   private final String sequenceName;
   private final Set<PartitionIdType> exclusiveStartPartitions;
@@ -301,7 +308,38 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
       boolean useTransaction
   )
   {
-    return (mustBeNullOrEmptySegments, segmentsToPush, commitMetadata) -> {
+    return new SequenceMetadataTransactionalSegmentPublisher(
+        runner,
+        toolbox,
+        useTransaction
+    );
+  }
+
+  private class SequenceMetadataTransactionalSegmentPublisher
+      implements TransactionalSegmentPublisher
+  {
+    private final SeekableStreamIndexTaskRunner<PartitionIdType, 
SequenceOffsetType> runner;
+    private final TaskToolbox toolbox;
+    private final boolean useTransaction;
+
+    public SequenceMetadataTransactionalSegmentPublisher(
+        SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> 
runner,
+        TaskToolbox toolbox,
+        boolean useTransaction
+    )
+    {
+      this.runner = runner;
+      this.toolbox = toolbox;
+      this.useTransaction = useTransaction;
+    }
+
+    @Override
+    public SegmentPublishResult publishAnnotatedSegments(
+        @Nullable Set<DataSegment> mustBeNullOrEmptySegments,
+        Set<DataSegment> segmentsToPush,
+        @Nullable Object commitMetadata
+    ) throws IOException
+    {
       if (mustBeNullOrEmptySegments != null && 
!mustBeNullOrEmptySegments.isEmpty()) {
         throw new ISE("WTH? stream ingestion tasks are overwriting 
segments[%s]", mustBeNullOrEmptySegments);
       }
@@ -316,14 +354,43 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
       if 
(!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
         throw new ISE(
             "WTF?! Driver for sequence [%s], attempted to publish invalid 
metadata[%s].",
-            toString(),
+            SequenceMetadata.this.toString(),
             commitMetadata
         );
       }
 
       final SegmentTransactionalInsertAction action;
 
-      if (useTransaction) {
+      if (segmentsToPush.isEmpty()) {
+        // If a task ingested no data but made progress reading through its 
assigned partitions,
+        // we publish no segments but still need to update the supervisor with 
the current offsets
+        SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> 
startPartitions =
+            new SeekableStreamStartSequenceNumbers<>(
+                finalPartitions.getStream(),
+                getStartOffsets(),
+                exclusiveStartPartitions
+            );
+        if (isMetadataUnchanged(startPartitions, finalPartitions)) {
+          // if we created no segments and didn't change any offsets, just do 
nothing and return.
+          log.info(
+              "With empty segment set, start offsets [%s] and end offsets [%s] 
are the same, skipping metadata commit.",
+              startPartitions,
+              finalPartitions
+          );
+          return SegmentPublishResult.ok(segmentsToPush);
+        } else {
+          log.info(
+              "With empty segment set, start offsets [%s] and end offsets [%s] 
changed, committing new metadata.",
+              startPartitions,
+              finalPartitions
+          );
+          action = SegmentTransactionalInsertAction.commitMetadataOnlyAction(
+              runner.getAppenderator().getDataSource(),
+              runner.createDataSourceMetadata(startPartitions),
+              runner.createDataSourceMetadata(finalPartitions)
+          );
+        }
+      } else if (useTransaction) {
         action = SegmentTransactionalInsertAction.appendAction(
             segmentsToPush,
             runner.createDataSourceMetadata(
@@ -340,6 +407,22 @@ public class SequenceMetadata<PartitionIdType, 
SequenceOffsetType>
       }
 
       return toolbox.getTaskActionClient().submit(action);
-    };
+    }
+
+    @Override
+    public boolean supportsEmptyPublish()
+    {
+      return true;
+    }
+  }
+
+  private boolean isMetadataUnchanged(
+      SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> 
startSequenceNumbers,
+      SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> 
endSequenceNumbers
+  )
+  {
+    Map<PartitionIdType, SequenceOffsetType> startMap = 
startSequenceNumbers.getPartitionSequenceNumberMap();
+    Map<PartitionIdType, SequenceOffsetType> endMap = 
endSequenceNumbers.getPartitionSequenceNumberMap();
+    return startMap.equals(endMap);
   }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 5a3db11..90d95b2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -432,17 +432,18 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   // Map<{group RandomIdUtils}, List<{pending completion task groups}>>
   private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<TaskGroup>> 
pendingCompletionTaskGroups = new ConcurrentHashMap<>();
 
-  // The starting sequence for a new partition in [partitionGroups] is 
initially set to getNotSetMarker(). When a new task group
-  // is created and is assigned partitions, if the sequence in 
[partitionGroups] is getNotSetMarker() it will take the starting
-  // sequence value from the metadata store, and if it can't find it there, 
from stream. Once a task begins
-  // publishing, the sequence in partitionGroups will be updated to the ending 
sequence of the publishing-but-not-yet-
+  // We keep two separate maps for tracking the current state of 
partition->task group mappings [partitionGroups] and partition->offset
+  // mappings [partitionOffsets]. The starting offset for a new partition in 
[partitionOffsets] is initially set to getNotSetMarker(). When a new task group
+  // is created and is assigned partitions, if the offset for an assigned 
partition in [partitionOffsets] is getNotSetMarker() it will take the starting
+  // offset value from the metadata store, and if it can't find it there, from 
stream. Once a task begins
+  // publishing, the offset in [partitionOffsets] will be updated to the 
ending offset of the publishing-but-not-yet-
   // completed task, which will cause the next set of tasks to begin reading 
from where the previous task left
-  // off. If that previous task now fails, we will set the sequence in 
[partitionGroups] back to getNotSetMarker() which will
-  // cause successive tasks to again grab their starting sequence from 
metadata store. This mechanism allows us to
+  // off. If that previous task now fails, we will set the offset in 
[partitionOffsets] back to getNotSetMarker() which will
+  // cause successive tasks to again grab their starting offset from metadata 
store. This mechanism allows us to
   // start up successive tasks without waiting for the previous tasks to 
succeed and still be able to handle task
   // failures during publishing.
-  // Map<{group RandomIdUtils}, Map<{partition RandomIdUtils}, 
{startingOffset}>>
-  protected final ConcurrentHashMap<Integer, 
ConcurrentHashMap<PartitionIdType, SequenceOffsetType>> partitionGroups = new 
ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> 
partitionOffsets = new ConcurrentHashMap<>();
+  protected final ConcurrentHashMap<Integer, Set<PartitionIdType>> 
partitionGroups = new ConcurrentHashMap<>();
 
   protected final ObjectMapper sortingMapper;
   protected final List<PartitionIdType> partitionIds = new 
CopyOnWriteArrayList<>();
@@ -801,7 +802,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       boolean includeOffsets
   )
   {
-    int numPartitions = 
partitionGroups.values().stream().mapToInt(Map::size).sum();
+    int numPartitions = 
partitionGroups.values().stream().mapToInt(Set::size).sum();
 
     final SeekableStreamSupervisorReportPayload<PartitionIdType, 
SequenceOffsetType> payload = createReportPayload(
         numPartitions,
@@ -1128,6 +1129,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                                ));
       activelyReadingTaskGroups.clear();
       partitionGroups.clear();
+      partitionOffsets.clear();
     } else {
       if (!checkSourceMetadataMatch(dataSourceMetadata)) {
         throw new IAE(
@@ -1211,8 +1213,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                          activelyReadingTaskGroups.remove(groupId);
                          // killTaskGroupForPartitions() cleans up 
partitionGroups.
                          // Add the removed groups back.
-                         partitionGroups.computeIfAbsent(groupId, k -> new 
ConcurrentHashMap<>())
-                                        .put(partition, getNotSetMarker());
+                         partitionGroups.computeIfAbsent(groupId, k -> new 
HashSet<>());
+                         partitionOffsets.put(partition, getNotSetMarker());
                        });
         } else {
           throw new ISE("Unable to reset metadata");
@@ -1237,6 +1239,16 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     }
   }
 
+  private void killTaskWithSuccess(final String id, String reasonFormat, 
Object... args)
+  {
+    Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
+    if (taskQueue.isPresent()) {
+      taskQueue.get().shutdownWithSuccess(id, reasonFormat, args);
+    } else {
+      log.error("Failed to get task queue because I'm not the leader!");
+    }
+  }
+
   private void killTasksInGroup(TaskGroup taskGroup, String reasonFormat, 
Object... args)
   {
     if (taskGroup != null) {
@@ -1287,6 +1299,31 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> 
seekableStreamIndexTask = (SeekableStreamIndexTask<PartitionIdType, 
SequenceOffsetType>) task;
       final String taskId = task.getId();
 
+      // Check if the task has any inactive partitions. If so, terminate the 
task. Even if some of the
+      // partitions assigned to the task are still active, we still terminate 
the task. We terminate such tasks early
+      // to more rapidly ensure that all active partitions are evenly 
distributed and being read, and to avoid
+      // having to map expired partitions which are no longer tracked in 
partitionIds to a task group.
+      if (supportsPartitionExpiration()) {
+        Set<PartitionIdType> taskPartitions = 
seekableStreamIndexTask.getIOConfig()
+                                                                     
.getStartSequenceNumbers()
+                                                                     
.getPartitionSequenceNumberMap()
+                                                                     .keySet();
+        Set<PartitionIdType> inactivePartitionsInTask = Sets.difference(
+            taskPartitions,
+            new HashSet<>(partitionIds)
+        );
+        if (!inactivePartitionsInTask.isEmpty()) {
+          killTaskWithSuccess(
+              taskId,
+              "Task [%s] with partition set [%s] has inactive partitions [%s], 
stopping task.",
+              taskId,
+              taskPartitions,
+              inactivePartitionsInTask
+          );
+          continue;
+        }
+      }
+
       // Determine which task group this task belongs to based on one of the 
partitions handled by this task. If we
       // later determine that this task is actively reading, we will make sure 
that it matches our current partition
       // allocation (getTaskGroupIdForPartition(partition) should return the 
same value for every partition being read
@@ -1338,24 +1375,40 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
                           Map<PartitionIdType, SequenceOffsetType> 
publishingTaskEndOffsets = taskClient.getEndOffsets(
                               taskId);
 
+                          // If we received invalid endOffset values, we clear 
the known offset to refetch the last committed offset
+                          // from metadata. If any endOffset values are 
invalid, we treat the entire set as invalid as a safety measure.
+                          boolean endOffsetsAreInvalid = false;
                           for (Entry<PartitionIdType, SequenceOffsetType> 
entry : publishingTaskEndOffsets.entrySet()) {
                             PartitionIdType partition = entry.getKey();
                             SequenceOffsetType sequence = entry.getValue();
-                            ConcurrentHashMap<PartitionIdType, 
SequenceOffsetType> partitionOffsets = partitionGroups.get(
-                                getTaskGroupIdForPartition(partition)
-                            );
+                            if (sequence.equals(getEndOfPartitionMarker())) {
+                              log.info(
+                                  "Got end of partition marker for partition 
[%s] from task [%s] in discoverTasks, clearing partition offset to refetch from 
metadata..",
+                                  taskId,
+                                  partition
+                              );
+                              endOffsetsAreInvalid = true;
+                              partitionOffsets.put(partition, 
getNotSetMarker());
+                            }
+                          }
 
-                            boolean succeeded;
-                            do {
-                              succeeded = true;
-                              SequenceOffsetType previousOffset = 
partitionOffsets.putIfAbsent(partition, sequence);
-                              if (previousOffset != null
-                                  && (makeSequenceNumber(previousOffset)
-                                  .compareTo(makeSequenceNumber(
-                                      sequence))) < 0) {
-                                succeeded = 
partitionOffsets.replace(partition, previousOffset, sequence);
-                              }
-                            } while (!succeeded);
+                          if (!endOffsetsAreInvalid) {
+                            for (Entry<PartitionIdType, SequenceOffsetType> 
entry : publishingTaskEndOffsets.entrySet()) {
+                              PartitionIdType partition = entry.getKey();
+                              SequenceOffsetType sequence = entry.getValue();
+
+                              boolean succeeded;
+                              do {
+                                succeeded = true;
+                                SequenceOffsetType previousOffset = 
partitionOffsets.putIfAbsent(partition, sequence);
+                                if (previousOffset != null
+                                    && (makeSequenceNumber(previousOffset)
+                                    .compareTo(makeSequenceNumber(
+                                        sequence))) < 0) {
+                                  succeeded = 
partitionOffsets.replace(partition, previousOffset, sequence);
+                                }
+                              } while (!succeeded);
+                            }
                           }
                         } else {
                           for (PartitionIdType partition : 
seekableStreamIndexTask.getIOConfig()
@@ -1584,6 +1637,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           final SortedMap<Integer, Map<PartitionIdType, SequenceOffsetType>> 
latestCheckpoints = new TreeMap<>(
               taskCheckpoints.tailMap(earliestConsistentSequenceId.get())
           );
+
           log.info("Setting taskGroup sequences to [%s] for group [%d]", 
latestCheckpoints, groupId);
           taskGroup.checkpointSequences.clear();
           taskGroup.checkpointSequences.putAll(latestCheckpoints);
@@ -1622,7 +1676,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       // clear state about the taskgroup so that get latest sequence 
information is fetched from metadata store
       log.warn("Clearing task group [%d] information as no valid tasks left 
the group", groupId);
       activelyReadingTaskGroups.remove(groupId);
-      partitionGroups.get(groupId).replaceAll((partition, sequence) -> 
getNotSetMarker());
+      for (PartitionIdType partitionId : taskGroup.startingSequences.keySet()) 
{
+        partitionOffsets.put(partitionId, getNotSetMarker());
+      }
     }
 
     taskSequences.stream().filter(taskIdSequences -> 
tasksToKill.contains(taskIdSequences.lhs)).forEach(
@@ -1863,6 +1919,18 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         previouslyExpiredPartitions
     );
 
+    Set<PartitionIdType> activePartitionsIdsFromSupplier = Sets.difference(
+        partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions,
+        closedPartitions
+    );
+
+    Set<PartitionIdType> newlyClosedPartitions = Sets.intersection(
+        closedPartitions,
+        new HashSet<>(previousPartitionIds)
+    );
+
+    log.debug("active partitions from supplier: " + 
activePartitionsIdsFromSupplier);
+
     if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() != 
partitionIdsFromSupplier.size()) {
       // this should never happen, but we check for it and exclude the expired 
partitions if they somehow reappear
       log.warn(
@@ -1871,9 +1939,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           partitionIdsFromSupplier
       );
     }
-    if (partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions.size() == 
0) {
+    if (activePartitionsIdsFromSupplier.size() == 0) {
       String errMsg = StringUtils.format(
-          "No partitions found for stream [%s] after removing previously 
expired partitions",
+          "No active partitions found for stream [%s] after removing closed 
and previously expired partitions",
           ioConfig.getStream()
       );
       stateManager.recordThrowableEvent(new StreamException(new ISE(errMsg)));
@@ -1887,29 +1955,38 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         log.info("partition [%s] is closed and has no more data, skipping.", 
partitionId);
         continue;
       }
-      if (!initialPartitionDiscovery && 
!this.partitionIds.contains(partitionId)) {
-        subsequentlyDiscoveredPartitions.add(partitionId);
+
+      if (!this.partitionIds.contains(partitionId)) {
+        partitionIds.add(partitionId);
+
+        if (!initialPartitionDiscovery) {
+          subsequentlyDiscoveredPartitions.add(partitionId);
+        }
       }
     }
 
-
-    // When partitions expire, we need to recompute the task group 
assignments, considering only non-expired partitions,
-    // to ensure that we have even distribution of readable partitions across 
tasks.
+    // When partitions expire, we need to recompute the task group 
assignments, considering only
+    // non-closed and non-expired partitions, to ensure that we have even 
distribution of active
+    // partitions across tasks.
     if (supportsPartitionExpiration()) {
-      cleanupExpiredPartitions(
+      cleanupClosedAndExpiredPartitions(
           storedPartitions,
+          newlyClosedPartitions,
+          activePartitionsIdsFromSupplier,
           previouslyExpiredPartitions,
           partitionIdsFromSupplier
       );
     }
 
-    for (PartitionIdType partitionId : 
partitionIdsFromSupplierWithoutPreviouslyExpiredPartitions) {
+    for (PartitionIdType partitionId : activePartitionsIdsFromSupplier) {
       int taskGroupId = getTaskGroupIdForPartition(partitionId);
-      ConcurrentHashMap<PartitionIdType, SequenceOffsetType> partitionMap = 
partitionGroups.computeIfAbsent(
+      Set<PartitionIdType> partitionGroup = partitionGroups.computeIfAbsent(
           taskGroupId,
-          k -> new ConcurrentHashMap<>()
+          k -> new HashSet<>()
       );
-      if (partitionMap.putIfAbsent(partitionId, getNotSetMarker()) == null) {
+      partitionGroup.add(partitionId);
+
+      if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) == 
null) {
         log.info(
             "New partition [%s] discovered for stream [%s], added to task 
group [%d]",
             partitionId,
@@ -1952,16 +2029,19 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * <p>
    * It will mark the expired partitions in metadata and recompute the 
partition->task group mappings, updating
    * the metadata, the partitionIds list, and the partitionGroups mappings.
-   * <p>
-   * Note that partition IDs that were newly discovered (appears in record 
supplier set but not in metadata set)
-   * are not added to the recomputed partition groups here. This is handled 
later in
-   * {@link #updatePartitionDataFromStream} after this method is called.
    *
-   * @param storedPartitions         Set of partitions previously tracked, 
from the metadata store
+   * @param storedPartitions Set of partitions previously tracked, from the 
metadata store
+   * @param newlyClosedPartitions Set of partitions that are closed in the 
metadata store but still present in the
+   *                              current {@link #partitionIds}
+   * @param activePartitionsIdsFromSupplier Set of partitions currently 
returned by the record supplier, but with
+   *                                        any partitions that are 
closed/expired in the metadata store removed
+   * @param previouslyExpiredPartitions Set of partitions that are recorded as 
expired in the metadata store
    * @param partitionIdsFromSupplier Set of partitions currently returned by 
the record supplier.
    */
-  private void cleanupExpiredPartitions(
+  private void cleanupClosedAndExpiredPartitions(
       Set<PartitionIdType> storedPartitions,
+      Set<PartitionIdType> newlyClosedPartitions,
+      Set<PartitionIdType> activePartitionsIdsFromSupplier,
       Set<PartitionIdType> previouslyExpiredPartitions,
       Set<PartitionIdType> partitionIdsFromSupplier
   )
@@ -1971,7 +2051,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     Set<PartitionIdType> newlyExpiredPartitions = 
Sets.difference(storedPartitions, previouslyExpiredPartitions);
     newlyExpiredPartitions = Sets.difference(newlyExpiredPartitions, 
partitionIdsFromSupplier);
 
-    if (newlyExpiredPartitions.size() > 0) {
+    if (!newlyExpiredPartitions.isEmpty()) {
       log.info("Detected newly expired partitions: " + newlyExpiredPartitions);
 
       // Mark partitions as expired in metadata
@@ -1987,28 +2067,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
       validateMetadataPartitionExpiration(newlyExpiredPartitions, 
currentMetadata, cleanedMetadata);
 
-      // Compute new partition groups, only including partitions that are
-      // still in partitionIdsFromSupplier
-      Map<Integer, ConcurrentHashMap<PartitionIdType, SequenceOffsetType>> 
newPartitionGroups =
-          recomputePartitionGroupsForExpiration(partitionIdsFromSupplier);
-
-      validatePartitionGroupReassignments(newPartitionGroups);
-
-      log.info("New partition groups after partition expiration: " + 
newPartitionGroups);
-
       try {
         boolean success = 
indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, 
cleanedMetadata);
         if (success) {
-          partitionIds.clear();
-          partitionIds.addAll(partitionIdsFromSupplier);
 
-          for (Integer groupId : partitionGroups.keySet()) {
-            if (newPartitionGroups.containsKey(groupId)) {
-              partitionGroups.put(groupId, newPartitionGroups.get(groupId));
-            } else {
-              partitionGroups.put(groupId, new ConcurrentHashMap<>());
-            }
-          }
         } else {
           log.error("Failed to update datasource metadata[%s] with expired 
partitions removed", cleanedMetadata);
         }
@@ -2017,6 +2079,33 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         throw new RuntimeException(ioe);
       }
     }
+
+    if (!newlyClosedPartitions.isEmpty()) {
+      log.info("Detected newly closed partitions: " + newlyClosedPartitions);
+    }
+
+    // Partitions have been dropped
+    if (!newlyClosedPartitions.isEmpty() || !newlyExpiredPartitions.isEmpty()) 
{
+      // Compute new partition groups, only including partitions that are
+      // still in partitionIdsFromSupplier and not closed
+      Map<Integer, Set<PartitionIdType>> newPartitionGroups =
+          
recomputePartitionGroupsForExpiration(activePartitionsIdsFromSupplier);
+
+      validatePartitionGroupReassignments(activePartitionsIdsFromSupplier, 
newPartitionGroups);
+
+      log.info("New partition groups after removing closed and expired 
partitions: " + newPartitionGroups);
+
+      partitionIds.clear();
+      partitionIds.addAll(activePartitionsIdsFromSupplier);
+
+      for (Integer groupId : partitionGroups.keySet()) {
+        if (newPartitionGroups.containsKey(groupId)) {
+          partitionGroups.put(groupId, newPartitionGroups.get(groupId));
+        } else {
+          partitionGroups.put(groupId, new HashSet<>());
+        }
+      }
+    }
   }
 
   /**
@@ -2032,7 +2121,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * @param availablePartitions
    * @return a remapped copy of partitionGroups, containing only the 
partitions in availablePartitions
    */
-  protected Map<Integer, ConcurrentHashMap<PartitionIdType, 
SequenceOffsetType>> recomputePartitionGroupsForExpiration(
+  protected Map<Integer, Set<PartitionIdType>> 
recomputePartitionGroupsForExpiration(
       Set<PartitionIdType> availablePartitions
   )
   {
@@ -2056,6 +2145,14 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     throw new UnsupportedOperationException("This supervisor type does not 
support partition expiration.");
   }
 
+  protected SeekableStreamDataSourceMetadata<PartitionIdType, 
SequenceOffsetType> createDataSourceMetadataWithClosedPartitions(
+      SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> 
currentMetadata,
+      Set<PartitionIdType> closedPartitionIds
+  )
+  {
+    throw new UnsupportedOperationException("This supervisor type does not 
support partition closing.");
+  }
+
   /**
    * Perform a sanity check on the datasource metadata returned by
    * {@link #createDataSourceMetadataWithExpiredPartitions}.
@@ -2123,33 +2220,18 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
    * @param newPartitionGroups new metadata without expired partitions, 
generated by the subclass
    */
   private void validatePartitionGroupReassignments(
-      Map<Integer, ConcurrentHashMap<PartitionIdType, SequenceOffsetType>> 
newPartitionGroups
+      Set<PartitionIdType> activePartitionsIdsFromSupplier,
+      Map<Integer, Set<PartitionIdType>> newPartitionGroups
   )
   {
-    Map<PartitionIdType, SequenceOffsetType> oldPartitionMappings = new 
HashMap<>();
-    for (ConcurrentHashMap<PartitionIdType, SequenceOffsetType> oldGroup : 
partitionGroups.values()) {
-      // we don't care about old task group mappings, only the 
partition-offset mappings
-      oldPartitionMappings.putAll(oldGroup);
-    }
-
-    for (ConcurrentHashMap<PartitionIdType, SequenceOffsetType> newGroup : 
newPartitionGroups.values()) {
-      for (Entry<PartitionIdType, SequenceOffsetType> newPartitionMapping : 
newGroup.entrySet()) {
-        if (!oldPartitionMappings.containsKey(newPartitionMapping.getKey())) {
+    for (Set<PartitionIdType> newGroup : newPartitionGroups.values()) {
+      for (PartitionIdType partitionInNewGroup : newGroup) {
+        if (!activePartitionsIdsFromSupplier.contains(partitionInNewGroup)) {
           // recomputing the groups without the expired partitions added an 
unknown partition somehow
           throw new IAE(
               "Recomputed partition groups [%s] contains unexpected partition 
ID [%s], old partition groups: [%s]",
               newPartitionGroups,
-              newPartitionMapping.getKey(),
-              partitionGroups
-          );
-        }
-
-        SequenceOffsetType oldOffset = 
oldPartitionMappings.get(newPartitionMapping.getKey());
-        if (!oldOffset.equals(newPartitionMapping.getValue())) {
-          throw new IAE(
-              "Recomputed partition groups [%s] has offset mismatch for 
partition ID [%s], original partition map: [%s]",
-              newPartitionGroups,
-              newPartitionMapping.getKey(),
+              partitionInNewGroup,
               partitionGroups
           );
         }
@@ -2271,9 +2353,29 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         group.completionTimeout = 
DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
         pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new 
CopyOnWriteArrayList<>()).add(group);
 
-        // set endOffsets as the next startOffsets
+
+        boolean endOffsetsAreInvalid = false;
         for (Entry<PartitionIdType, SequenceOffsetType> entry : 
endOffsets.entrySet()) {
-          partitionGroups.get(groupId).put(entry.getKey(), entry.getValue());
+          if (entry.getValue().equals(getEndOfPartitionMarker())) {
+            log.info(
+                "Got end of partition marker for partition [%s] in 
checkTaskDuration, not updating partition offset.",
+                entry.getKey()
+            );
+            endOffsetsAreInvalid = true;
+          }
+        }
+
+        // set endOffsets as the next startOffsets
+        // If we received invalid endOffset values, we clear the known offset 
to refetch the last committed offset
+        // from metadata. If any endOffset values are invalid, we treat the 
entire set as invalid as a safety measure.
+        if (!endOffsetsAreInvalid) {
+          for (Entry<PartitionIdType, SequenceOffsetType> entry : 
endOffsets.entrySet()) {
+            partitionOffsets.put(entry.getKey(), entry.getValue());
+          }
+        } else {
+          for (Entry<PartitionIdType, SequenceOffsetType> entry : 
endOffsets.entrySet()) {
+            partitionOffsets.put(entry.getKey(), getNotSetMarker());
+          }
         }
       } else {
         for (String id : group.taskIds()) {
@@ -2285,7 +2387,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         }
         // clear partitionGroups, so that latest sequences from db is used as 
start sequences not the stale ones
         // if tasks did some successful incremental handoffs
-        partitionGroups.get(groupId).replaceAll((partition, sequence) -> 
getNotSetMarker());
+        for (PartitionIdType partitionId : group.startingSequences.keySet()) {
+          partitionOffsets.put(partitionId, getNotSetMarker());
+        }
       }
 
       // remove this task group from the list of current task groups now that 
it has been handled
@@ -2560,7 +2664,10 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
           }
 
           // reset partitions sequences for this task group so that they will 
be re-read from metadata storage
-          partitionGroups.get(groupId).replaceAll((partition, sequence) -> 
getNotSetMarker());
+          for (PartitionIdType partitionId : group.startingSequences.keySet()) 
{
+            partitionOffsets.put(partitionId, getNotSetMarker());
+          }
+
           // kill all the tasks in this pending completion group
           killTasksInGroup(
               group,
@@ -2671,7 +2778,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     // check that there is a current task group for each group of partitions 
in [partitionGroups]
     for (Integer groupId : partitionGroups.keySet()) {
       if (!activelyReadingTaskGroups.containsKey(groupId)) {
-        log.info("Creating new task group [%d] for partitions %s", groupId, 
partitionGroups.get(groupId).keySet());
+        log.info("Creating new task group [%d] for partitions %s", groupId, 
partitionGroups.get(groupId));
         Optional<DateTime> minimumMessageTime;
         if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
           minimumMessageTime = 
Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
@@ -2801,22 +2908,21 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   )
   {
     ImmutableMap.Builder<PartitionIdType, 
OrderedSequenceNumber<SequenceOffsetType>> builder = ImmutableMap.builder();
-    for (Entry<PartitionIdType, SequenceOffsetType> entry : 
partitionGroups.get(groupId).entrySet()) {
-      PartitionIdType partition = entry.getKey();
-      SequenceOffsetType sequence = entry.getValue();
+    for (PartitionIdType partitionId : partitionGroups.get(groupId)) {
+      SequenceOffsetType sequence = partitionOffsets.get(partitionId);
 
       if (!getNotSetMarker().equals(sequence)) {
         // if we are given a startingOffset (set by a previous task group 
which is pending completion) then use it
         if (!isEndOfShard(sequence)) {
-          builder.put(partition, makeSequenceNumber(sequence, 
useExclusiveStartSequenceNumberForNonFirstSequence()));
+          builder.put(partitionId, makeSequenceNumber(sequence, 
useExclusiveStartSequenceNumberForNonFirstSequence()));
         }
       } else {
         // if we don't have a startingOffset (first run or we had some 
previous failures and reset the sequences) then
         // get the sequence from metadata storage (if available) or 
Kafka/Kinesis (otherwise)
-        OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = 
getOffsetFromStorageForPartition(partition);
+        OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = 
getOffsetFromStorageForPartition(partitionId);
 
         if (offsetFromStorage != null) {
-          builder.put(partition, offsetFromStorage);
+          builder.put(partitionId, offsetFromStorage);
         }
       }
     }
@@ -3131,7 +3237,7 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   }
 
   @VisibleForTesting
-  public ConcurrentHashMap<Integer, ConcurrentHashMap<PartitionIdType, 
SequenceOffsetType>> getPartitionGroups()
+  public ConcurrentHashMap<Integer, Set<PartitionIdType>> getPartitionGroups()
   {
     return partitionGroups;
   }
@@ -3142,6 +3248,23 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return this.partitionIds.isEmpty();
   }
 
+  public ConcurrentHashMap<PartitionIdType, SequenceOffsetType> 
getPartitionOffsets()
+  {
+    return partitionOffsets;
+  }
+
+  /**
+   * Should never be called outside of tests.
+   */
+  @VisibleForTesting
+  public void setPartitionIdsForTests(
+      List<PartitionIdType> partitionIdsForTests
+  )
+  {
+    partitionIds.clear();
+    partitionIds.addAll(partitionIdsForTests);
+  }
+
   /**
    * creates a specific task IOConfig instance for Kafka/Kinesis
    *
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index d270ea8..0a370d5 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -119,6 +119,16 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   }
 
   @Override
+  public SegmentPublishResult commitMetadataOnly(
+      String dataSource,
+      DataSourceMetadata startMetadata,
+      DataSourceMetadata endMetadata
+  )
+  {
+    throw new UnsupportedOperationException("Not implemented, no test uses 
this currently.");
+  }
+
+  @Override
   public SegmentIdWithShardSpec allocatePendingSegment(
       String dataSource,
       String sequenceName,
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 97205a8..356223a 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -180,6 +180,33 @@ public interface IndexerMetadataStorageCoordinator
   ) throws IOException;
 
   /**
+   * Similar to {@link #announceHistoricalSegments(Set)}, but meant for 
streaming ingestion tasks for handling
+   * the case where the task ingested no records and created no segments, but 
still needs to update the metadata
+   * with the progress that the task made.
+   *
+   * The metadata should undergo the same validation checks as performed by 
announceHistoricalSegments.
+   *
+   *
+   * @param dataSource the datasource
+   * @param startMetadata dataSource metadata pre-insert must match this 
startMetadata according to
+   *                      {@link 
DataSourceMetadata#matches(DataSourceMetadata)}.
+   * @param endMetadata   dataSource metadata post-insert will have this 
endMetadata merged in with
+   *                      {@link DataSourceMetadata#plus(DataSourceMetadata)}.
+   *
+   * @return segment publish result indicating transaction success or failure.
+   * This method must only return a failure code if it is sure that the 
transaction did not happen. If it is not sure,
+   * it must throw an exception instead.
+   *
+   * @throws IllegalArgumentException if either startMetadata or endMetadata 
is null
+   * @throws RuntimeException         if the state of metadata storage after 
this call is unknown
+   */
+  SegmentPublishResult commitMetadataOnly(
+      String dataSource,
+      DataSourceMetadata startMetadata,
+      DataSourceMetadata endMetadata
+  );
+
+  /**
    * Read dataSource metadata. Returns null if there is no metadata.
    */
   DataSourceMetadata getDataSourceMetadata(String dataSource);
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index dbb690e..da8a783 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -354,6 +354,74 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   @Override
+  public SegmentPublishResult commitMetadataOnly(
+      String dataSource,
+      DataSourceMetadata startMetadata,
+      DataSourceMetadata endMetadata
+  )
+  {
+    if (dataSource == null) {
+      throw new IllegalArgumentException("datasource name cannot be null");
+    }
+    if (startMetadata == null) {
+      throw new IllegalArgumentException("start metadata cannot be null");
+    }
+    if (endMetadata == null) {
+      throw new IllegalArgumentException("end metadata cannot be null");
+    }
+
+    final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false);
+
+    try {
+      return connector.retryTransaction(
+          new TransactionCallback<SegmentPublishResult>()
+          {
+            @Override
+            public SegmentPublishResult inTransaction(
+                final Handle handle,
+                final TransactionStatus transactionStatus
+            ) throws Exception
+            {
+              // Set definitelyNotUpdated back to false upon retrying.
+              definitelyNotUpdated.set(false);
+
+              final DataSourceMetadataUpdateResult result = 
updateDataSourceMetadataWithHandle(
+                  handle,
+                  dataSource,
+                  startMetadata,
+                  endMetadata
+              );
+
+              if (result != DataSourceMetadataUpdateResult.SUCCESS) {
+                // Metadata was definitely not updated.
+                transactionStatus.setRollbackOnly();
+                definitelyNotUpdated.set(true);
+
+                if (result == DataSourceMetadataUpdateResult.FAILURE) {
+                  throw new RuntimeException("Aborting transaction!");
+                } else if (result == DataSourceMetadataUpdateResult.TRY_AGAIN) 
{
+                  throw new RetryTransactionException("Aborting transaction!");
+                }
+              }
+
+              return SegmentPublishResult.ok(ImmutableSet.of());
+            }
+          },
+          3,
+          SQLMetadataConnector.DEFAULT_MAX_TRIES
+      );
+    }
+    catch (CallbackFailedException e) {
+      if (definitelyNotUpdated.get()) {
+        return SegmentPublishResult.fail(e.getMessage());
+      } else {
+        // Must throw exception if we are not sure if we updated or not.
+        throw e;
+      }
+    }
+  }
+
+  @Override
   public SegmentIdWithShardSpec allocatePendingSegment(
       final String dataSource,
       final String sequenceName,
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 0fdac39..eeb3269 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -551,10 +551,22 @@ public abstract class BaseAppenderatorDriver implements 
Closeable
   )
   {
     if (segmentsAndMetadata.getSegments().isEmpty()) {
-      log.debug("Nothing to publish, skipping publish step.");
-      final SettableFuture<SegmentsAndMetadata> retVal = 
SettableFuture.create();
-      retVal.set(segmentsAndMetadata);
-      return retVal;
+      if (!publisher.supportsEmptyPublish()) {
+        log.info("Nothing to publish, skipping publish step.");
+        final SettableFuture<SegmentsAndMetadata> retVal = 
SettableFuture.create();
+        retVal.set(segmentsAndMetadata);
+        return retVal;
+      } else {
+        // Sanity check: if we have no segments to publish, but the 
appenderator did ingest > 0 valid rows,
+        // something is wrong. This check could be expanded to cover 
publishers that return false for
+        // supportsEmptyPublish, but is kept limited for now until further 
testing.
+        if (appenderator.getTotalRowCount() != 0) {
+          throw new ISE(
+              "Attempting to publish with empty segment set, but total row 
count was not 0: [%s].",
+              appenderator.getTotalRowCount()
+          );
+        }
+      }
     }
 
     final Object metadata = segmentsAndMetadata.getCommitMetadata();
diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
index 8ebc662..cb9b9ff 100644
--- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
+++ 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java
@@ -68,6 +68,15 @@ public interface TransactionalSegmentPublisher
     );
   }
 
+  /**
+   * @return true if this publisher has an action to take when publishing with an 
empty segment set.
+   *         The publisher used by the seekable stream tasks is an example 
where this is true.
+   */
+  default boolean supportsEmptyPublish()
+  {
+    return false;
+  }
+
   static Set<DataSegment> annotateAtomicUpdateGroupSize(Set<DataSegment> 
segments)
   {
     final Map<Interval, List<DataSegment>> intervalToSegments = new 
HashMap<>();
diff --git a/website/.spelling b/website/.spelling
index a38cf47..0be8bb4 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -670,6 +670,8 @@ taskDuration
 9.2dist
 KinesisSupervisorIOConfig
 KinesisSupervisorTuningConfig
+Resharding
+resharding
 
LZ4LZFuncompressedLZ4LZ4LZFuncompressednoneLZ4autolongsautolongslongstypeconcisetyperoaringcompressRunOnSerializationtruetypestreamendpointreplicastaskCounttaskCount
 deaggregate
 druid-kinesis-indexing-service


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to