This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a8b9165  Allow for appending tasks to co-exist with each other. 
(#12041)
a8b9165 is described below

commit a8b916576dc8be48ea49c4b7e22fd1f53140261b
Author: imply-cheddar <[email protected]>
AuthorDate: Fri Dec 10 09:46:40 2021 +0900

    Allow for appending tasks to co-exist with each other. (#12041)
    
    * Allow for appending tasks to co-exist with each other.
    
    Add a config parameter for appending tasks to allow them to
    use a SHARED lock.  This will allow multiple appending tasks
    to add segments to the same datasource at the same time.
    
    This config should actually be the default, but it is added
    as a config to enable a smooth transition/validation in
    production settings before forcing it as the default
    behavior going forward.
    
    This change leverages the TaskLockType.SHARED that existed
    previously, this used to carry the semantics of a READ lock,
    which was "escalated" when the task wanted to actually
    persist the segment.  As of many moons before this diff, the
    SHARED lock had stopped being used but was still piped into
    the code.  It turns out that with a few tweaks, it can be
    adjusted to be a shared lock for append tasks to allow them
    all to write to the same datasource, so that is what this does.
    
    * Can only reuse the shared lock if using the same groupId
    
    * Need to serialize out the task lock type
    
    * Adjust Unit tests to expect new field in JSON
---
 .../indexing/common/actions/SegmentAllocateAction.java | 17 +++++++++++++----
 .../indexing/common/task/AbstractBatchIndexTask.java   | 13 ++++++++-----
 .../task/AppenderatorDriverRealtimeIndexTask.java      |  9 ++++++---
 .../apache/druid/indexing/common/task/IndexTask.java   |  8 ++++++--
 .../task/OverlordCoordinatingSegmentAllocator.java     |  3 ++-
 .../druid/indexing/common/task/TaskLockHelper.java     | 14 +++++++++++++-
 .../org/apache/druid/indexing/common/task/Tasks.java   |  2 ++
 .../batch/parallel/ParallelIndexSupervisorTask.java    |  3 ++-
 .../common/task/batch/parallel/SinglePhaseSubTask.java |  3 ++-
 .../apache/druid/indexing/overlord/TaskLockbox.java    | 11 ++++++-----
 .../seekablestream/SeekableStreamIndexTask.java        |  6 +++++-
 .../common/actions/SegmentAllocateActionSerdeTest.java |  8 ++++++--
 .../common/actions/SegmentAllocateActionTest.java      |  6 ++++--
 .../parallel/SinglePhaseParallelIndexingTest.java      | 18 ++++++++++++++++++
 .../druid/indexing/overlord/TaskLockboxTest.java       |  2 +-
 15 files changed, 94 insertions(+), 29 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
index 3a168b0..f61dad9 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -80,6 +80,7 @@ public class SegmentAllocateAction implements 
TaskAction<SegmentIdWithShardSpec>
   private final boolean skipSegmentLineageCheck;
   private final PartialShardSpec partialShardSpec;
   private final LockGranularity lockGranularity;
+  private final TaskLockType taskLockType;
 
   @JsonCreator
   public SegmentAllocateAction(
@@ -92,7 +93,8 @@ public class SegmentAllocateAction implements 
TaskAction<SegmentIdWithShardSpec>
       @JsonProperty("skipSegmentLineageCheck") boolean skipSegmentLineageCheck,
       // nullable for backward compatibility
       @JsonProperty("shardSpecFactory") @Nullable PartialShardSpec 
partialShardSpec,
-      @JsonProperty("lockGranularity") @Nullable LockGranularity 
lockGranularity // nullable for backward compatibility
+      @JsonProperty("lockGranularity") @Nullable LockGranularity 
lockGranularity,
+      @JsonProperty("taskLockType") @Nullable TaskLockType taskLockType
   )
   {
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
@@ -107,6 +109,7 @@ public class SegmentAllocateAction implements 
TaskAction<SegmentIdWithShardSpec>
     this.skipSegmentLineageCheck = skipSegmentLineageCheck;
     this.partialShardSpec = partialShardSpec == null ? 
NumberedPartialShardSpec.instance() : partialShardSpec;
     this.lockGranularity = lockGranularity == null ? 
LockGranularity.TIME_CHUNK : lockGranularity;
+    this.taskLockType = taskLockType == null ? TaskLockType.EXCLUSIVE : 
taskLockType;
   }
 
   @JsonProperty
@@ -163,6 +166,12 @@ public class SegmentAllocateAction implements 
TaskAction<SegmentIdWithShardSpec>
     return lockGranularity;
   }
 
+  @JsonProperty
+  public TaskLockType getTaskLockType()
+  {
+    return taskLockType;
+  }
+
   @Override
   public TypeReference<SegmentIdWithShardSpec> getReturnTypeReference()
   {
@@ -290,13 +299,13 @@ public class SegmentAllocateAction implements 
TaskAction<SegmentIdWithShardSpec>
       boolean logOnFail
   )
   {
-    // This action is always used by appending tasks, which cannot change the 
segment granularity of existing
-    // dataSources. So, all lock requests should be segmentLock.
+    // This action is always used by appending tasks, so if it is a time_chunk 
lock then we allow it to be
+    // shared with other appending tasks as well
     final LockResult lockResult = toolbox.getTaskLockbox().tryLock(
         task,
         new LockRequestForNewSegment(
             lockGranularity,
-            TaskLockType.EXCLUSIVE,
+            taskLockType,
             task.getGroupId(),
             dataSource,
             tryInterval,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
index 59ed883..7a6b8df 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java
@@ -279,17 +279,18 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
    *
    * @return whether the lock was acquired
    */
-  public boolean determineLockGranularityAndTryLock(TaskActionClient client, 
List<Interval> intervals)
+  public boolean determineLockGranularityAndTryLock(TaskActionClient client, 
List<Interval> intervals, IndexIOConfig ioConfig)
       throws IOException
   {
     final boolean forceTimeChunkLock = getContextValue(
         Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
         Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
     );
+    final boolean useSharedLock = ioConfig.isAppendToExisting() && 
getContextValue(Tasks.USE_SHARED_LOCK, false);
     // Respect task context value most.
     if (forceTimeChunkLock) {
       log.info("[%s] is set to true in task context. Use timeChunk lock", 
Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
-      taskLockHelper = new TaskLockHelper(false);
+      taskLockHelper = new TaskLockHelper(false, useSharedLock);
       if (!intervals.isEmpty()) {
         return tryTimeChunkLock(client, intervals);
       } else {
@@ -298,7 +299,7 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
     } else {
       if (!intervals.isEmpty()) {
         final LockGranularityDetermineResult result = 
determineSegmentGranularity(client, intervals);
-        taskLockHelper = new TaskLockHelper(result.lockGranularity == 
LockGranularity.SEGMENT);
+        taskLockHelper = new TaskLockHelper(result.lockGranularity == 
LockGranularity.SEGMENT, useSharedLock);
         return tryLockWithDetermineResult(client, result);
       } else {
         // This branch is the only one that will not initialize taskLockHelper.
@@ -326,9 +327,11 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
         Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
         Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK
     );
+    final boolean useSharedLock = getContextValue(Tasks.USE_SHARED_LOCK, 
false);
+
     if (forceTimeChunkLock) {
       log.info("[%s] is set to true in task context. Use timeChunk lock", 
Tasks.FORCE_TIME_CHUNK_LOCK_KEY);
-      taskLockHelper = new TaskLockHelper(false);
+      taskLockHelper = new TaskLockHelper(false, useSharedLock);
       segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments);
       return tryTimeChunkLock(
           client,
@@ -336,7 +339,7 @@ public abstract class AbstractBatchIndexTask extends 
AbstractTask
       );
     } else {
       final LockGranularityDetermineResult result = 
determineSegmentGranularity(segments);
-      taskLockHelper = new TaskLockHelper(result.lockGranularity == 
LockGranularity.SEGMENT);
+      taskLockHelper = new TaskLockHelper(result.lockGranularity == 
LockGranularity.SEGMENT, useSharedLock);
       segmentCheckFunction.accept(result.lockGranularity, segments);
       return tryLockWithDetermineResult(client, result);
     }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
index 52c8026..1ef23b7 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java
@@ -276,7 +276,8 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
     DiscoveryDruidNode discoveryDruidNode = createDiscoveryDruidNode(toolbox);
 
     appenderator = newAppenderator(dataSchema, tuningConfig, metrics, toolbox);
-    StreamAppenderatorDriver driver = newDriver(dataSchema, appenderator, 
toolbox, metrics);
+    TaskLockType lockType = getContextValue(Tasks.USE_SHARED_LOCK, false) ? 
TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
+    StreamAppenderatorDriver driver = newDriver(dataSchema, appenderator, 
toolbox, metrics, lockType);
 
     try {
       log.debug("Found chat handler of class[%s]", 
toolbox.getChatHandlerProvider().getClass().getName());
@@ -787,7 +788,8 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
       final DataSchema dataSchema,
       final Appenderator appenderator,
       final TaskToolbox toolbox,
-      final FireDepartmentMetrics metrics
+      final FireDepartmentMetrics metrics,
+      final TaskLockType lockType
   )
   {
     return new StreamAppenderatorDriver(
@@ -804,7 +806,8 @@ public class AppenderatorDriverRealtimeIndexTask extends 
AbstractTask implements
                 previousSegmentId,
                 skipSegmentLineageCheck,
                 NumberedPartialShardSpec.instance(),
-                LockGranularity.TIME_CHUNK
+                LockGranularity.TIME_CHUNK,
+                lockType
             )
         ),
         toolbox.getSegmentHandoffNotifierFactory(),
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
index df2e81d..d99013d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
@@ -238,7 +238,8 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
     }
     return determineLockGranularityAndTryLock(
         taskActionClient,
-        ingestionSchema.dataSchema.getGranularitySpec().inputIntervals()
+        ingestionSchema.dataSchema.getGranularitySpec().inputIntervals(),
+        ingestionSchema.getIOConfig()
     );
   }
 
@@ -489,7 +490,10 @@ public class IndexTask extends AbstractBatchIndexTask 
implements ChatHandler
       final List<Interval> allocateIntervals = new 
ArrayList<>(partitionAnalysis.getAllIntervalsToIndex());
       final DataSchema dataSchema;
       if (determineIntervals) {
-        if (!determineLockGranularityAndTryLock(toolbox.getTaskActionClient(), 
allocateIntervals)) {
+        final boolean gotLocks = determineLockGranularityAndTryLock(
+            toolbox.getTaskActionClient(), allocateIntervals, 
ingestionSchema.getIOConfig()
+        );
+        if (!gotLocks) {
           throw new ISE("Failed to get locks for intervals[%s]", 
allocateIntervals);
         }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
index 6efd39a..d202258 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/OverlordCoordinatingSegmentAllocator.java
@@ -86,7 +86,8 @@ public class OverlordCoordinatingSegmentAllocator implements 
SegmentAllocatorFor
               previousSegmentId,
               skipSegmentLineageCheck,
               partialShardSpec,
-              taskLockHelper.getLockGranularityToUse()
+              taskLockHelper.getLockGranularityToUse(),
+              taskLockHelper.getLockTypeToUse()
           );
         }
     );
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java
index 6a4d360..e27b29f 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java
@@ -57,6 +57,7 @@ public class TaskLockHelper
   private final Map<Interval, OverwritingRootGenerationPartitions> 
overwritingRootGenPartitions = new HashMap<>();
   private final Set<DataSegment> lockedExistingSegments = new HashSet<>();
   private final boolean useSegmentLock;
+  private final boolean useSharedLock;
 
   @Nullable
   private Granularity knownSegmentGranularity;
@@ -90,9 +91,10 @@ public class TaskLockHelper
     }
   }
 
-  public TaskLockHelper(boolean useSegmentLock)
+  public TaskLockHelper(boolean useSegmentLock, boolean useSharedLock)
   {
     this.useSegmentLock = useSegmentLock;
+    this.useSharedLock = useSharedLock;
   }
 
   public boolean isUseSegmentLock()
@@ -105,6 +107,16 @@ public class TaskLockHelper
     return useSegmentLock ? LockGranularity.SEGMENT : 
LockGranularity.TIME_CHUNK;
   }
 
+  public boolean isUseSharedLock()
+  {
+    return useSharedLock;
+  }
+
+  public TaskLockType getLockTypeToUse()
+  {
+    return useSharedLock ? TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
+  }
+
   public boolean hasLockedExistingSegments()
   {
     return !lockedExistingSegments.isEmpty();
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
index abd5424..045cee3 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Tasks.java
@@ -50,6 +50,8 @@ public class Tasks
   public static final String PRIORITY_KEY = "priority";
   public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";
   public static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
+  public static final String USE_SHARED_LOCK = "useSharedLock";
+
   /**
    * This context is used in compaction. When it is set in the context, the 
segments created by the task
    * will fill 'lastCompactionState' in its metadata. This will be used to 
track what segments are compacted or not.
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 4b1e246..7972156 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -384,7 +384,8 @@ public class ParallelIndexSupervisorTask extends 
AbstractBatchIndexTask implemen
   {
     return determineLockGranularityAndTryLock(
         taskActionClient,
-        ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
+        ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals(),
+        ingestionSchema.getIOConfig()
     );
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 0944128..0a9f154 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -194,7 +194,8 @@ public class SinglePhaseSubTask extends 
AbstractBatchSubtask implements ChatHand
   {
     return determineLockGranularityAndTryLock(
         new SurrogateTaskActionClient(supervisorTaskId, taskActionClient),
-        ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals()
+        ingestionSchema.getDataSchema().getGranularitySpec().inputIntervals(),
+        ingestionSchema.getIOConfig()
     );
   }
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 0e2e25e..a53af64 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -579,9 +579,8 @@ public class TaskLockbox
           .stream()
           .allMatch(interval -> {
             final List<TaskLockPosse> lockPosses = 
getOnlyTaskLockPosseContainingInterval(task, interval);
-            // Tasks cannot enter the critical section with a shared lock
-            return 
lockPosses.stream().map(TaskLockPosse::getTaskLock).allMatch(
-                lock -> !lock.isRevoked() && lock.getType() != 
TaskLockType.SHARED
+            return 
lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch(
+                lock -> lock.isRevoked()
             );
           });
     }
@@ -1134,8 +1133,10 @@ public class TaskLockbox
       if (taskLock.getType() == request.getType() && taskLock.getGranularity() 
== request.getGranularity()) {
         switch (taskLock.getType()) {
           case SHARED:
-            // All shared lock is not reusable. Instead, a new lock posse is 
created for each lock request.
-            // See createOrFindLockPosse().
+            if (request instanceof TimeChunkLockRequest) {
+              return taskLock.getInterval().contains(request.getInterval())
+                     && taskLock.getGroupId().equals(request.getGroupId());
+            }
             return false;
           case EXCLUSIVE:
             if (request instanceof TimeChunkLockRequest) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 3f91d11..4fac734 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -30,6 +30,7 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -69,6 +70,7 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   protected final SeekableStreamIndexTaskIOConfig<PartitionIdType, 
SequenceOffsetType> ioConfig;
   protected final Map<String, Object> context;
   protected final LockGranularity lockGranularityToUse;
+  protected final TaskLockType lockTypeToUse;
 
   // Lazily initialized, to avoid calling it on the overlord when tasks are 
instantiated.
   // See https://github.com/apache/druid/issues/7724 for issues that can cause.
@@ -103,6 +105,7 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
     this.lockGranularityToUse = 
getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, 
Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
                                 ? LockGranularity.TIME_CHUNK
                                 : LockGranularity.SEGMENT;
+    this.lockTypeToUse = getContextValue(Tasks.USE_SHARED_LOCK, false) ? 
TaskLockType.SHARED : TaskLockType.EXCLUSIVE;
   }
 
   protected static String getFormattedGroupId(String dataSource, String type)
@@ -222,7 +225,8 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
                 previousSegmentId,
                 skipSegmentLineageCheck,
                 NumberedPartialShardSpec.instance(),
-                lockGranularityToUse
+                lockGranularityToUse,
+                lockTypeToUse
             )
         ),
         toolbox.getSegmentHandoffNotifierFactory(),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionSerdeTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionSerdeTest.java
index 00ea876..4bed168 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionSerdeTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionSerdeTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.actions;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -52,7 +53,8 @@ public class SegmentAllocateActionSerdeTest
         "prev",
         false,
         NumberedPartialShardSpec.instance(),
-        LockGranularity.SEGMENT
+        LockGranularity.SEGMENT,
+        null
     );
   }
 
@@ -71,6 +73,7 @@ public class SegmentAllocateActionSerdeTest
     Assert.assertEquals(target.getSequenceName(), fromJson.getSequenceName());
     Assert.assertEquals(target.getPreviousSegmentId(), 
fromJson.getPreviousSegmentId());
     Assert.assertEquals(target.isSkipSegmentLineageCheck(), 
fromJson.isSkipSegmentLineageCheck());
+    Assert.assertEquals(TaskLockType.EXCLUSIVE, target.getTaskLockType());
   }
 
   @Test
@@ -81,7 +84,7 @@ public class SegmentAllocateActionSerdeTest
         Map.class
     );
 
-    Assert.assertEquals(10, fromJson.size());
+    Assert.assertEquals(11, fromJson.size());
     Assert.assertEquals(SegmentAllocateAction.TYPE, fromJson.get("type"));
     Assert.assertEquals(target.getDataSource(), fromJson.get("dataSource"));
     Assert.assertEquals(target.getTimestamp(), DateTimes.of((String) 
fromJson.get("timestamp")));
@@ -98,5 +101,6 @@ public class SegmentAllocateActionSerdeTest
     Assert.assertEquals(target.isSkipSegmentLineageCheck(), 
fromJson.get("skipSegmentLineageCheck"));
     Assert.assertEquals(ImmutableMap.of("type", "numbered"), 
fromJson.get("shardSpecFactory"));
     Assert.assertEquals(target.getLockGranularity(), 
LockGranularity.valueOf((String) fromJson.get("lockGranularity")));
+    Assert.assertEquals(target.getTaskLockType(), 
TaskLockType.valueOf((String) fromJson.get("taskLockType")));
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index bce5502..a7e85a0 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -873,7 +873,8 @@ public class SegmentAllocateActionTest
         null,
         true,
         new HashBasedNumberedPartialShardSpec(ImmutableList.of("dim1"), 1, 2, 
null),
-        lockGranularity
+        lockGranularity,
+        null
     );
     final SegmentIdWithShardSpec segmentIdentifier = action.perform(task, 
taskActionTestKit.getTaskActionToolbox());
     Assert.assertNotNull(segmentIdentifier);
@@ -986,7 +987,8 @@ public class SegmentAllocateActionTest
         sequencePreviousId,
         false,
         partialShardSpec,
-        lockGranularity
+        lockGranularity,
+        null
     );
     return action.perform(task, taskActionTestKit.getTaskActionToolbox());
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 3dc27c2..98bb399 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -71,6 +71,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @RunWith(Parameterized.class)
@@ -619,6 +620,23 @@ public class SinglePhaseParallelIndexingTest extends 
AbstractParallelIndexSuperv
   }
 
   @Test
+  public void testMultipleAppends()
+  {
+    final Interval interval = null;
+    final ParallelIndexSupervisorTask task = newTask(interval, 
Granularities.DAY, true, true);
+    final ParallelIndexSupervisorTask task2 = newTask(interval, 
Granularities.DAY, true, true);
+    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
+    task.addToContext(Tasks.USE_SHARED_LOCK, true);
+    task2.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, true);
+    task2.addToContext(Tasks.USE_SHARED_LOCK, true);
+    getIndexingServiceClient().runTask(task.getId(), task);
+    getIndexingServiceClient().runTask(task2.getId(), task2);
+
+    Assert.assertEquals(TaskState.SUCCESS, 
getIndexingServiceClient().waitToFinish(task, 1, 
TimeUnit.DAYS).getStatusCode());
+    Assert.assertEquals(TaskState.SUCCESS, 
getIndexingServiceClient().waitToFinish(task2, 1, 
TimeUnit.DAYS).getStatusCode());
+  }
+
+  @Test
   public void testRunParallelWithNoInputSplitToProcess()
   {
     // The input source filter on this task does not match any input
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 445b022..1da1ae1 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -491,7 +491,7 @@ public class TaskLockboxTest
     lockbox.add(task);
     Assert.assertTrue(tryTimeChunkLock(TaskLockType.SHARED, task, 
interval).isOk());
 
-    Assert.assertFalse(
+    Assert.assertTrue(
         lockbox.doInCriticalSection(
             task,
             Collections.singletonList(interval),

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to