panhongan commented on code in PR #11296:
URL: https://github.com/apache/druid/pull/11296#discussion_r979179770


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1783,41 +1783,49 @@ public Response pauseHTTP(
   @VisibleForTesting
   public Response pause() throws InterruptedException
   {
-    if (!(status == Status.PAUSED || status == Status.READING)) {
+    if (status == Status.NOT_STARTED || status == Status.STARTING) {
       return Response.status(Response.Status.BAD_REQUEST)
-                     .type(MediaType.TEXT_PLAIN)
-                     .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
-                     .build();
+              .type(MediaType.TEXT_PLAIN)
+              .entity(StringUtils.format("Can't pause, task state is invalid (state: [%s])", status))

Review Comment:
   done



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java:
##########
@@ -1783,41 +1783,49 @@ public Response pauseHTTP(
   @VisibleForTesting
   public Response pause() throws InterruptedException
   {
-    if (!(status == Status.PAUSED || status == Status.READING)) {
+    if (status == Status.NOT_STARTED || status == Status.STARTING) {
       return Response.status(Response.Status.BAD_REQUEST)
-                     .type(MediaType.TEXT_PLAIN)
-                     .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
-                     .build();
+              .type(MediaType.TEXT_PLAIN)
+              .entity(StringUtils.format("Can't pause, task state is invalid (state: [%s])", status))
+              .build();
     }
 
-    pauseLock.lockInterruptibly();
-    try {
-      pauseRequested = true;
+    // if status in [PAUSED, READING], need to pause
+    // if status == PUBLISHING, return current offset, not to report exception
+    if (status == Status.PAUSED || status == Status.READING) {
+      log.info("Task state is pausable, taskId: [%s], state: [%s])", task.getId(), status);
 
-      pollRetryLock.lockInterruptibly();
+      pauseLock.lockInterruptibly();
       try {
-        isAwaitingRetry.signalAll();
-      }
-      finally {
-        pollRetryLock.unlock();
-      }
+        pauseRequested = true;
 
-      if (isPaused()) {
-        shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis
-      }
+        pollRetryLock.lockInterruptibly();
+        try {
+          isAwaitingRetry.signalAll();
+        }
+        finally {
+          pollRetryLock.unlock();
+        }
 
-      long nanos = TimeUnit.SECONDS.toNanos(2);
-      while (!isPaused()) {
-        if (nanos <= 0L) {
-          return Response.status(Response.Status.ACCEPTED)
-                         .entity("Request accepted but task has not yet paused")
-                         .build();
+        if (isPaused()) {
+          shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis
+        }
+
+        long nanos = TimeUnit.SECONDS.toNanos(2);
+        while (!isPaused()) {
+          if (nanos <= 0L) {
+            return Response.status(Response.Status.ACCEPTED)
+                    .entity("Request accepted but task has not yet paused")
+                    .build();
+          }
+          nanos = hasPaused.awaitNanos(nanos);
         }
-        nanos = hasPaused.awaitNanos(nanos);
       }
-    }
-    finally {
-      pauseLock.unlock();
+      finally {
+        pauseLock.unlock();
+      }
+    } else {
+      log.info("Return current offsets directly, taskId: [%s], state: [%s]", task.getId(), status);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to