This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch 28.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/28.0.0 by this push:
     new 701b9af6c90 Filter pending segments upgraded with transactional 
replace (#15169) (#15234)
701b9af6c90 is described below

commit 701b9af6c90e535e320cf4d8b4491d0032209106
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Oct 24 00:37:18 2023 +0530

    Filter pending segments upgraded with transactional replace (#15169) 
(#15234)
    
    * Filter pending segments upgraded with transactional replace
    
    * Push sequence name filter to metadata query
---
 .../MaterializedViewSupervisor.java                |  6 ++
 .../MaterializedViewSupervisorSpecTest.java        | 16 +---
 .../actions/SegmentTransactionalReplaceAction.java |  5 +-
 .../overlord/supervisor/SupervisorManager.java     | 10 +++
 .../supervisor/SeekableStreamSupervisor.java       | 22 +++++
 .../SeekableStreamSupervisorStateTest.java         | 42 ++++++++++
 .../TestIndexerMetadataStorageCoordinator.java     |  5 +-
 .../IndexerMetadataStorageCoordinator.java         |  5 +-
 .../overlord/supervisor/NoopSupervisorSpec.java    |  7 ++
 .../indexing/overlord/supervisor/Supervisor.java   |  6 ++
 .../IndexerSQLMetadataStorageCoordinator.java      | 76 ++++++++++++++---
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 98 ++++++++++++++++++++++
 12 files changed, 273 insertions(+), 25 deletions(-)

diff --git 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index d0a035be17c..af5c0fbe95a 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -296,6 +296,12 @@ public class MaterializedViewSupervisor implements 
Supervisor
     throw new UnsupportedOperationException("Compute Lag Stats not supported 
in MaterializedViewSupervisor");
   }
 
+  @Override
+  public Set<String> getActiveRealtimeSequencePrefixes()
+  {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public int getActiveTaskGroupsCount()
   {
diff --git 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
index fe84d358baf..365fb1751ea 100644
--- 
a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
+++ 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java
@@ -203,19 +203,11 @@ public class MaterializedViewSupervisorSpecTest
       SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
       Assert.assertNull(autoscaler);
 
-      try {
-        supervisor.computeLagStats();
-      }
-      catch (Exception e) {
-        Assert.assertTrue(e instanceof UnsupportedOperationException);
-      }
+      Assert.assertThrows(UnsupportedOperationException.class, () -> 
supervisor.computeLagStats());
 
-      try {
-        int count = supervisor.getActiveTaskGroupsCount();
-      }
-      catch (Exception e) {
-        Assert.assertTrue(e instanceof UnsupportedOperationException);
-      }
+      Assert.assertThrows(UnsupportedOperationException.class, () -> 
supervisor.getActiveTaskGroupsCount());
+
+      Assert.assertThrows(UnsupportedOperationException.class, () -> 
supervisor.getActiveRealtimeSequencePrefixes());
 
       Callable<Integer> noop = new Callable<Integer>() {
         @Override
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index e6ad0426e46..aaa62db90a7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -145,8 +145,11 @@ public class SegmentTransactionalReplaceAction implements 
TaskAction<SegmentPubl
       return;
     }
 
+    final Set<String> activeRealtimeSequencePrefixes
+        = 
supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
     Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradedPendingSegments =
-        
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
+        toolbox.getIndexerMetadataStorageCoordinator()
+               .upgradePendingSegmentsOverlappingWith(segments, 
activeRealtimeSequencePrefixes);
     log.info(
         "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
         upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index df454c1011a..207ff56f28f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -39,6 +39,7 @@ import 
org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -110,6 +111,15 @@ public class SupervisorManager
     return Optional.absent();
   }
 
+  public Set<String> getActiveRealtimeSequencePrefixes(String 
activeSupervisorId)
+  {
+    if (supervisors.containsKey(activeSupervisorId)) {
+      return 
supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes();
+    } else {
+      return Collections.emptySet();
+    }
+  }
+
   public Optional<SupervisorSpec> getSupervisorSpec(String id)
   {
     Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 1d05169e3fb..a0ec6d809eb 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1093,6 +1093,28 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
   }
 
+  /**
+   * The base sequence name of a seekable stream task group is used as a 
prefix of the sequence names
+   * of pending segments published by it.
+   * This method can be used to identify the active pending segments for a 
datasource
+   * by checking if the sequence name begins with any of the active realtime 
sequence prefixes returned by this method
+   * @return the set of base sequence names of both active and pending 
completion task groups.
+   */
+  @Override
+  public Set<String> getActiveRealtimeSequencePrefixes()
+  {
+    final Set<String> activeBaseSequences = new HashSet<>();
+    for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) {
+      activeBaseSequences.add(taskGroup.baseSequenceName);
+    }
+    for (List<TaskGroup> taskGroupList : pendingCompletionTaskGroups.values()) 
{
+      for (TaskGroup taskGroup : taskGroupList) {
+        activeBaseSequences.add(taskGroup.baseSequenceName);
+      }
+    }
+    return activeBaseSequences;
+  }
+
   public void registerNewVersionOfPendingSegment(
       SegmentIdWithShardSpec basePendingSegment,
       SegmentIdWithShardSpec newSegmentVersion
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 819a6baacd8..7a587bb196e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -1313,6 +1313,48 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
   }
 
+  @Test
+  public void testGetActiveRealtimeSequencePrefixes()
+  {
+    EasyMock.expect(spec.isSuspended()).andReturn(false);
+
+    replayAll();
+
+    final TestSeekableStreamSupervisor supervisor = new 
TestSeekableStreamSupervisor();
+
+    // Spin off two active tasks with each task serving one partition.
+    supervisor.getIoConfig().setTaskCount(3);
+    supervisor.start();
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("0"),
+        ImmutableMap.of("0", "5"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task1"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToActivelyReadingTaskGroup(
+        supervisor.getTaskGroupIdForPartition("1"),
+        ImmutableMap.of("1", "6"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task2"),
+        ImmutableSet.of()
+    );
+
+    supervisor.addTaskGroupToPendingCompletionTaskGroup(
+        supervisor.getTaskGroupIdForPartition("2"),
+        ImmutableMap.of("2", "100"),
+        Optional.absent(),
+        Optional.absent(),
+        ImmutableSet.of("task3"),
+        ImmutableSet.of()
+    );
+
+    Assert.assertEquals(3, 
supervisor.getActiveRealtimeSequencePrefixes().size());
+  }
+
   @Test
   public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws 
InterruptedException, IOException
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 108833422c8..143a74c72cb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -238,7 +238,10 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   }
 
   @Override
-  public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(Set<DataSegment> replaceSegments)
+  public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(
+      Set<DataSegment> replaceSegments,
+      Set<String> activeBaseSequenceNames
+  )
   {
     return Collections.emptyMap();
   }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 7c6710048a1..34a55574dce 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -347,10 +347,13 @@ public interface IndexerMetadataStorageCoordinator
    * </ul>
    *
    * @param replaceSegments Segments being committed by a REPLACE task
+   * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active 
and pending completion task groups
+   *                                       of the supervisor (if any) for this 
datasource
    * @return Map from originally allocated pending segment to its new upgraded 
ID.
    */
   Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(
-      Set<DataSegment> replaceSegments
+      Set<DataSegment> replaceSegments,
+      Set<String> activeRealtimeSequencePrefixes
   );
 
   /**
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
index e733ef6c233..20c10253386 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java
@@ -31,6 +31,7 @@ import org.apache.druid.server.security.ResourceAction;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -185,6 +186,12 @@ public class NoopSupervisorSpec implements SupervisorSpec
       {
         return -1;
       }
+
+      @Override
+      public Set<String> getActiveRealtimeSequencePrefixes()
+      {
+        return Collections.emptySet();
+      }
     };
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index bcfc5ebe819..9d940bc55b6 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -29,6 +29,7 @@ import 
org.apache.druid.segment.incremental.ParseExceptionReport;
 import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public interface Supervisor
 {
@@ -93,4 +94,9 @@ public interface Supervisor
   LagStats computeLagStats();
 
   int getActiveTaskGroupsCount();
+
+  /**
+   * @return active sequence prefixes for reading and pending completion task 
groups of a seekable stream supervisor
+   */
+  Set<String> getActiveRealtimeSequencePrefixes();
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c654d5e229b..612f712c1bb 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -231,7 +231,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         (handle, status) -> {
           try (final CloseableIterator<DataSegment> iterator =
                    SqlSegmentsMetadataQuery.forHandle(handle, connector, 
dbTables, jsonMapper)
-                       .retrieveUnusedSegments(dataSource, 
Collections.singletonList(interval), limit)) {
+                                           .retrieveUnusedSegments(dataSource, 
Collections.singletonList(interval), limit)) {
             return ImmutableList.copyOf(iterator);
           }
         }
@@ -258,9 +258,62 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
   /**
    * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval from the metadata store. Returns a Map from the
-   * pending segment ID to the sequence name.
+   * search interval and have a sequence_name that begins with one of the 
prefixes in sequenceNamePrefixFilter
+   * from the metadata store. Returns a Map from the pending segment ID to the 
sequence name.
    */
+  @VisibleForTesting
+  Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval,
+      final Set<String> sequenceNamePrefixFilter
+  ) throws IOException
+  {
+    if (sequenceNamePrefixFilter.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> sequenceNamePrefixes = new 
ArrayList<>(sequenceNamePrefixFilter);
+    final List<String> sequenceNamePrefixConditions = new ArrayList<>();
+    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
+      sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE 
:prefix%d)", i));
+    }
+
+    String sql = "SELECT sequence_name, payload"
+                 + " FROM " + dbTables.getPendingSegmentsTable()
+                 + " WHERE dataSource = :dataSource"
+                 + " AND start < :end"
+                 + StringUtils.format(" AND %1$send%1$s > :start", 
connector.getQuoteString())
+                 + " AND ( " + String.join(" OR ", 
sequenceNamePrefixConditions) + " )";
+
+    Query<Map<String, Object>> query = handle.createQuery(sql)
+                                             .bind("dataSource", dataSource)
+                                             .bind("start", 
interval.getStart().toString())
+                                             .bind("end", 
interval.getEnd().toString());
+
+    for (int i = 0; i < sequenceNamePrefixes.size(); i++) {
+      query.bind(StringUtils.format("prefix%d", i), 
sequenceNamePrefixes.get(i) + "%");
+    }
+
+    final ResultIterator<PendingSegmentsRecord> dbSegments =
+        query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
+             .iterator();
+
+    final Map<SegmentIdWithShardSpec, String> pendingSegmentToSequenceName = 
new HashMap<>();
+    while (dbSegments.hasNext()) {
+      PendingSegmentsRecord record = dbSegments.next();
+      final SegmentIdWithShardSpec identifier = 
jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class);
+
+      if (interval.overlaps(identifier.getInterval())) {
+        pendingSegmentToSequenceName.put(identifier, record.sequenceName);
+      }
+    }
+
+    dbSegments.close();
+
+    return pendingSegmentToSequenceName;
+  }
+
   private Map<SegmentIdWithShardSpec, String> 
getPendingSegmentsForIntervalWithHandle(
       final Handle handle,
       final String dataSource,
@@ -620,7 +673,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
   @Override
   public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegmentsOverlappingWith(
-      Set<DataSegment> replaceSegments
+      Set<DataSegment> replaceSegments,
+      Set<String> activeRealtimeSequencePrefixes
   )
   {
     if (replaceSegments.isEmpty()) {
@@ -639,7 +693,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
     final String datasource = 
replaceSegments.iterator().next().getDataSource();
     return connector.retryWithHandle(
-        handle -> upgradePendingSegments(handle, datasource, 
replaceIntervalToMaxId)
+        handle -> upgradePendingSegments(handle, datasource, 
replaceIntervalToMaxId, activeRealtimeSequencePrefixes)
     );
   }
 
@@ -658,7 +712,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   private Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> 
upgradePendingSegments(
       Handle handle,
       String datasource,
-      Map<Interval, DataSegment> replaceIntervalToMaxId
+      Map<Interval, DataSegment> replaceIntervalToMaxId,
+      Set<String> activeRealtimeSequencePrefixes
   ) throws IOException
   {
     final Map<SegmentCreateRequest, SegmentIdWithShardSpec> 
newPendingSegmentVersions = new HashMap<>();
@@ -673,12 +728,13 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       int currentPartitionNumber = 
maxSegmentId.getShardSpec().getPartitionNum();
 
       final Map<SegmentIdWithShardSpec, String> overlappingPendingSegments
-          = getPendingSegmentsForIntervalWithHandle(handle, datasource, 
replaceInterval);
+          = getPendingSegmentsForIntervalWithHandle(handle, datasource, 
replaceInterval, activeRealtimeSequencePrefixes);
 
       for (Map.Entry<SegmentIdWithShardSpec, String> overlappingPendingSegment
           : overlappingPendingSegments.entrySet()) {
         final SegmentIdWithShardSpec pendingSegmentId = 
overlappingPendingSegment.getKey();
         final String pendingSegmentSequence = 
overlappingPendingSegment.getValue();
+
         if (shouldUpgradePendingSegment(pendingSegmentId, 
pendingSegmentSequence, replaceInterval, replaceVersion)) {
           // Ensure unique sequence_name_prev_id_sha1 by setting
           // sequence_prev_id -> pendingSegmentId
@@ -1281,9 +1337,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     final Map<Interval, Set<DataSegment>> overlappingIntervalToSegments = new 
HashMap<>();
     for (DataSegment segment : overlappingSegments) {
       overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> 
new HashSet<>())
-                                 .add(segment.getInterval());
+                                   .add(segment.getInterval());
       overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i 
-> new HashSet<>())
-                                 .add(segment);
+                                   .add(segment);
     }
 
     final Set<DataSegment> upgradedSegments = new HashSet<>();
@@ -2275,7 +2331,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       // Not in the desired start state.
       return new DataStoreMetadataUpdateResult(true, false, StringUtils.format(
           "Inconsistent metadata state. This can happen if you update input 
topic in a spec without changing " +
-              "the supervisor name. Stored state: [%s], Target state: [%s].",
+          "the supervisor name. Stored state: [%s], Target state: [%s].",
           oldCommitMetadataFromDb,
           startMetadata
       ));
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0512357ffc1..a8fc9e923c5 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
 import org.apache.druid.data.input.StringTuple;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.ObjectMetadata;
@@ -464,6 +466,44 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  private Boolean 
insertPendingSegmentAndSequenceName(Pair<SegmentIdWithShardSpec, String> 
pendingSegmentSequenceName)
+  {
+    final SegmentIdWithShardSpec pendingSegment = 
pendingSegmentSequenceName.lhs;
+    final String sequenceName = pendingSegmentSequenceName.rhs;
+    final String table = 
derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
+    return derbyConnector.retryWithHandle(
+        handle -> {
+          handle.createStatement(
+                    StringUtils.format(
+                        "INSERT INTO %1$s (id, dataSource, created_date, 
start, %2$send%2$s, sequence_name, sequence_prev_id, "
+                        + "sequence_name_prev_id_sha1, payload) "
+                        + "VALUES (:id, :dataSource, :created_date, :start, 
:end, :sequence_name, :sequence_prev_id, "
+                        + ":sequence_name_prev_id_sha1, :payload)",
+                        table,
+                        derbyConnector.getQuoteString()
+                    )
+                )
+                .bind("id", pendingSegment.toString())
+                .bind("dataSource", pendingSegment.getDataSource())
+                .bind("created_date", DateTimes.nowUtc().toString())
+                .bind("start", 
pendingSegment.getInterval().getStart().toString())
+                .bind("end", pendingSegment.getInterval().getEnd().toString())
+                .bind("sequence_name", sequenceName)
+                .bind("sequence_prev_id", pendingSegment.toString())
+                .bind("sequence_name_prev_id_sha1", 
BaseEncoding.base16().encode(
+                    Hashing.sha1()
+                           .newHasher()
+                           .putLong((long) pendingSegment.hashCode() * 
sequenceName.hashCode())
+                           .hash()
+                           .asBytes()
+                ))
+                .bind("payload", mapper.writeValueAsBytes(pendingSegment))
+                .execute();
+          return true;
+        }
+    );
+  }
+
   private Map<String, String> getSegmentsCommittedDuringReplaceTask(String 
taskId)
   {
     final String table = 
derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable();
@@ -2554,6 +2594,64 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  @Test
+  public void testGetPendingSegmentsForIntervalWithSequencePrefixes()
+  {
+    Pair<SegmentIdWithShardSpec, String> validIntervalValidSequence = Pair.of(
+        SegmentIdWithShardSpec.fromDataSegment(defaultSegment),
+        "validLOL"
+    );
+    insertPendingSegmentAndSequenceName(validIntervalValidSequence);
+
+    Pair<SegmentIdWithShardSpec, String> validIntervalInvalidSequence = 
Pair.of(
+        SegmentIdWithShardSpec.fromDataSegment(defaultSegment2),
+        "invalidRandom"
+    );
+    insertPendingSegmentAndSequenceName(validIntervalInvalidSequence);
+
+    Pair<SegmentIdWithShardSpec, String> invalidIntervalvalidSequence = 
Pair.of(
+        SegmentIdWithShardSpec.fromDataSegment(existingSegment1),
+        "validStuff"
+    );
+    insertPendingSegmentAndSequenceName(invalidIntervalvalidSequence);
+
+    Pair<SegmentIdWithShardSpec, String> twentyFifteenWithAnotherValidSequence 
= Pair.of(
+        new SegmentIdWithShardSpec(
+            existingSegment1.getDataSource(),
+            Intervals.of("2015/2016"),
+            "1970-01-01",
+            new NumberedShardSpec(1, 0)
+        ),
+        "alsoValidAgain"
+    );
+    insertPendingSegmentAndSequenceName(twentyFifteenWithAnotherValidSequence);
+
+    Pair<SegmentIdWithShardSpec, String> twentyFifteenWithInvalidSequence = 
Pair.of(
+        new SegmentIdWithShardSpec(
+            existingSegment1.getDataSource(),
+            Intervals.of("2015/2016"),
+            "1970-01-01",
+            new NumberedShardSpec(2, 0)
+        ),
+        "definitelyInvalid"
+    );
+    insertPendingSegmentAndSequenceName(twentyFifteenWithInvalidSequence);
+
+
+    final Map<SegmentIdWithShardSpec, String> expected = new HashMap<>();
+    expected.put(validIntervalValidSequence.lhs, 
validIntervalValidSequence.rhs);
+    expected.put(twentyFifteenWithAnotherValidSequence.lhs, 
twentyFifteenWithAnotherValidSequence.rhs);
+
+    final Map<SegmentIdWithShardSpec, String> actual =
+        derbyConnector.retryWithHandle(handle -> 
coordinator.getPendingSegmentsForIntervalWithHandle(
+            handle,
+            defaultSegment.getDataSource(),
+            defaultSegment.getInterval(),
+            ImmutableSet.of("valid", "alsoValid")
+        ));
+    Assert.assertEquals(expected, actual);
+  }
+
   @Test
   public void testRetrieveUsedSegmentsAndCreatedDates()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to