This is an automated email from the ASF dual-hosted git repository.

abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a289901680c fix: preserve DimensionValueSetShardSpec on upgraded append segments (#19615)
a289901680c is described below

commit a289901680c021b28560916ee1fd4ff5394a5c79
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Jun 22 15:44:09 2026 -0700

    fix: preserve DimensionValueSetShardSpec on upgraded append segments (#19615)
    
    When a concurrent REPLACE upgrades a still-appending streaming task, the
    upgraded (new-version) copy of each append segment previously adopted the
    pending segment's plain NumberedShardSpec, dropping the
    DimensionValueSetShardSpec stamped at publish time. As a result, the broker
    could not prune upgraded segments.
    
    The upgraded copy now takes its partition number and core-partition count
    from the pending segment, while carrying forward the original segment's
    partitionDimensionValues when the original's shard spec is a
    DimensionValueSetShardSpec. The value-set guarantee still holds because the
    upgraded copy serves the same rows as the original append segment.
---
 .../IndexerSQLMetadataStorageCoordinator.java      |  23 ++-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 184 ++++++++++++++++++++-
 2 files changed, 205 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index fbe30a31e88..3aa06b8e371 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -65,6 +65,7 @@ import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.partition.DimensionValueSetShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.apache.druid.timeline.partition.PartitionChunk;
@@ -1223,10 +1224,11 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
             final SegmentId newVersionSegmentId = 
pendingSegment.getId().asSegmentId();
             newVersionSegmentToParent.put(newVersionSegmentId, 
oldSegment.getId());
             upgradedFromSegmentIdMap.put(newVersionSegmentId.toString(), 
oldSegment.getId().toString());
+            final ShardSpec upgradedShardSpec = 
getUpgradedSegmentShardSpec(oldSegment, pendingSegment);
             allSegmentsToInsert.add(DataSegment.builder(oldSegment)
                                                
.interval(newVersionSegmentId.getInterval())
                                                
.version(newVersionSegmentId.getVersion())
-                                               
.shardSpec(pendingSegment.getId().getShardSpec())
+                                               .shardSpec(upgradedShardSpec)
                                                .build());
           }
         }
@@ -1287,6 +1289,25 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
   }
 
+  /**
+   * Returns the shard spec to stamp on the upgraded (new-version) copy of an append segment.
+   * <p>
+   * The partition number and core-partition count always come from the pending segment allocated for the upgrade.
+   * If the original segment carries a {@link DimensionValueSetShardSpec}, its partition dimension values are carried
+   * over so that the broker can still prune the upgraded copy. Otherwise, the pending segment's shard spec is returned
+   * unchanged.
+   */
+  private static ShardSpec getUpgradedSegmentShardSpec(DataSegment oldSegment, PendingSegmentRecord pendingSegment)
+  {
+    final ShardSpec allocatedSpec = pendingSegment.getId().getShardSpec();
+    final ShardSpec originalSpec = oldSegment.getShardSpec();
+    if (!(originalSpec instanceof DimensionValueSetShardSpec)) {
+      return allocatedSpec;
+    }
+    final DimensionValueSetShardSpec valueSetSpec = (DimensionValueSetShardSpec) originalSpec;
+    return new DimensionValueSetShardSpec(
+        allocatedSpec.getPartitionNum(),
+        allocatedSpec.getNumCorePartitions(),
+        valueSetSpec.getPartitionDimensionValues()
+    );
+  }
+
   private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
       SegmentMetadataTransaction transaction,
       String dataSource,
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 238b76f741a..f9ab6078a9a 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.apache.druid.timeline.partition.DimensionValueSetShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -144,7 +145,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
   {
     derbyConnector = derbyConnectorRule.getConnector();
     segmentsTable = derbyConnectorRule.segments();
-    mapper.registerSubtypes(LinearShardSpec.class, NumberedShardSpec.class, 
HashBasedNumberedShardSpec.class);
+    mapper.registerSubtypes(
+        LinearShardSpec.class,
+        NumberedShardSpec.class,
+        HashBasedNumberedShardSpec.class,
+        DimensionValueSetShardSpec.class
+    );
     derbyConnector.createDataSourceTable();
     derbyConnector.createTaskTables();
     derbyConnector.createSegmentTable();
@@ -438,6 +444,106 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(replaceLock.getVersion(), 
Iterables.getOnlyElement(observedLockVersions));
   }
 
