This is an automated email from the ASF dual-hosted git repository.

georgew5656 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new aca56d6bb84 reject publishing actions with a retriable error code if an 
earlier task is still publishing (#17509)
aca56d6bb84 is described below

commit aca56d6bb842231853d624e7da07748ba002ac4f
Author: George Shiqi Wu <[email protected]>
AuthorDate: Thu Dec 12 10:37:53 2024 -0500

    reject publishing actions with a retriable error code if an earlier task is 
still publishing (#17509)
    
    * Working queuing of publishing
    
    * fix style
    
    * Add unit tests
    
    * add tests
    
    * retry within the connector
    
    * fix unit tests
    
    * Update 
indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
    
    Co-authored-by: Kashif Faraz <[email protected]>
    
    * Add comment
    
    * fix style
    
    * Fix unit tests
    
    * style fix
    
    ---------
    
    Co-authored-by: Kashif Faraz <[email protected]>
---
 .../apache/druid/indexing/common/actions/LocalTaskActionClient.java | 1 +
 .../common/actions/SegmentTransactionalInsertActionTest.java        | 4 ++--
 .../apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 4 +++-
 .../druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java    | 6 +++---
 4 files changed, 9 insertions(+), 6 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
index 1d0059335ed..81ff137ed17 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java
@@ -69,6 +69,7 @@ public class LocalTaskActionClient implements TaskActionClient
       return result;
     }
     catch (Throwable t) {
+      log.error(t, "Failed to perform action[%s]", taskAction);
       throw new RuntimeException(t);
     }
   }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
index 44ce60b5ceb..7d5f4488fea 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.indexing.common.actions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.NoopTask;
 import org.apache.druid.indexing.common.task.Task;
@@ -32,6 +31,7 @@ import 
org.apache.druid.indexing.overlord.SegmentPublishResult;
 import org.apache.druid.indexing.overlord.Segments;
 import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.metadata.RetryTransactionException;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.assertj.core.api.Assertions;
@@ -151,7 +151,7 @@ public class SegmentTransactionalInsertActionTest
 
     Assert.assertEquals(
         SegmentPublishResult.fail(
-            InvalidInput.exception(
+            new RetryTransactionException(
                 "The new start metadata state[ObjectMetadata{theObject=[1]}] 
is"
                 + " ahead of the last committed end state[null]. Try resetting 
the supervisor."
             ).toString()
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 69d86ab80cf..0717c9b07ee 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2687,7 +2687,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
     if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
       // Offsets stored in startMetadata is greater than the last commited 
metadata.
-      return DataStoreMetadataUpdateResult.failure(
+      // This can happen because the previous task is still publishing its 
segments and can resolve once
+      // the previous task finishes publishing.
+      return DataStoreMetadataUpdateResult.retryableFailure(
           "The new start metadata state[%s] is ahead of the last committed"
           + " end state[%s]. Try resetting the supervisor.",
           startMetadata, oldCommitMetadataFromDb
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 0377faac5fb..a000fbec5a3 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -784,15 +784,15 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     );
     Assert.assertEquals(
         SegmentPublishResult.fail(
-            InvalidInput.exception(
+            new RetryTransactionException(
                 "The new start metadata 
state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed"
                 + " end state[null]. Try resetting the supervisor."
             ).toString()),
         result1
     );
 
-    // Should only be tried once.
-    Assert.assertEquals(1, metadataUpdateCounter.get());
+    // Should be retried.
+    Assert.assertEquals(2, metadataUpdateCounter.get());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to