jihoonson commented on a change in pull request #11189:
URL: https://github.com/apache/druid/pull/11189#discussion_r627901501



##########
File path: core/src/main/java/org/apache/druid/java/util/common/RetryUtils.java
##########
@@ -76,12 +77,34 @@
       @Nullable final CleanupAfterFailure cleanupAfterFailure,
       @Nullable final String messageOnRetry
   ) throws Exception
+  {
+    return retry(
+        f,
+        shouldRetry,
+        quietTries,
+        maxTries,
+        cleanupAfterFailure,
+        messageOnRetry,
+        false
+    );
+  }
+
+  @VisibleForTesting
+  static <T> T retry(
+      final Task<T> f,
+      final Predicate<Throwable> shouldRetry,
+      final int quietTries,
+      final int maxTries,
+      @Nullable final CleanupAfterFailure cleanupAfterFailure,
+      @Nullable final String messageOnRetry,
+      boolean skipSleep

Review comment:
       Added javadocs.
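
For readers following the hunk above, here is a minimal sketch of the delegation pattern it shows: the existing entry point forwards to a package-private overload with `skipSleep` set to `false`, so tests can call the overload directly and avoid real backoff delays. `RetrySketch` is a hypothetical stand-in, not Druid's `RetryUtils`; the `Callable` parameter replaces Druid's `Task<T>`, and the backoff formula is an assumption made only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical stand-in for illustration; this is NOT Druid's RetryUtils.
final class RetrySketch
{
  // Entry point used by regular callers: always sleeps between attempts.
  static <T> T retry(Callable<T> f, Predicate<Throwable> shouldRetry, int maxTries) throws Exception
  {
    return retry(f, shouldRetry, maxTries, false);
  }

  // Overload intended for tests: skipSleep = true disables the backoff sleep.
  static <T> T retry(
      Callable<T> f,
      Predicate<Throwable> shouldRetry,
      int maxTries,
      boolean skipSleep
  ) throws Exception
  {
    if (maxTries < 1) {
      throw new IllegalArgumentException("maxTries must be positive");
    }
    int nTry = 0;
    while (true) {
      try {
        nTry++;
        return f.call();
      }
      catch (Exception e) {
        // Give up when attempts are exhausted or the failure is not retryable.
        if (nTry >= maxTries || !shouldRetry.test(e)) {
          throw e;
        }
        if (!skipSleep) {
          // Assumed backoff: doubles from 1s per attempt, capped at 60s.
          Thread.sleep(Math.min(1000L << Math.min(nTry - 1, 6), 60_000L));
        }
      }
    }
  }
}
```

A unit test would call the four-argument overload with `skipSleep = true`, which keeps the retry logic under test while removing the wall-clock wait.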

##########
File path: docs/ingestion/tasks.md
##########
@@ -347,9 +347,10 @@ The task context is used for various individual task configuration. The followin
 
 |property|default|description|
 |--------|-------|-----------|
-|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
-|forceTimeChunkLock|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
-|priority|Different based on task types. See [Priority](#priority).|Task priority|
+|`taskLockTimeout`|300000|task lock timeout in millisecond. For more details, see [Locking](#locking).|
+|`forceTimeChunkLock`|true|_Setting this to false is still experimental_<br/> Force to always use time chunk lock. If not set, each task automatically chooses a lock type to use. If this set, it will overwrite the `druid.indexer.tasklock.forceTimeChunkLock` [configuration for the overlord](../configuration/index.md#overlord-operations). See [Locking](#locking) for more details.|
+|`priority`|Different based on task types. See [Priority](#priority).|Task priority|
+|`useLineageBasedSegmentAllocation`|false|Enable the new lineage-based segment allocation protocol for the native Parallel task with dynamic partitioning. This option should be off during the replacing rolling upgrade to Druid 0.22 or higher. Once the upgrade is done, it must be set to true.|

Review comment:
       Updated the doc per suggestion.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
##########
@@ -351,6 +355,7 @@ public boolean add(final Task task) throws EntryExistsException
 
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockConfig.isForceTimeChunkLock());
+    defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);

Review comment:
       Good idea. I changed the default value of the default task context to an 
empty map, and added `useLineageBasedSegmentAllocation` here.
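
To make the add-if-absent behavior in the hunk above concrete, here is a minimal sketch under stated assumptions: `ContextDefaultsSketch` and `withDefaults` are hypothetical names, and plain maps stand in for the task context and for `defaultTaskConfig.getContext()`. It only illustrates that a value already present in the task's own context is kept, while missing keys are filled from the defaults.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Druid.
final class ContextDefaultsSketch
{
  // Returns a copy of taskContext with every missing key filled from defaults.
  // Keys already set by the task keep their values.
  static Map<String, Object> withDefaults(Map<String, Object> taskContext, Map<String, Object> defaults)
  {
    Map<String, Object> merged = new HashMap<>(taskContext);
    // Same shape as defaults.forEach(task::addToContextIfAbsent) in the hunk.
    defaults.forEach(merged::putIfAbsent);
    return merged;
  }
}
```

With an empty default context, as described in the comment above, the merge is a no-op and the task context is left exactly as submitted.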

##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelper.java
##########
@@ -72,7 +72,24 @@
       if (firstShardSpec instanceof OverwriteShardSpec) {
         annotateFn = annotateAtomicUpdateGroupFn(segmentsPerInterval.size());
       } else if (firstShardSpec instanceof BuildingShardSpec) {
-        annotateFn = annotateCorePartitionSetSizeFn(segmentsPerInterval.size());
+        // sanity check
+        // BuildingShardSpec is used in non-appending mode. In this mode,
+        // the segments in each interval should have contiguous partitionIds,
+        // so that they can be queryable (see PartitionHolder.isComplete()).
+        int expectedCorePartitionSetSize = segmentsPerInterval.size();
+        int actualCorePartitionSetSize = Math.toIntExact(
+            segmentsPerInterval
+                .stream()
+                .filter(segment -> segment.getShardSpec().getPartitionNum() < expectedCorePartitionSetSize)
+                .count()
+        );
+        if (expectedCorePartitionSetSize != actualCorePartitionSetSize) {
+          throw new ISE(
+              "Cannot publish segments due to incomplete time chunk. Segments are [%s]",
+              segmentsPerInterval.stream().map(DataSegment::getId).collect(Collectors.toList())

Review comment:
       Thanks for the reminder. Changed it to use `log.errorSegments`, so the 
exception message no longer contains an overly large string.
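
As a rough illustration of the sanity check in the hunk above, the sketch below reduces it to plain partition numbers. `CorePartitionCheckSketch` is a hypothetical class, and the list of integers stands in for `segment.getShardSpec().getPartitionNum()` over the segments of one interval. It assumes partition numbers are non-negative and distinct; under that assumption, N segments have contiguous IDs 0..N-1 exactly when all N of them are below N.

```java
import java.util.List;

// Hypothetical reduction of the check for illustration; not Druid code.
final class CorePartitionCheckSketch
{
  // True when every partition number is below the number of segments,
  // which (for distinct, non-negative IDs) means the IDs are 0..N-1.
  static boolean isCompleteCorePartitionSet(List<Integer> partitionNums)
  {
    int expectedCorePartitionSetSize = partitionNums.size();
    long actualCorePartitionSetSize = partitionNums
        .stream()
        .filter(partitionNum -> partitionNum < expectedCorePartitionSetSize)
        .count();
    return expectedCorePartitionSetSize == actualCorePartitionSetSize;
  }
}
```

For example, partition numbers 0, 1, 2 pass the check, while 0, 1, 3 fail it because partition 2 is missing from the time chunk.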