+  /**
+   * When a concurrent REPLACE upgrades a still-appending task, the upgraded copy must take its partition number and
+   * core-partition count from the (numbered) pending segment while preserving the original append segment's
+   * {@link DimensionValueSetShardSpec}, so it stays prunable by the broker. The original append segment, as read back
+   * from metadata, must also keep its {@link DimensionValueSetShardSpec}.
+   */
+  @Test
+  public void testCommitAppendSegments_upgradedSegmentPreservesDimensionValueSetShardSpec()
+  {
+    final String appendVersion = "2023-01-01";
+    final String upgradedVersion = "2023-02-01";
+
+    final String taskAllocatorId = "appendTask";
+    final String replaceTaskId = "replaceTask1";
+    final ReplaceTaskLock replaceLock = new ReplaceTaskLock(
+        replaceTaskId,
+        Intervals.of("2023-01-01/2023-02-01"),
+        upgradedVersion
+    );
+
+    final Map<String, List<String>> partitionDimensionValues =
+        ImmutableMap.of("tenant_id", ImmutableList.of("tenant_a"));
+    // The published append segment carries a DimensionValueSetShardSpec, as stamped at publish time by the
+    // streaming task.
+    final DataSegment appendSegment = createSegment(
+        Intervals.of("2023-01-01/2023-01-02"),
+        appendVersion,
+        new DimensionValueSetShardSpec(0, 1, partitionDimensionValues)
+    );
+
+    final List<PendingSegmentRecord> pendingSegmentsForTask = new ArrayList<>();
+    // The pending segment for the append segment itself.
+    pendingSegmentsForTask.add(
+        PendingSegmentRecord.create(
+            SegmentIdWithShardSpec.fromDataSegment(appendSegment),
+            appendVersion,
+            appendSegment.getId().toString(),
+            null,
+            taskAllocatorId
+        )
+    );
+    // The upgraded pending segment minted by the concurrent REPLACE — numbered, pointing back to the append segment.
+    final SegmentIdWithShardSpec upgradedPendingId = new SegmentIdWithShardSpec(
+        TestDataSource.WIKI,
+        Intervals.of("2023-01-01/2023-02-01"),
+        upgradedVersion,
+        new NumberedShardSpec(5, 8)
+    );
+    pendingSegmentsForTask.add(
+        PendingSegmentRecord.create(
+            upgradedPendingId,
+            upgradedVersion,
+            appendSegment.getId().toString(),
+            appendSegment.getId().toString(),
+            taskAllocatorId
+        )
+    );
+    insertPendingSegments(TestDataSource.WIKI, pendingSegmentsForTask, false);
+
+    final SegmentPublishResult commitResult = coordinator.commitAppendSegments(
+        Set.of(appendSegment),
+        Map.of(appendSegment, replaceLock),
+        taskAllocatorId,
+        null
+    );
+    Assert.assertTrue(commitResult.isSuccess());
+
+    final Set<DataSegment> allCommittedSegments
+        = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+    final Map<String, String> upgradedFromSegmentIdMap = coordinator.retrieveUpgradedFromSegmentIds(
+        TestDataSource.WIKI,
+        allCommittedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+    );
+
+    // Locate both the committed original and its upgraded copy (whose upgradedFromSegmentId points back to it).
+    // DataSegment equality is id-based, so shard specs must be checked on the segments read back from metadata rather
+    // than on the locally built appendSegment.
+    final String appendSegmentId = appendSegment.getId().toString();
+    DataSegment committedAppendSegment = null;
+    DataSegment upgradedSegment = null;
+    for (DataSegment segment : allCommittedSegments) {
+      if (segment.getId().equals(appendSegment.getId())) {
+        committedAppendSegment = segment;
+      } else if (appendSegmentId.equals(upgradedFromSegmentIdMap.get(segment.getId().toString()))) {
+        Assert.assertNull("Expected exactly one upgraded copy of the append segment", upgradedSegment);
+        upgradedSegment = segment;
+      }
+    }
+
+    // The original append segment is published as-is, retaining its DimensionValueSetShardSpec.
+    Assert.assertNotNull("Expected the original append segment to be committed", committedAppendSegment);
+    Assert.assertTrue(
+        "committed append segment should retain DimensionValueSetShardSpec",
+        committedAppendSegment.getShardSpec() instanceof DimensionValueSetShardSpec
+    );
+    Assert.assertEquals(
+        partitionDimensionValues,
+        ((DimensionValueSetShardSpec) committedAppendSegment.getShardSpec()).getPartitionDimensionValues()
+    );
+
+    Assert.assertNotNull("Expected an upgraded copy of the append segment", upgradedSegment);
+
+    // The upgraded copy is published under the replace version, with the pending segment's partition number and core
+    // partitions, but it preserves the original DimensionValueSetShardSpec (and partitionDimensionValues).
+    Assert.assertEquals(upgradedVersion, upgradedSegment.getVersion());
+    Assert.assertTrue(
+        "upgraded append segment should preserve DimensionValueSetShardSpec",
+        upgradedSegment.getShardSpec() instanceof DimensionValueSetShardSpec
+    );
+    Assert.assertEquals(
+        partitionDimensionValues,
+        ((DimensionValueSetShardSpec) upgradedSegment.getShardSpec()).getPartitionDimensionValues()
+    );
+    // Partition number and core partitions come from the (numbered) pending segment.
+    Assert.assertEquals(5, upgradedSegment.getShardSpec().getPartitionNum());
+    Assert.assertEquals(8, upgradedSegment.getShardSpec().getNumCorePartitions());
+  }
+
   @Test
   public void 
