This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 24e5d8a9e81 Refactor: Minor cleanup of segment allocation flow (#17524)
24e5d8a9e81 is described below

commit 24e5d8a9e81fffd656e9544ae16bea53d7e60808
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Dec 12 18:16:57 2024 -0800

    Refactor: Minor cleanup of segment allocation flow (#17524)
    
    Changes
    --------
    - Simplify the arguments of IndexerMetadataStorageCoordinator.allocatePendingSegment
    - Remove field SegmentCreateRequest.upgradedFromSegmentId as it was always null
    - Miscellaneous cleanup
---
 .../druid/indexing/overlord/TaskLockbox.java       |  80 ++++---
 .../druid/indexing/overlord/TaskLockboxTest.java   |   4 +-
 .../TestIndexerMetadataStorageCoordinator.java     |  11 +-
 .../timeline/partition/OverwriteShardSpec.java     |   1 +
 .../druid/timeline/partition/PartitionIds.java     |   1 +
 .../IndexerMetadataStorageCoordinator.java         |  22 +-
 .../indexing/overlord/SegmentCreateRequest.java    |  20 +-
 .../IndexerSQLMetadataStorageCoordinator.java      | 242 ++++++++-------------
 .../overlord/SegmentCreateRequestTest.java         |   1 -
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 102 +++++----
 10 files changed, 213 insertions(+), 271 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 916f8cab75c..2c1b78d3ada 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -183,7 +183,7 @@ public class TaskLockbox
                                       ? 
savedTaskLock.withPriority(task.getPriority())
                                       : savedTaskLock;
 
-        final TaskLockPosse taskLockPosse = verifyAndCreateOrFindLockPosse(
+        final TaskLockPosse taskLockPosse = reacquireLockOnStartup(
             task,
             savedTaskLockWithPriority
         );
@@ -192,15 +192,11 @@ public class TaskLockbox
 
           if 
(savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
             taskLockCount++;
-            log.info(
-                "Reacquired lock[%s] for task: %s",
-                taskLock,
-                task.getId()
-            );
+            log.info("Reacquired lock[%s] for task[%s].", taskLock, 
task.getId());
           } else {
             taskLockCount++;
             log.info(
-                "Could not reacquire lock on interval[%s] version[%s] (got 
version[%s] instead) for task: %s",
+                "Could not reacquire lock on interval[%s] version[%s] (got 
version[%s] instead) for task[%s].",
                 savedTaskLockWithPriority.getInterval(),
                 savedTaskLockWithPriority.getVersion(),
                 taskLock.getVersion(),
@@ -210,7 +206,7 @@ public class TaskLockbox
         } else {
           failedToReacquireLockTaskGroups.add(task.getGroupId());
           log.error(
-              "Could not reacquire lock on interval[%s] version[%s] for task: 
%s from group %s.",
+              "Could not reacquire lock on interval[%s] version[%s] for 
task[%s], groupId[%s].",
               savedTaskLockWithPriority.getInterval(),
               savedTaskLockWithPriority.getVersion(),
               task.getId(),
@@ -253,38 +249,28 @@ public class TaskLockbox
   }
 
   /**
-   * This method is called only in {@link #syncFromStorage()} and verifies the 
given task and the taskLock have the same
-   * groupId, dataSource, and priority.
+   * Reacquire lock during {@link #syncFromStorage()}.
+   *
+   * @return null if the lock could not be reacquired.
    */
   @VisibleForTesting
   @Nullable
-  protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock 
taskLock)
+  protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock)
   {
+    if (!taskMatchesLock(task, taskLock)) {
+      log.warn(
+          "Task[datasource: %s, groupId: %s, priority: %s] does not match"
+          + " TaskLock[datasource: %s, groupId: %s, priority: %s].",
+          task.getDataSource(), task.getGroupId(), task.getPriority(),
+          taskLock.getDataSource(), taskLock.getGroupId(), 
taskLock.getNonNullPriority()
+      );
+      return null;
+    }
+
     giant.lock();
 
     try {
-      Preconditions.checkArgument(
-          task.getGroupId().equals(taskLock.getGroupId()),
-          "lock groupId[%s] is different from task groupId[%s]",
-          taskLock.getGroupId(),
-          task.getGroupId()
-      );
-      Preconditions.checkArgument(
-          task.getDataSource().equals(taskLock.getDataSource()),
-          "lock dataSource[%s] is different from task dataSource[%s]",
-          taskLock.getDataSource(),
-          task.getDataSource()
-      );
       final int taskPriority = task.getPriority();
-      final int lockPriority = taskLock.getNonNullPriority();
-
-      Preconditions.checkArgument(
-          lockPriority == taskPriority,
-          "lock priority[%s] is different from task priority[%s]",
-          lockPriority,
-          taskPriority
-      );
-
       final LockRequest request;
       switch (taskLock.getGranularity()) {
         case SEGMENT:
@@ -313,15 +299,13 @@ public class TaskLockbox
           );
           break;
         default:
-          throw new ISE("Unknown lockGranularity[%s]", 
taskLock.getGranularity());
+          throw DruidException.defensive("Unknown lockGranularity[%s]", 
taskLock.getGranularity());
       }
 
       return createOrFindLockPosse(request, task, false);
     }
     catch (Exception e) {
-      log.error(e,
-                "Could not reacquire lock for task: %s from metadata store", 
task.getId()
-      );
+      log.error(e, "Could not reacquire lock for task[%s] from metadata 
store", task.getId());
       return null;
     }
     finally {
@@ -329,6 +313,17 @@ public class TaskLockbox
     }
   }
 
+  /**
+   * Returns true if the datasource, groupId and priority of the given Task
+   * match that of the TaskLock.
+   */
+  private boolean taskMatchesLock(Task task, TaskLock taskLock)
+  {
+    return task.getGroupId().equals(taskLock.getGroupId())
+        && task.getDataSource().equals(taskLock.getDataSource())
+        && task.getPriority() == taskLock.getNonNullPriority();
+  }
+
   /**
    * Acquires a lock on behalf of a task.  Blocks until the lock is acquired.
    *
@@ -751,13 +746,15 @@ public class TaskLockbox
   {
     return metadataStorageCoordinator.allocatePendingSegment(
         request.getDataSource(),
-        request.getSequenceName(),
-        request.getPreviousSegmentId(),
         request.getInterval(),
-        request.getPartialShardSpec(),
-        version,
         request.isSkipSegmentLineageCheck(),
-        allocatorId
+        new SegmentCreateRequest(
+            request.getSequenceName(),
+            request.getPreviousSegmentId(),
+            version,
+            request.getPartialShardSpec(),
+            allocatorId
+        )
     );
   }
 
@@ -1818,7 +1815,6 @@ public class TaskLockbox
             action.getPreviousSegmentId(),
             acquiredLock == null ? lockRequest.getVersion() : 
acquiredLock.getVersion(),
             action.getPartialShardSpec(),
-            null,
             ((PendingSegmentAllocatingTask) task).getTaskAllocatorId()
         );
       }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index a8c4b5117b1..8f47b78a3bf 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -2270,10 +2270,10 @@ public class TaskLockboxTest
     }
 
     @Override
-    protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock 
taskLock)
+    protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock 
taskLock)
     {
       return task.getGroupId()
-                 .contains("FailingLockAcquisition") ? null : 
super.verifyAndCreateOrFindLockPosse(task, taskLock);
+                 .contains("FailingLockAcquisition") ? null : 
super.reacquireLockOnStartup(task, taskLock);
     }
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 54e323581c4..a95d73ce1bb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -36,7 +36,6 @@ import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
-import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -242,20 +241,16 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   @Override
   public SegmentIdWithShardSpec allocatePendingSegment(
       String dataSource,
-      String sequenceName,
-      String previousSegmentId,
       Interval interval,
-      PartialShardSpec partialShardSpec,
-      String maxVersion,
       boolean skipSegmentLineageCheck,
-      String taskAllocatorId
+      SegmentCreateRequest createRequest
   )
   {
     return new SegmentIdWithShardSpec(
         dataSource,
         interval,
-        maxVersion,
-        partialShardSpec.complete(objectMapper, 0, 0)
+        createRequest.getVersion(),
+        createRequest.getPartialShardSpec().complete(objectMapper, 0, 0)
     );
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
 
b/processing/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
index ec784c44ffb..a18f829efdd 100644
--- 
a/processing/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
+++ 
b/processing/src/main/java/org/apache/druid/timeline/partition/OverwriteShardSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.timeline.partition;
 
 /**
  * ShardSpec for non-first-generation segments.
+ * This shardSpec is created only by overwriting tasks using segment locks.
  * This shardSpec is allocated a partitionId between {@link 
PartitionIds#NON_ROOT_GEN_START_PARTITION_ID} and
  * {@link PartitionIds#NON_ROOT_GEN_END_PARTITION_ID}.
  *
diff --git 
a/processing/src/main/java/org/apache/druid/timeline/partition/PartitionIds.java
 
b/processing/src/main/java/org/apache/druid/timeline/partition/PartitionIds.java
index fc5d0e981c4..2bd93125225 100644
--- 
a/processing/src/main/java/org/apache/druid/timeline/partition/PartitionIds.java
+++ 
b/processing/src/main/java/org/apache/druid/timeline/partition/PartitionIds.java
@@ -31,6 +31,7 @@ public final class PartitionIds
   public static final int ROOT_GEN_END_PARTITION_ID = 32768; // exclusive
   /**
    * Start partitionId available for non-root generation segments.
+   * Used only with segment locks.
    */
   public static final int NON_ROOT_GEN_START_PARTITION_ID = 32768;
   /**
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index da54d7e3998..5e840b07b6d 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -26,7 +26,6 @@ import org.apache.druid.segment.SegmentSchemaMapping;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentTimeline;
-import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
@@ -215,34 +214,23 @@ public interface IndexerMetadataStorageCoordinator
    * Note that a segment sequence may include segments with a variety of 
different intervals and versions.
    *
    * @param dataSource              dataSource for which to allocate a segment
-   * @param sequenceName            name of the group of ingestion tasks 
producing a segment series
-   * @param previousSegmentId       previous segment in the series; may be 
null or empty, meaning this is the first
-   *                                segment
    * @param interval                interval for which to allocate a segment
-   * @param partialShardSpec        partialShardSpec containing all necessary 
information to create a shardSpec for the
-   *                                new segmentId
-   * @param maxVersion              use this version if we have no better 
version to use. The returned segment
-   *                                identifier may have a version lower than 
this one, but will not have one higher.
    * @param skipSegmentLineageCheck if true, perform lineage validation using 
previousSegmentId for this sequence.
    *                                Should be set to false if replica tasks 
would index events in same order
-   * @param taskAllocatorId         The task allocator id with which the 
pending segment is associated
    * @return the pending segment identifier, or null if it was impossible to 
allocate a new segment
    */
+  @Nullable
   SegmentIdWithShardSpec allocatePendingSegment(
       String dataSource,
-      String sequenceName,
-      @Nullable String previousSegmentId,
       Interval interval,
-      PartialShardSpec partialShardSpec,
-      String maxVersion,
       boolean skipSegmentLineageCheck,
-      String taskAllocatorId
+      SegmentCreateRequest createRequest
   );
 
   /**
    * Delete pending segments created in the given interval belonging to the 
given data source from the pending segments
    * table. The {@code created_date} field of the pending segments table is 
checked to find segments to be deleted.
-   *
+   * <p>
    * Note that the semantic of the interval (for `created_date`s) is different 
from the semantic of the interval
    * parameters in some other methods in this class, such as {@link 
#retrieveUsedSegmentsForInterval} (where the
    * interval is about the time column value in rows belonging to the segment).
@@ -269,7 +257,7 @@ public interface IndexerMetadataStorageCoordinator
    * <p/>
    * If startMetadata and endMetadata are set, this insertion will be atomic 
with a compare-and-swap on dataSource
    * commit metadata.
-   *
+   * <p>
    * If segmentsToDrop is not null and not empty, this insertion will be 
atomic with a insert-and-drop on inserting
    * {@param segments} and dropping {@param segmentsToDrop}.
    *
@@ -426,7 +414,7 @@ public interface IndexerMetadataStorageCoordinator
    * Similar to {@link #commitSegments}, but meant for streaming ingestion 
tasks for handling
    * the case where the task ingested no records and created no segments, but 
still needs to update the metadata
    * with the progress that the task made.
-   *
+   * <p>
    * The metadata should undergo the same validation checks as performed by 
{@link #commitSegments}.
    *
    *
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
index 49b31e5e6ff..bcbf9416fe8 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java
@@ -38,7 +38,6 @@ public class SegmentCreateRequest
   private final String sequenceName;
   private final String previousSegmentId;
   private final PartialShardSpec partialShardSpec;
-  private final String upgradedFromSegmentId;
   private final String taskAllocatorId;
 
   public SegmentCreateRequest(
@@ -46,7 +45,6 @@ public class SegmentCreateRequest
       String previousSegmentId,
       String version,
       PartialShardSpec partialShardSpec,
-      String upgradedFromSegmentId,
       String taskAllocatorId
   )
   {
@@ -54,24 +52,31 @@ public class SegmentCreateRequest
     this.previousSegmentId = previousSegmentId == null ? "" : 
previousSegmentId;
     this.version = version;
     this.partialShardSpec = partialShardSpec;
-    this.upgradedFromSegmentId = upgradedFromSegmentId;
     this.taskAllocatorId = taskAllocatorId;
   }
 
+  /**
+   * Represents group of ingestion tasks that produce a segment series.
+   */
   public String getSequenceName()
   {
     return sequenceName;
   }
 
   /**
-   * Non-null previous segment id. This can be used for persisting to the
-   * pending segments table in the metadata store.
+   * Previous segment id allocated for this sequence.
+   *
+   * @return Empty string if there is no previous segment in the series.
    */
   public String getPreviousSegmentId()
   {
     return previousSegmentId;
   }
 
+  /**
+   * Version of the lock held by the task that has requested the segment 
allocation.
+   * The allocated segment must have a version less than or equal to this 
version.
+   */
   public String getVersion()
   {
     return version;
@@ -82,11 +87,6 @@ public class SegmentCreateRequest
     return partialShardSpec;
   }
 
-  public String getUpgradedFromSegmentId()
-  {
-    return upgradedFromSegmentId;
-  }
-
   public String getTaskAllocatorId()
   {
     return taskAllocatorId;
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 0717c9b07ee..c85fd1c4960 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -80,7 +80,6 @@ import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.Update;
 import org.skife.jdbi.v2.exceptions.CallbackFailedException;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
 
 import javax.annotation.Nullable;
@@ -350,8 +349,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   /**
    * Fetches all the pending segments, whose interval overlaps with the given 
search interval, from the metadata store.
    */
-  @VisibleForTesting
-  List<PendingSegmentRecord> getPendingSegmentsForIntervalWithHandle(
+  private List<PendingSegmentRecord> getPendingSegmentsForInterval(
       final Handle handle,
       final String dataSource,
       final Interval interval
@@ -390,7 +388,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return pendingSegments.build();
   }
 
-  List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorIdWithHandle(
+  private List<PendingSegmentRecord> getPendingSegmentsForTaskAllocatorId(
       final Handle handle,
       final String dataSource,
       final String taskAllocatorId
@@ -580,7 +578,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                 );
               }
             }
-            SegmentPublishResult result = SegmentPublishResult.ok(
+            return SegmentPublishResult.ok(
                 insertSegments(
                     handle,
                     segmentsToInsert,
@@ -591,7 +589,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                 ),
                 upgradePendingSegmentsOverlappingWith(segmentsToInsert)
             );
-            return result;
           },
           3,
           getSqlMetadataMaxRetry()
@@ -740,21 +737,16 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   @Override
+  @Nullable
   public SegmentIdWithShardSpec allocatePendingSegment(
       final String dataSource,
-      final String sequenceName,
-      @Nullable final String previousSegmentId,
       final Interval interval,
-      final PartialShardSpec partialShardSpec,
-      final String maxVersion,
       final boolean skipSegmentLineageCheck,
-      String taskAllocatorId
+      final SegmentCreateRequest createRequest
   )
   {
     Preconditions.checkNotNull(dataSource, "dataSource");
-    Preconditions.checkNotNull(sequenceName, "sequenceName");
     Preconditions.checkNotNull(interval, "interval");
-    Preconditions.checkNotNull(maxVersion, "version");
     final Interval allocateInterval = 
interval.withChronology(ISOChronology.getInstanceUTC());
 
     return connector.retryWithHandle(
@@ -776,24 +768,17 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
             return allocatePendingSegment(
                 handle,
                 dataSource,
-                sequenceName,
                 allocateInterval,
-                partialShardSpec,
-                maxVersion,
-                existingChunks,
-                taskAllocatorId
+                createRequest,
+                existingChunks
             );
           } else {
             return allocatePendingSegmentWithSegmentLineageCheck(
                 handle,
                 dataSource,
-                sequenceName,
-                previousSegmentId,
                 allocateInterval,
-                partialShardSpec,
-                maxVersion,
-                existingChunks,
-                taskAllocatorId
+                createRequest,
+                existingChunks
             );
           }
         }
@@ -854,7 +839,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       int currentPartitionNumber = 
maxSegmentId.getShardSpec().getPartitionNum();
 
       final List<PendingSegmentRecord> overlappingPendingSegments
-          = getPendingSegmentsForIntervalWithHandle(handle, datasource, 
replaceInterval);
+          = getPendingSegmentsForInterval(handle, datasource, replaceInterval);
 
       for (PendingSegmentRecord overlappingPendingSegment : 
overlappingPendingSegments) {
         final SegmentIdWithShardSpec pendingSegmentId = 
overlappingPendingSegment.getId();
@@ -929,17 +914,11 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck(
       final Handle handle,
       final String dataSource,
-      final String sequenceName,
-      @Nullable final String previousSegmentId,
       final Interval interval,
-      final PartialShardSpec partialShardSpec,
-      final String maxVersion,
-      final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
-      final String taskAllocatorId
+      final SegmentCreateRequest createRequest,
+      final List<TimelineObjectHolder<String, DataSegment>> existingChunks
   ) throws IOException
   {
-    final String previousSegmentIdNotNull = previousSegmentId == null ? "" : 
previousSegmentId;
-
     final String sql = StringUtils.format(
         "SELECT payload FROM %s WHERE "
         + "dataSource = :dataSource AND "
@@ -950,15 +929,15 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     final Query<Map<String, Object>> query
         = handle.createQuery(sql)
                 .bind("dataSource", dataSource)
-                .bind("sequence_name", sequenceName)
-                .bind("sequence_prev_id", previousSegmentIdNotNull);
+                .bind("sequence_name", createRequest.getSequenceName())
+                .bind("sequence_prev_id", 
createRequest.getPreviousSegmentId());
 
     final String usedSegmentVersion = existingChunks.isEmpty() ? null : 
existingChunks.get(0).getVersion();
     final CheckExistingSegmentIdResult result = findExistingPendingSegment(
         query,
         interval,
-        sequenceName,
-        previousSegmentIdNotNull,
+        createRequest.getSequenceName(),
+        createRequest.getPreviousSegmentId(),
         usedSegmentVersion
     );
 
@@ -967,12 +946,12 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       return result.segmentIdentifier;
     }
 
-    final SegmentIdWithShardSpec newIdentifier = createNewSegment(
+    final SegmentIdWithShardSpec newIdentifier = createNewPendingSegment(
         handle,
         dataSource,
         interval,
-        partialShardSpec,
-        maxVersion,
+        createRequest.getPartialShardSpec(),
+        createRequest.getVersion(),
         existingChunks
     );
     if (newIdentifier == null) {
@@ -989,9 +968,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
         Hashing.sha1()
                .newHasher()
-               .putBytes(StringUtils.toUtf8(sequenceName))
+               .putBytes(StringUtils.toUtf8(createRequest.getSequenceName()))
                .putByte((byte) 0xff)
-               .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
+               
.putBytes(StringUtils.toUtf8(createRequest.getPreviousSegmentId()))
                .putByte((byte) 0xff)
                .putBytes(StringUtils.toUtf8(newIdentifier.getVersion()))
                .hash()
@@ -1003,10 +982,10 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         newIdentifier,
         dataSource,
         interval,
-        previousSegmentIdNotNull,
-        sequenceName,
+        createRequest.getPreviousSegmentId(),
+        createRequest.getSequenceName(),
         sequenceNamePrevIdSha1,
-        taskAllocatorId
+        createRequest.getTaskAllocatorId()
     );
     return newIdentifier;
   }
@@ -1108,12 +1087,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   private SegmentIdWithShardSpec allocatePendingSegment(
       final Handle handle,
       final String dataSource,
-      final String sequenceName,
       final Interval interval,
-      final PartialShardSpec partialShardSpec,
-      final String maxVersion,
-      final List<TimelineObjectHolder<String, DataSegment>> existingChunks,
-      final String taskAllocatorId
+      final SegmentCreateRequest createRequest,
+      final List<TimelineObjectHolder<String, DataSegment>> existingChunks
   ) throws IOException
   {
     final String sql = StringUtils.format(
@@ -1128,14 +1104,14 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     final Query<Map<String, Object>> query
         = handle.createQuery(sql)
                 .bind("dataSource", dataSource)
-                .bind("sequence_name", sequenceName)
+                .bind("sequence_name", createRequest.getSequenceName())
                 .bind("start", interval.getStart().toString())
                 .bind("end", interval.getEnd().toString());
 
     final CheckExistingSegmentIdResult result = findExistingPendingSegment(
         query,
         interval,
-        sequenceName,
+        createRequest.getSequenceName(),
         null,
         existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion()
     );
@@ -1144,12 +1120,12 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       return result.segmentIdentifier;
     }
 
-    final SegmentIdWithShardSpec newIdentifier = createNewSegment(
+    final SegmentIdWithShardSpec newIdentifier = createNewPendingSegment(
         handle,
         dataSource,
         interval,
-        partialShardSpec,
-        maxVersion,
+        createRequest.getPartialShardSpec(),
+        createRequest.getVersion(),
         existingChunks
     );
     if (newIdentifier == null) {
@@ -1166,7 +1142,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
         Hashing.sha1()
                .newHasher()
-               .putBytes(StringUtils.toUtf8(sequenceName))
+               .putBytes(StringUtils.toUtf8(createRequest.getSequenceName()))
                .putByte((byte) 0xff)
                .putLong(interval.getStartMillis())
                .putLong(interval.getEndMillis())
@@ -1183,14 +1159,14 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         dataSource,
         interval,
         "",
-        sequenceName,
+        createRequest.getSequenceName(),
         sequenceNamePrevIdSha1,
-        taskAllocatorId
+        createRequest.getTaskAllocatorId()
     );
 
     log.info(
-        "Created new pending segment[%s] for datasource[%s], sequence[%s], 
interval[%s].",
-        newIdentifier, dataSource, sequenceName, interval
+        "Created new pending segment[%s] for datasource[%s], interval[%s].",
+        newIdentifier, dataSource, interval
     );
 
     return newIdentifier;
@@ -1334,7 +1310,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   private static class CheckExistingSegmentIdResult
   {
     private final boolean found;
-    @Nullable
     private final SegmentIdWithShardSpec segmentIdentifier;
 
     CheckExistingSegmentIdResult(boolean found, @Nullable 
SegmentIdWithShardSpec segmentIdentifier)
@@ -1391,21 +1366,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
   }
 
-  private static void bindColumnValuesToQueryWithInCondition(
-      final String columnName,
-      final List<String> values,
-      final Update query
-  )
-  {
-    if (values == null) {
-      return;
-    }
-
-    for (int i = 0; i < values.size(); i++) {
-      query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
-    }
-  }
-
   private int deletePendingSegmentsById(Handle handle, String datasource, 
List<String> pendingSegmentIds)
   {
     if (pendingSegmentIds.isEmpty()) {
@@ -1419,7 +1379,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
             
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", 
pendingSegmentIds)
         )
     ).bind("dataSource", datasource);
-    bindColumnValuesToQueryWithInCondition("id", pendingSegmentIds, query);
+    SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", 
pendingSegmentIds, query);
 
     return query.execute();
   }
@@ -1442,7 +1402,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     final String dataSource = appendSegments.iterator().next().getDataSource();
     final List<PendingSegmentRecord> segmentIdsForNewVersions = 
connector.retryTransaction(
         (handle, transactionStatus)
-            -> getPendingSegmentsForTaskAllocatorIdWithHandle(handle, 
dataSource, taskAllocatorId),
+            -> getPendingSegmentsForTaskAllocatorId(handle, dataSource, 
taskAllocatorId),
         0,
         SQLMetadataConnector.DEFAULT_MAX_TRIES
     );
@@ -1668,11 +1628,11 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     // across all shard specs (published + pending).
     // A pending segment having a higher partitionId must also be considered
     // to avoid clashes when inserting the pending segment created here.
-    final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>(
-        getPendingSegmentsForIntervalWithHandle(handle, dataSource, 
interval).stream()
-                                                                             
.map(PendingSegmentRecord::getId)
-                                                                             
.collect(Collectors.toSet())
-    );
+    final Set<SegmentIdWithShardSpec> pendingSegments =
+        getPendingSegmentsForInterval(handle, dataSource, interval)
+            .stream()
+            .map(PendingSegmentRecord::getId)
+            .collect(Collectors.toSet());
 
     final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments = 
new HashMap<>();
     final Map<UniqueAllocateRequest, PendingSegmentRecord> 
uniqueRequestToSegment = new HashMap<>();
@@ -1686,7 +1646,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
         createdSegment = uniqueRequestToSegment.get(uniqueRequest);
       } else {
-        createdSegment = createNewSegment(
+        createdSegment = createNewPendingSegment(
             request,
             dataSource,
             interval,
@@ -1712,7 +1672,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return createdSegments;
   }
 
-  private PendingSegmentRecord createNewSegment(
+  @Nullable
+  private PendingSegmentRecord createNewPendingSegment(
       SegmentCreateRequest request,
       String dataSource,
       Interval interval,
@@ -1775,17 +1736,14 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           pendingSegmentId,
           request.getSequenceName(),
           request.getPreviousSegmentId(),
-          request.getUpgradedFromSegmentId(),
+          null,
           request.getTaskAllocatorId()
       );
 
     } else if (!overallMaxId.getInterval().equals(interval)) {
       log.warn(
           "Cannot allocate new segment for dataSource[%s], interval[%s], 
existingVersion[%s]: conflicting segment[%s].",
-          dataSource,
-          interval,
-          existingVersion,
-          overallMaxId
+          dataSource, interval, existingVersion, overallMaxId
       );
       return null;
     } else if (committedMaxId != null
@@ -1793,8 +1751,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                   == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
       log.warn(
           "Cannot allocate new segment because of unknown core partition size 
of segment[%s], shardSpec[%s]",
-          committedMaxId,
-          committedMaxId.getShardSpec()
+          committedMaxId, committedMaxId.getShardSpec()
       );
       return null;
     } else {
@@ -1815,28 +1772,20 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           getTrueAllocatedId(pendingSegmentId),
           request.getSequenceName(),
           request.getPreviousSegmentId(),
-          request.getUpgradedFromSegmentId(),
+          null,
           request.getTaskAllocatorId()
       );
     }
   }
 
   /**
-   * This function creates a new segment for the given 
datasource/interval/etc. A critical
-   * aspect of the creation is to make sure that the new version & new 
partition number will make
-   * sense given the existing segments & pending segments also very important 
is to avoid
-   * clashes with existing pending & used/unused segments.
-   * @param handle Database handle
-   * @param dataSource datasource for the new segment
-   * @param interval interval for the new segment
+   * Creates a new pending segment for the given datasource and interval.
    * @param partialShardSpec Shard spec info minus segment id stuff
    * @param existingVersion Version of segments in interval, used to compute 
the version of the very first segment in
    *                        interval
-   * @return
-   * @throws IOException
    */
   @Nullable
-  private SegmentIdWithShardSpec createNewSegment(
+  private SegmentIdWithShardSpec createNewPendingSegment(
       final Handle handle,
       final String dataSource,
       final Interval interval,
@@ -1876,11 +1825,12 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     // across all shard specs (published + pending).
     // A pending segment having a higher partitionId must also be considered
     // to avoid clashes when inserting the pending segment created here.
-    final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
-        getPendingSegmentsForIntervalWithHandle(handle, dataSource, 
interval).stream()
-                                                                             
.map(PendingSegmentRecord::getId)
-                                                                             
.collect(Collectors.toSet())
-    );
+    final Set<SegmentIdWithShardSpec> pendings =
+        getPendingSegmentsForInterval(handle, dataSource, interval)
+            .stream()
+            .map(PendingSegmentRecord::getId)
+            .collect(Collectors.toSet());
+
     if (committedMaxId != null) {
       pendings.add(committedMaxId);
     }
@@ -1910,11 +1860,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
 
     if (overallMaxId == null) {
-      // When appending segments, null overallMaxId means that we are 
allocating the very initial
-      // segment for this time chunk.
-      // This code is executed when the Overlord coordinates segment 
allocation, which is either you append segments
-      // or you use segment lock. Since the core partitions set is not 
determined for appended segments, we set
-      // it 0. When you use segment lock, the core partitions set doesn't work 
with it. We simply set it 0 so that the
+      // We are allocating the very first segment for this time chunk.
+      // Set numCorePartitions to 0 as core partitions are not determined for 
append segments.
+      // When you use segment lock, the core partitions set doesn't work with 
it. We simply set it to 0 so that the
       // OvershadowableManager handles the atomic segment update.
       final int newPartitionId = 
partialShardSpec.useNonRootGenerationPartitionSpace()
                                  ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
@@ -1929,10 +1877,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     } else if (!overallMaxId.getInterval().equals(interval)) {
       log.warn(
           "Cannot allocate new segment for dataSource[%s], interval[%s], 
existingVersion[%s]: conflicting segment[%s].",
-          dataSource,
-          interval,
-          existingVersion,
-          overallMaxId
+          dataSource, interval, existingVersion, overallMaxId
       );
       return null;
     } else if (committedMaxId != null
@@ -1940,14 +1885,12 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                   == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
       log.warn(
           "Cannot allocate new segment because of unknown core partition size 
of segment[%s], shardSpec[%s]",
-          committedMaxId,
-          committedMaxId.getShardSpec()
+          committedMaxId, committedMaxId.getShardSpec()
       );
       return null;
     } else {
-      // The number of core partitions must always be chosen from the set of 
used segments in the SegmentTimeline.
-      // When the core partitions have been dropped, using pending segments 
may lead to an incorrect state
-      // where the chunk is believed to have core partitions and queries 
results are incorrect.
+      // numCorePartitions must always be picked from the committedMaxId and 
not overallMaxId
+      // as overallMaxId may refer to a pending segment which might have stale 
info of numCorePartitions
       final SegmentIdWithShardSpec allocatedId = new SegmentIdWithShardSpec(
           dataSource,
           interval,
@@ -1963,7 +1906,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   /**
-   * Verifies that the allocated id doesn't already exist in the druid 
segments table.
+   * Verifies that the allocated id doesn't already exist in the 
druid_segments table.
    * If yes, try to get the max unallocated id considering the unused segments 
for the datasource, version and interval
    * Otherwise, use the same id.
    * @param allocatedId The segment allocated on the basis of used and pending 
segments
@@ -1977,7 +1920,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
 
     // If yes, try to compute allocated partition num using the max unused 
segment shard spec
-    SegmentId unusedMaxId = getUnusedMaxId(
+    SegmentId unusedMaxId = getMaxIdOfUnusedSegment(
         allocatedId.getDataSource(),
         allocatedId.getInterval(),
         allocatedId.getVersion()
@@ -2002,7 +1945,14 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     );
   }
 
-  private SegmentId getUnusedMaxId(String datasource, Interval interval, 
String version)
+  /**
+   * Determines the highest ID amongst unused segments for the given 
datasource,
+   * interval and version.
+   *
+   * @return null if no unused segment exists for the given parameters.
+   */
+  @Nullable
+  private SegmentId getMaxIdOfUnusedSegment(String datasource, Interval 
interval, String version)
   {
     List<String> unusedSegmentIds = 
retrieveUnusedSegmentIdsForExactIntervalAndVersion(
         datasource,
@@ -2134,7 +2084,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
               .bind("created_date", now)
               .bind("start", segment.getInterval().getStart().toString())
               .bind("end", segment.getInterval().getEnd().toString())
-              .bind("partitioned", (segment.getShardSpec() instanceof 
NoneShardSpec) ? false : true)
+              .bind("partitioned", !(segment.getShardSpec() instanceof 
NoneShardSpec))
               .bind("version", segment.getVersion())
               .bind("used", usedSegments.contains(segment))
               .bind("payload", jsonMapper.writeValueAsBytes(segment))
@@ -2330,9 +2280,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       Map<String, String> upgradedFromSegmentIdMap
   ) throws IOException
   {
-    boolean shouldPersistSchema = shouldPersistSchema(segmentSchemaMapping);
-
-    if (shouldPersistSchema) {
+    if (shouldPersistSchema(segmentSchemaMapping)) {
       persistSchema(handle, segments, segmentSchemaMapping);
     }
 
@@ -2407,6 +2355,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return segmentsToInsert;
   }
 
+  @Nullable
   private SegmentMetadata getSegmentMetadataFromSchemaMappingOrUpgradeMetadata(
       final SegmentId segmentId,
       final SegmentSchemaMapping segmentSchemaMapping,
@@ -2786,27 +2735,18 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
     );
 
+    final String sql = "UPDATE %s SET "
+                       + "commit_metadata_payload = 
:new_commit_metadata_payload, "
+                       + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+                       + "WHERE dataSource = :dataSource";
     return connector.retryWithHandle(
-        new HandleCallback<Boolean>()
-        {
-          @Override
-          public Boolean withHandle(Handle handle)
-          {
-            final int numRows = handle.createStatement(
-                StringUtils.format(
-                    "UPDATE %s SET "
-                    + "commit_metadata_payload = :new_commit_metadata_payload, 
"
-                    + "commit_metadata_sha1 = :new_commit_metadata_sha1 "
-                    + "WHERE dataSource = :dataSource",
-                    dbTables.getDataSourceTable()
-                )
-            )
-                                      .bind("dataSource", dataSource)
-                                      .bind("new_commit_metadata_payload", 
newCommitMetadataBytes)
-                                      .bind("new_commit_metadata_sha1", 
newCommitMetadataSha1)
-                                      .execute();
-            return numRows == 1;
-          }
+        handle -> {
+          final int numRows = handle.createStatement(StringUtils.format(sql, 
dbTables.getDataSourceTable()))
+                                    .bind("dataSource", dataSource)
+                                    .bind("new_commit_metadata_payload", 
newCommitMetadataBytes)
+                                    .bind("new_commit_metadata_sha1", 
newCommitMetadataSha1)
+                                    .execute();
+          return numRows == 1;
         }
     );
   }
@@ -3028,7 +2968,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   public List<PendingSegmentRecord> getPendingSegments(String datasource, 
Interval interval)
   {
     return connector.retryWithHandle(
-        handle -> getPendingSegmentsForIntervalWithHandle(handle, datasource, 
interval)
+        handle -> getPendingSegmentsForInterval(handle, datasource, interval)
     );
   }
 
@@ -3178,7 +3118,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   {
     private final boolean failed;
     private final boolean canRetry;
-    @Nullable
     private final String errorMsg;
 
     public static final DataStoreMetadataUpdateResult SUCCESS = new 
DataStoreMetadataUpdateResult(false, false, null);
@@ -3198,7 +3137,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       this.failed = failed;
       this.canRetry = canRetry;
       this.errorMsg = null == errorMsg ? null : StringUtils.format(errorMsg, 
errorFormatArgs);
-
     }
 
     public boolean isFailed()
diff --git 
a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
 
b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
index 57e01d76a44..567867bd97e 100644
--- 
a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
+++ 
b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java
@@ -36,7 +36,6 @@ public class SegmentCreateRequestTest
         null,
         "version",
         partialShardSpec,
-        null,
         null
     );
     Assert.assertEquals("sequence", request.getSequenceName());
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index a000fbec5a3..06bbf3b7ecd 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -2280,7 +2280,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     final PartialShardSpec partialShardSpec = 
NumberedPartialShardSpec.instance();
     final String dataSource = "ds";
     final Interval interval = Intervals.of("2017-01-01/2017-02-01");
-    final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier = allocatePendingSegment(
         dataSource,
         "seq",
         null,
@@ -2293,7 +2293,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version",
 identifier.toString());
 
-    final SegmentIdWithShardSpec identifier1 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier1 = allocatePendingSegment(
         dataSource,
         "seq",
         identifier.toString(),
@@ -2306,7 +2306,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1",
 identifier1.toString());
 
-    final SegmentIdWithShardSpec identifier2 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier2 = allocatePendingSegment(
         dataSource,
         "seq",
         identifier1.toString(),
@@ -2319,7 +2319,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2",
 identifier2.toString());
 
-    final SegmentIdWithShardSpec identifier3 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier3 = allocatePendingSegment(
         dataSource,
         "seq",
         identifier1.toString(),
@@ -2333,7 +2333,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2",
 identifier3.toString());
     Assert.assertEquals(identifier2, identifier3);
 
-    final SegmentIdWithShardSpec identifier4 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier4 = allocatePendingSegment(
         dataSource,
         "seq1",
         null,
@@ -2370,7 +2370,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     final PartialShardSpec partialShardSpec = 
NumberedPartialShardSpec.instance();
     final String dataSource = "ds";
     final Interval interval = Intervals.of("2017-01-01/2017-02-01");
-    final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier = allocatePendingSegment(
         dataSource,
         "seq",
         null,
@@ -2385,7 +2385,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions());
 
     // simulate one more load using kafka streaming (as if previous segment 
was published, note different sequence name)
-    final SegmentIdWithShardSpec identifier1 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier1 = allocatePendingSegment(
         dataSource,
         "seq2",
         identifier.toString(),
@@ -2400,7 +2400,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(0, identifier1.getShardSpec().getNumCorePartitions());
 
     // simulate one more load using kafka streaming (as if previous segment 
was published, note different sequence name)
-    final SegmentIdWithShardSpec identifier2 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier2 = allocatePendingSegment(
         dataSource,
         "seq3",
         identifier1.toString(),
@@ -2431,7 +2431,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new",
 ids.get(0));
 
     // one more load on same interval:
-    final SegmentIdWithShardSpec identifier3 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier3 = allocatePendingSegment(
         dataSource,
         "seq4",
         identifier1.toString(),
@@ -2450,7 +2450,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
     // and final load, this reproduces an issue that could happen with 
multiple streaming appends,
     // followed by a reindex, followed by a drop, and more streaming data 
coming in for same interval
-    final SegmentIdWithShardSpec identifier4 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier4 = allocatePendingSegment(
         dataSource,
         "seq5",
         identifier1.toString(),
@@ -2484,7 +2484,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     final PartialShardSpec partialShardSpec = 
NumberedPartialShardSpec.instance();
     final String dataSource = "ds";
     final Interval interval = Intervals.of("2017-01-01/2017-02-01");
-    final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier = allocatePendingSegment(
         dataSource,
         "seq",
         null,
@@ -2513,7 +2513,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
 
     // 1.1) simulate one more append load  (as if previous segment was 
published, note different sequence name)
-    final SegmentIdWithShardSpec identifier1 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier1 = allocatePendingSegment(
         dataSource,
         "seq2",
         identifier.toString(),
@@ -2542,7 +2542,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
 
     // 1.2) simulate one more append load  (as if previous segment was 
published, note different sequence name)
-    final SegmentIdWithShardSpec identifier2 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier2 = allocatePendingSegment(
         dataSource,
         "seq3",
         identifier1.toString(),
@@ -2597,7 +2597,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     // unused segment:
 
     // 4) pending segment of version = B, id = 1 <= appending new data, aborted
-    final SegmentIdWithShardSpec identifier3 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier3 = allocatePendingSegment(
         dataSource,
         "seq4",
         identifier2.toString(),
@@ -2632,7 +2632,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(1, unused.size());
 
     // Simulate one more append load
-    final SegmentIdWithShardSpec identifier4 = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier4 = allocatePendingSegment(
         dataSource,
         "seq5",
         identifier1.toString(),
@@ -2678,7 +2678,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     final Interval interval = Intervals.of("2017-01-01/2017-02-01");
     final String sequenceName = "seq";
 
-    final SegmentCreateRequest request = new 
SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
+    final SegmentCreateRequest request = new 
SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId0 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2690,7 +2690,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", 
segmentId0.toString());
 
     final SegmentCreateRequest request1 =
-        new SegmentCreateRequest(sequenceName, segmentId0.toString(), 
segmentId0.getVersion(), partialShardSpec, null, null);
+        new SegmentCreateRequest(sequenceName, segmentId0.toString(), 
segmentId0.getVersion(), partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId1 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2702,7 +2702,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1",
 segmentId1.toString());
 
     final SegmentCreateRequest request2 =
-        new SegmentCreateRequest(sequenceName, segmentId1.toString(), 
segmentId1.getVersion(), partialShardSpec, null, null);
+        new SegmentCreateRequest(sequenceName, segmentId1.toString(), 
segmentId1.getVersion(), partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId2 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2714,7 +2714,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2",
 segmentId2.toString());
 
     final SegmentCreateRequest request3 =
-        new SegmentCreateRequest(sequenceName, segmentId1.toString(), 
segmentId1.getVersion(), partialShardSpec, null, null);
+        new SegmentCreateRequest(sequenceName, segmentId1.toString(), 
segmentId1.getVersion(), partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId3 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2727,7 +2727,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(segmentId2, segmentId3);
 
     final SegmentCreateRequest request4 =
-        new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, 
null);
+        new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId4 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2747,7 +2747,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     final Interval interval = Intervals.of("2017-01-01/2017-02-01");
     final String sequenceName = "seq";
 
-    final SegmentCreateRequest request = new 
SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null);
+    final SegmentCreateRequest request = new 
SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId0 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2759,7 +2759,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", 
segmentId0.toString());
 
     final SegmentCreateRequest request1 =
-        new SegmentCreateRequest(sequenceName, segmentId0.toString(), 
segmentId0.getVersion(), partialShardSpec, null, null);
+        new SegmentCreateRequest(sequenceName, segmentId0.toString(), 
segmentId0.getVersion(), partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId1 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2771,7 +2771,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1",
 segmentId1.toString());
 
     final SegmentCreateRequest request2 =
-        new SegmentCreateRequest(sequenceName, segmentId1.toString(), 
segmentId1.getVersion(), partialShardSpec, null, null);
+        new SegmentCreateRequest(sequenceName, segmentId1.toString(), 
segmentId1.getVersion(), partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId2 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2783,7 +2783,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2",
 segmentId2.toString());
 
     final SegmentCreateRequest request3 =
-        new SegmentCreateRequest(sequenceName, segmentId1.toString(), 
segmentId1.getVersion(), partialShardSpec, null, null);
+        new SegmentCreateRequest(sequenceName, segmentId1.toString(), 
segmentId1.getVersion(), partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId3 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2796,7 +2796,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(segmentId2, segmentId3);
 
     final SegmentCreateRequest request4 =
-        new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, 
null);
+        new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null);
     final SegmentIdWithShardSpec segmentId4 = 
coordinator.allocatePendingSegments(
         dataSource,
         interval,
@@ -2833,7 +2833,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     final PartialShardSpec partialShardSpec = 
NumberedPartialShardSpec.instance();
     final String dataSource = "ds";
     final Interval interval = Intervals.of("2017-01-01/2017-02-01");
-    final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier = allocatePendingSegment(
         dataSource,
         "seq",
         null,
@@ -2857,7 +2857,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     final DateTime begin = DateTimes.nowUtc();
 
     for (int i = 0; i < 10; i++) {
-      final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+      final SegmentIdWithShardSpec identifier = allocatePendingSegment(
           dataSource,
           "seq",
           prevSegmentId,
@@ -2873,7 +2873,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
 
     final DateTime secondBegin = DateTimes.nowUtc();
     for (int i = 0; i < 5; i++) {
-      final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+      final SegmentIdWithShardSpec identifier = allocatePendingSegment(
           dataSource,
           "seq",
           prevSegmentId,
@@ -2901,7 +2901,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     String prevSegmentId = null;
 
     for (int i = 0; i < 10; i++) {
-      final SegmentIdWithShardSpec identifier = 
coordinator.allocatePendingSegment(
+      final SegmentIdWithShardSpec identifier = allocatePendingSegment(
           dataSource,
           "seq",
           prevSegmentId,
@@ -2970,7 +2970,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     final String dataSource = "ds";
     final Interval interval = Intervals.of("2017-01-01/2017-02-01");
 
-    SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
+    SegmentIdWithShardSpec id = allocatePendingSegment(
         dataSource,
         "seq",
         null,
@@ -3003,7 +3003,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
         new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
     );
 
-    id = coordinator.allocatePendingSegment(
+    id = allocatePendingSegment(
         dataSource,
         "seq2",
         null,
@@ -3036,7 +3036,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
         new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)
     );
 
-    id = coordinator.allocatePendingSegment(
+    id = allocatePendingSegment(
         dataSource,
         "seq3",
         null,
@@ -3084,7 +3084,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
       );
     }
     coordinator.commitSegments(originalSegments, new 
SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION));
-    final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec id = allocatePendingSegment(
         datasource,
         "seq",
         null,
@@ -3130,7 +3130,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
       );
     }
     coordinator.commitSegments(originalSegments, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION));
-    final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec id = allocatePendingSegment(
         datasource,
         "seq",
         null,
@@ -3377,7 +3377,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     Assert.assertTrue(coordinator.commitSegments(tombstones, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)).containsAll(tombstones));
 
     // Allocate and commit a data segment by appending to the same interval
-    final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier = allocatePendingSegment(
         TestDataSource.WIKI,
         "seq",
         tombstoneSegment.getVersion(),
@@ -3432,7 +3432,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     Assert.assertTrue(coordinator.commitSegments(tombstones, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)).containsAll(tombstones));
 
     // Allocate and commit a data segment by appending to the same interval
-    final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec identifier = allocatePendingSegment(
         TestDataSource.WIKI,
         "seq",
         tombstoneSegment.getVersion(),
@@ -3471,7 +3471,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
   @Test
   public void testSegmentIdShouldNotBeReallocated()
   {
-    final SegmentIdWithShardSpec idWithNullTaskAllocator = coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec idWithNullTaskAllocator = allocatePendingSegment(
         TestDataSource.WIKI,
         "seq",
         "0",
@@ -3487,7 +3487,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
         idWithNullTaskAllocator.getShardSpec()
     );
 
-    final SegmentIdWithShardSpec idWithValidTaskAllocator = coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec idWithValidTaskAllocator = allocatePendingSegment(
         TestDataSource.WIKI,
         "seq",
         "1",
@@ -3510,7 +3510,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     // Mark all segments as unused
     coordinator.markSegmentsAsUnusedWithinInterval(TestDataSource.WIKI, Intervals.ETERNITY);
 
-    final SegmentIdWithShardSpec theId = coordinator.allocatePendingSegment(
+    final SegmentIdWithShardSpec theId = allocatePendingSegment(
         TestDataSource.WIKI,
         "seq",
         "2",
@@ -3791,6 +3791,30 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     );
   }
 
+  private SegmentIdWithShardSpec allocatePendingSegment(
+      String datasource,
+      String sequenceName,
+      String previousSegmentId,
+      Interval interval,
+      PartialShardSpec partialShardSpec,
+      String maxVersion,
+      boolean skipSegmentLineageCheck,
+      String taskAllocatorId
+  )
+  {
+    return coordinator.allocatePendingSegment(
+        datasource,
+        interval,
+        skipSegmentLineageCheck,
+        new SegmentCreateRequest(
+            sequenceName,
+            previousSegmentId,
+            maxVersion,
+            partialShardSpec,
+            taskAllocatorId
+        )
+    );
+  }
 
   private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to