clintropolis commented on a change in pull request #11189:
URL: https://github.com/apache/druid/pull/11189#discussion_r624978412



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java
##########
@@ -398,7 +398,7 @@ private StringFullResponseHolder submitRequest(
         } else {
           try {
             final long sleepTime = delay.getMillis();
-            log.debug(
+            log.warn(

Review comment:
       :+1:

##########
File path: docs/ingestion/tasks.md
##########
@@ -347,9 +347,10 @@ The task context is used for various individual task 
configuration. The followin
 
 |property|default|description|
 |--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, 
see [Locking](#locking).|
-|forceTimeChunkLock|true|_Setting this to false is still experimental_<br/> 
Force to always use time chunk lock. If not set, each task automatically 
chooses a lock type to use. If this set, it will overwrite the 
`druid.indexer.tasklock.forceTimeChunkLock` [configuration for the 
overlord](../configuration/index.md#overlord-operations). See 
[Locking](#locking) for more details.|
-|priority|Different based on task types. See [Priority](#priority).|Task 
priority|
+|`taskLockTimeout`|300000|task lock timeout in millisecond. For more details, 
see [Locking](#locking).|
+|`forceTimeChunkLock`|true|_Setting this to false is still experimental_<br/> 
Force to always use time chunk lock. If not set, each task automatically 
chooses a lock type to use. If this set, it will overwrite the 
`druid.indexer.tasklock.forceTimeChunkLock` [configuration for the 
overlord](../configuration/index.md#overlord-operations). See 
[Locking](#locking) for more details.|
+|`priority`|Different based on task types. See [Priority](#priority).|Task 
priority|
+|`useLineageBasedSegmentAllocation`|false|Enable the new lineage-based segment 
allocation protocol for the native Parallel task with dynamic partitioning. 
This option should be off during the replacing rolling upgrade to Druid 0.22 or 
higher. Once the upgrade is done, it must be set to true.|

Review comment:
       maybe worth elaborating on why, e.g. "...must be set to true to ensure 
data correctness"

##########
File path: core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java
##########
@@ -76,12 +77,34 @@
       @Nullable final CleanupAfterFailure cleanupAfterFailure,
       @Nullable final String messageOnRetry
   ) throws Exception
+  {
+    return retry(
+        f,
+        shouldRetry,
+        quietTries,
+        maxTries,
+        cleanupAfterFailure,
+        messageOnRetry,
+        false
+    );
+  }
+
+  @VisibleForTesting
+  static <T> T retry(
+      final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final int quietTries,
+      final int maxTries,
+      @Nullable final CleanupAfterFailure cleanupAfterFailure,
+      @Nullable final String messageOnRetry,
+      boolean skipSleep

Review comment:
       nit: `skipSleep` is the test-only parameter, I guess? Maybe worth a javadoc.
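
       For illustration only, a minimal sketch of how a documented test-only
       `skipSleep` flag could look. This is not the `RetryUtils` code from the
       PR: the class name `RetrySketch`, the trimmed-down parameter list, and
       the backoff values are assumptions made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

class RetrySketch
{
  /**
   * Calls {@code f} until it succeeds, {@code shouldRetry} rejects the failure,
   * or {@code maxTries} attempts have been made.
   *
   * @param skipSleep test-only switch; when true, the backoff sleep between
   *                  attempts is skipped so that unit tests finish quickly
   */
  static <T> T retry(Callable<T> f, Predicate<Throwable> shouldRetry, int maxTries, boolean skipSleep)
      throws Exception
  {
    if (maxTries < 1) {
      throw new IllegalArgumentException("maxTries must be positive");
    }
    int nTry = 0;
    while (true) {
      try {
        nTry++;
        return f.call();
      }
      catch (Exception e) {
        if (nTry < maxTries && shouldRetry.test(e)) {
          if (!skipSleep) {
            // Hypothetical backoff: 1s, 2s, 4s, ... capped at 60s.
            long sleepMillis = Math.min(60_000L, 1_000L << Math.min(nTry - 1, 6));
            Thread.sleep(sleepMillis);
          }
        } else {
          throw e;
        }
      }
    }
  }

  public static void main(String[] args) throws Exception
  {
    int[] calls = {0};
    // Fails twice with a retryable error, then succeeds; no sleeping because skipSleep is true.
    String result = retry(
        () -> {
          if (++calls[0] < 3) {
            throw new IllegalStateException("transient");
          }
          return "done";
        },
        e -> e instanceof IllegalStateException,
        5,
        true
    );
    System.out.println(result + " after " + calls[0] + " calls");
  }
}
```

       Keeping the flag on a package-private overload, as the diff does with
       `@VisibleForTesting`, means production callers never see it.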

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/TaskLockHelper.java
##########
@@ -281,16 +281,28 @@ public static void 
verifyRootPartitionIsAdjacentAndAtomicUpdateGroupIsFull(List<
         atomicUpdateGroupSize++;
       } else {
         if (curSegment.getEndRootPartitionId() != 
nextSegment.getStartRootPartitionId()) {
-          throw new ISE("Can't compact segments of non-consecutive 
rootPartition range");
+          throw new ISE(
+              "Can't compact segments of non-consecutive rootPartition range. 
Missing partitionIds between [%s] and [%s]",
+              curSegment.getEndRootPartitionId(),
+              nextSegment.getStartRootPartitionId()
+          );

Review comment:
       nice :+1:

##########
File path: 
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
##########
@@ -173,17 +181,43 @@
           null
       );
 
+  protected static final double DEFAULT_TRANSIENT_TASK_FAILURE_RATE = 0.3;
+  protected static final double DEFAULT_TRANSIENT_API_FAILURE_RATE = 0.3;
+
   private static final Logger LOG = new 
Logger(AbstractParallelIndexSupervisorTaskTest.class);
 
   @Rule
   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  /**
+   * Transient task failure rate emulated by the taskKiller in {@link 
SimpleThreadingTaskRunner}.
+   * Per {@link SubTaskSpec}, there could be at most one task failure.
+   */
+  private final double transientTaskFailureRate;
+
+  /**
+   * Transient API call failure rate emulated by {@link 
LocalParallelIndexSupervisorTaskClient}.
+   * This will be applied to every API calls in the future.
+   */
+  private final double transientApiCallFailureRate;
+

Review comment:
       cool :+1:

##########
File path: 
server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
##########
@@ -72,7 +72,24 @@
       if (firstShardSpec instanceof OverwriteShardSpec) {
         annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
       } else if (firstShardSpec instanceof BuildingShardSpec) {
-        annotateFn = 
annotateCorePartitionSetSizeFn(segmentsPerInterval.size());
+        // sanity check
+        // BuildingShardSpec is used in non-appending mode. In this mode,
+        // the segments in each interval should have contiguous partitionIds,
+        // so that they can be queryable (see PartitionHolder.isComplete()).
+        int expectedCorePartitionSetSize = segmentsPerInterval.size();
+        int actualCorePartitionSetSize = Math.toIntExact(
+            segmentsPerInterval
+                .stream()
+                .filter(segment -> segment.getShardSpec().getPartitionNum() < 
expectedCorePartitionSetSize)
+                .count()
+        );
+        if (expectedCorePartitionSetSize != actualCorePartitionSetSize) {
+          throw new ISE(
+              "Cannot publish segments due to incomplete time chunk. Segments 
are [%s]",
+              
segmentsPerInterval.stream().map(DataSegment::getId).collect(Collectors.toList())

Review comment:
       :+1: on sanity check... is there any chance the list of segments is huge 
here? (maybe we should use `log.errorSegments` to log segments and just include 
count/interval or something?)
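
       A rough sketch of the "count/interval" variant, under stated
       assumptions: the class and method names are hypothetical, segments are
       reduced to their partition numbers, and logging of the full segment
       list is left out rather than guessed at. It keeps the same contiguity
       check as the diff but puts only the interval and counts in the
       exception message.

```java
import java.util.List;

class CorePartitionCheckSketch
{
  /**
   * Verifies that the partition numbers in one interval all fall in
   * [0, size), which is the check used in the diff above.
   * Throws with a short message (interval and counts only) otherwise.
   */
  static void verifyCompleteTimeChunk(String interval, List<Integer> partitionNums)
  {
    int expectedCorePartitionSetSize = partitionNums.size();
    long actualCorePartitionSetSize = partitionNums
        .stream()
        .filter(partitionNum -> partitionNum < expectedCorePartitionSetSize)
        .count();
    if (expectedCorePartitionSetSize != actualCorePartitionSetSize) {
      // The full segment list would be logged separately; it is not modeled here.
      throw new IllegalStateException(
          String.format(
              "Cannot publish segments due to incomplete time chunk. "
              + "Interval [%s] has [%d] segments, but only [%d] have a partitionId below [%d]",
              interval,
              expectedCorePartitionSetSize,
              actualCorePartitionSetSize,
              expectedCorePartitionSetSize
          )
      );
    }
  }
}
```

       With partition numbers 0, 1, 3 the check fails because only two of the
       three segments have a partitionId below 3; with 0, 1, 2 it passes.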

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
##########
@@ -351,6 +355,7 @@ public boolean add(final Task task) throws 
EntryExistsException
 
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that 
we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, 
lockConfig.isForceTimeChunkLock());
+    defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);

Review comment:
       I think we should also set the use lineage config if it is absent to 
true here so that custom taskContext configs that are missing that setting do 
not run with false. The documentation for the default config would then no 
longer need to indicate that config is the default config since it would be 
implicit.
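
       To make the suggested precedence concrete, here is a small sketch using
       a plain `Map` in place of the task context. The class and method names
       are hypothetical, and only the two context keys are taken from the docs
       diff earlier in this thread. An explicit task value wins, then the
       configured default context, and finally a hard-coded `true` for the
       lineage setting.

```java
import java.util.HashMap;
import java.util.Map;

class TaskContextDefaultsSketch
{
  static final String FORCE_TIME_CHUNK_LOCK_KEY = "forceTimeChunkLock";
  static final String USE_LINEAGE_KEY = "useLineageBasedSegmentAllocation";

  /**
   * Returns a copy of the task context with missing entries filled in.
   * Values already present in {@code taskContext} are never overwritten.
   */
  static Map<String, Object> applyDefaults(
      Map<String, Object> taskContext,
      boolean forceTimeChunkLock,
      Map<String, Object> configuredDefaults
  )
  {
    Map<String, Object> context = new HashMap<>(taskContext);
    context.putIfAbsent(FORCE_TIME_CHUNK_LOCK_KEY, forceTimeChunkLock);
    // Operator-provided default context; only fills keys the task did not set.
    configuredDefaults.forEach(context::putIfAbsent);
    // Fallback suggested in the review: if neither the task nor a custom
    // default context mentions the lineage setting, run with true.
    context.putIfAbsent(USE_LINEAGE_KEY, true);
    return context;
  }
}
```

       With this ordering, a custom default context that omits the lineage key
       still ends up with `true`, while a task or operator that sets it to
       `false` explicitly keeps that value.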




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