testCommitReplaceSegments_partiallyOverlappingPendingSegmentUnsupported()
   {
@@ -632,6 +738,82 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     );
   }
 
+  /**
+   * Committing a REPLACE over an interval that already holds a published APPEND segment (registered under the REPLACE
+   * lock) must re-version that segment without losing its {@link DimensionValueSetShardSpec}, so the broker can still
+   * prune the upgraded copy.
+   */
+  @Test
+  public void testCommitReplaceSegments_upgradedPublishedSegmentPreservesDimensionValueSetShardSpec()
+  {
+    final String dataSource = "foo";
+    final String replaceVersion = "2023-02-01";
+    final ReplaceTaskLock lock = new ReplaceTaskLock("g1", Intervals.of("2023-01-01/2023-02-01"), replaceVersion);
+    final Map<String, List<String>> tenantValues = ImmutableMap.of("tenant_id", ImmutableList.of("tenant_a"));
+
+    // Publish an APPEND segment stamped with a DimensionValueSetShardSpec (as a streaming task would at publish time)
+    // and record it in the upgrade-segments table under the REPLACE lock.
+    final DataSegment publishedAppend = new DataSegment(
+        dataSource,
+        Intervals.of("2023-01-01/2023-01-02"),
+        "2023-01-01",
+        ImmutableMap.of("path", "a-0"),
+        ImmutableList.of("dim1"),
+        ImmutableList.of("m1"),
+        new DimensionValueSetShardSpec(0, 1, tenantValues),
+        9,
+        100
+    );
+    segmentSchemaTestUtils.insertUsedSegments(Set.of(publishedAppend), Collections.emptyMap());
+    insertIntoUpgradeSegmentsTable(
+        Map.of(publishedAppend, lock),
+        derbyConnectorRule.metadataTablesConfigSupplier().get()
+    );
+
+    // Numbered segments that together replace the whole month.
+    final int numReplacingSegments = 4;
+    final Set<DataSegment> replacingSegments = new HashSet<>();
+    for (int partition = 0; partition < numReplacingSegments; partition++) {
+      replacingSegments.add(
+          new DataSegment(
+              dataSource,
+              Intervals.of("2023-01-01/2023-02-01"),
+              replaceVersion,
+              ImmutableMap.of("path", "b-" + partition),
+              ImmutableList.of("dim1"),
+              ImmutableList.of("m1"),
+              new NumberedShardSpec(partition, numReplacingSegments),
+              9,
+              100
+          )
+      );
+    }
+    Assert.assertTrue(coordinator.commitReplaceSegments(replacingSegments, Set.of(lock), null).isSuccess());
+
+    final Set<DataSegment> usedSegments
+        = new HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+    final Set<String> usedSegmentIds =
+        usedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet());
+    final Map<String, String> upgradedFrom = coordinator.retrieveUpgradedFromSegmentIds(dataSource, usedSegmentIds);
+
+    // The upgraded copy is the used segment whose upgradedFromSegmentId refers to the published APPEND segment.
+    final String appendSegmentId = publishedAppend.getId().toString();
+    DataSegment upgradedCopy = null;
+    for (DataSegment candidate : usedSegments) {
+      if (appendSegmentId.equals(upgradedFrom.get(candidate.getId().toString()))) {
+        upgradedCopy = candidate;
+      }
+    }
+    Assert.assertNotNull("Expected an upgraded published segment", upgradedCopy);
+
+    // Re-versioned to the REPLACE version, yet still a DimensionValueSetShardSpec carrying the same dimension values.
+    Assert.assertEquals(replaceVersion, upgradedCopy.getVersion());
+    Assert.assertTrue(
+        "upgraded published segment should preserve DimensionValueSetShardSpec",
+        upgradedCopy.getShardSpec() instanceof DimensionValueSetShardSpec
+    );
+    Assert.assertEquals(
+        tenantValues,
+        ((DimensionValueSetShardSpec) upgradedCopy.getShardSpec()).getPartitionDimensionValues()
+    );
+  }
+
   @Test
   public void testCommitReplaceSegmentsWithUpdatedCorePartitions()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to