This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new 4ad7e96  add 'stopped' check and handling to HttpLoadQueuePeon load and drop segment methods (#5555) (#5960)
4ad7e96 is described below

commit 4ad7e965c5b66ccf9bcca18202803ca73b6d8383
Author: Jihoon Son <jihoon...@apache.org>
AuthorDate: Mon Jul 9 11:23:21 2018 -0700

    add 'stopped' check and handling to HttpLoadQueuePeon load and drop segment methods (#5555) (#5960)
    
    * add stopped check and handling to HttpLoadQueuePeon load and drop segment methods
    
    * fix unrelated timeout :(
    
    * revert unintended change
    
    * PR feedback: change logging
    
    * fix dumb
---
 .../server/coordinator/HttpLoadQueuePeon.java      |  50 ++++---
 .../coordinator/CuratorDruidCoordinatorTest.java   |   2 +-
 .../server/coordinator/HttpLoadQueuePeonTest.java  | 151 ++++++++++++---------
 3 files changed, 121 insertions(+), 82 deletions(-)
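
For readers skimming the patch, here is a condensed view of the guard it adds to HttpLoadQueuePeon.loadSegment() (dropSegment() gets the same treatment). This is a sketch assembled from the hunks below, with the pre-existing queueing logic elided, not a standalone implementation:

    public void loadSegment(DataSegment segment, LoadPeonCallback callback)
    {
      synchronized (lock) {
        if (stopped) {
          // Peon is shutting down: warn, run the callback so the caller is not
          // left waiting, and skip queueing the segment.
          log.warn(
              "Server[%s] cannot load segment[%s] because load queue peon is stopped.",
              serverId,
              segment.getIdentifier()
          );
          callback.execute();
          return;
        }
        // ... existing logic that queues the segment and registers the callback ...
      }
    }

Relatedly, stop() now marks any holders still queued as failed ("Stopping load queue peon.") rather than succeeded, and the new HttpLoadQueuePeonTest#testLoadDropAfterStop exercises the load/drop-after-stop path.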

diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
index dbeeb73..ece1d48 100644
--- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -28,16 +28,16 @@ import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.RE;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.java.util.emitter.EmittingLogger;
 import io.druid.java.util.http.client.HttpClient;
 import io.druid.java.util.http.client.Request;
 import io.druid.java.util.http.client.io.AppendableByteArrayInputStream;
 import io.druid.java.util.http.client.response.ClientResponse;
 import io.druid.java.util.http.client.response.InputStreamResponseHandler;
-import io.druid.java.util.common.ISE;
-import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.server.coordination.DataSegmentChangeCallback;
 import io.druid.server.coordination.DataSegmentChangeHandler;
 import io.druid.server.coordination.DataSegmentChangeRequest;
@@ -61,7 +61,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
@@ -261,6 +260,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
             public void onFailure(Throwable t)
             {
               try {
+                responseHandler.description = t.toString();
                 logRequestFailure(t);
               }
               finally {
@@ -333,20 +333,15 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
       ScheduledExecutors.scheduleAtFixedRate(
           processingExecutor,
           new Duration(config.getHttpLoadQueuePeonRepeatDelay()),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              if (!stopped) {
-                doSegmentManagement();
-              }
+          () -> {
+            if (!stopped) {
+              doSegmentManagement();
+            }
 
-              if (stopped) {
-                return ScheduledExecutors.Signal.STOP;
-              } else {
-                return ScheduledExecutors.Signal.REPEAT;
-              }
+            if (stopped) {
+              return ScheduledExecutors.Signal.STOP;
+            } else {
+              return ScheduledExecutors.Signal.REPEAT;
             }
           }
       );
@@ -364,11 +359,11 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
       stopped = true;
 
       for (SegmentHolder holder : segmentsToDrop.values()) {
-        holder.requestSucceeded();
+        holder.requestFailed("Stopping load queue peon.");
       }
 
       for (SegmentHolder holder : segmentsToLoad.values()) {
-        holder.requestSucceeded();
+        holder.requestFailed("Stopping load queue peon.");
       }
 
       segmentsToDrop.clear();
@@ -382,6 +377,16 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
   public void loadSegment(DataSegment segment, LoadPeonCallback callback)
   {
     synchronized (lock) {
+      if (stopped) {
+        log.warn(
+            "Server[%s] cannot load segment[%s] because load queue peon is stopped.",
+            serverId,
+            segment.getIdentifier()
+        );
+        callback.execute();
+        return;
+      }
+
       SegmentHolder holder = segmentsToLoad.get(segment);
 
       if (holder == null) {
@@ -398,6 +403,15 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
   public void dropSegment(DataSegment segment, LoadPeonCallback callback)
   {
     synchronized (lock) {
+      if (stopped) {
+        log.warn(
+            "Server[%s] cannot drop segment[%s] because load queue peon is stopped.",
+            serverId,
+            segment.getIdentifier()
+        );
+        callback.execute();
+        return;
+      }
       SegmentHolder holder = segmentsToDrop.get(segment);
 
       if (holder == null) {
diff --git a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 9447c7b..0ed48ac 100644
--- a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -233,7 +233,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     tearDownServerAndCurator();
   }
 
-  @Test(timeout = 5_000)
+  @Test(timeout = 10_000)
   public void testMoveSegment() throws Exception
   {
     segmentViewInitLatch = new CountDownLatch(1);
diff --git a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
index 72fb9a3..c238835 100644
--- a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
+++ b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java
@@ -24,14 +24,14 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.HttpResponseHandler;
 import io.druid.discovery.DiscoveryDruidNode;
 import io.druid.discovery.DruidNodeDiscovery;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.RE;
 import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.HttpResponseHandler;
 import io.druid.server.ServerTestHelper;
 import io.druid.server.coordination.DataSegmentChangeRequest;
 import io.druid.server.coordination.SegmentLoadDropHandler;
@@ -57,40 +57,92 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class HttpLoadQueuePeonTest
 {
+  final DataSegment segment1 = new DataSegment(
+      "test1", Intervals.of("2014/2015"), "v1",
+      null, null, null, null, 0, 0
+  );
+
+  final DataSegment segment2 = new DataSegment(
+      "test2", Intervals.of("2014/2015"), "v1",
+      null, null, null, null, 0, 0
+  );
+
+  final DataSegment segment3 = new DataSegment(
+      "test3", Intervals.of("2014/2015"), "v1",
+      null, null, null, null, 0, 0
+  );
+
+  final DataSegment segment4 = new DataSegment(
+      "test4", Intervals.of("2014/2015"), "v1",
+      null, null, null, null, 0, 0
+  );
+
+  final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig(
+      null,
+      null,
+      null,
+      null,
+      null,
+      null,
+      10,
+      null,
+      false,
+      false,
+      Duration.ZERO
+  )
+  {
+    @Override
+    public int getHttpLoadQueuePeonBatchSize()
+    {
+      return 2;
+    }
+  };
+
   @Test(timeout = 10000)
   public void testSimple() throws Exception
   {
-    final DataSegment segment1 = new DataSegment(
-        "test1", Intervals.of("2014/2015"), "v1",
-        null, null, null, null, 0, 0
+    HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
+        "http://dummy:4000",
+        ServerTestHelper.MAPPER,
+        new TestHttpClient(),
+        config,
+        Executors.newScheduledThreadPool(
+            2,
+            Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
+        ),
+        Execs.singleThreaded("HttpLoadQueuePeonTest")
     );
 
-    final DataSegment segment2 = new DataSegment(
-        "test2", Intervals.of("2014/2015"), "v1",
-        null, null, null, null, 0, 0
-    );
+    httpLoadQueuePeon.start();
 
-    final DataSegment segment3 = new DataSegment(
-        "test3", Intervals.of("2014/2015"), "v1",
-        null, null, null, null, 0, 0
+    Map<String, CountDownLatch> latches = ImmutableMap.of(
+        segment1.getIdentifier(), new CountDownLatch(1),
+        segment2.getIdentifier(), new CountDownLatch(1),
+        segment3.getIdentifier(), new CountDownLatch(1),
+        segment4.getIdentifier(), new CountDownLatch(1)
     );
 
-    final DataSegment segment4 = new DataSegment(
-        "test4", Intervals.of("2014/2015"), "v1",
-        null, null, null, null, 0, 0
-    );
+    httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown());
+    httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown());
+    httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown());
+    httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown());
+
+    latches.get(segment1.getIdentifier()).await();
+    latches.get(segment2.getIdentifier()).await();
+    latches.get(segment3.getIdentifier()).await();
+    latches.get(segment4.getIdentifier()).await();
 
+    httpLoadQueuePeon.stop();
+  }
+
+  @Test(timeout = 10000)
+  public void testLoadDropAfterStop() throws Exception
+  {
     HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
         "http://dummy:4000";,
         ServerTestHelper.MAPPER,
         new TestHttpClient(),
-        new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) {
-          @Override
-          public int getHttpLoadQueuePeonBatchSize()
-          {
-            return 2;
-          }
-        },
+        config,
         Executors.newScheduledThreadPool(
             2,
             Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
@@ -107,48 +159,16 @@ public class HttpLoadQueuePeonTest
         segment4.getIdentifier(), new CountDownLatch(1)
     );
 
-    httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback()
-    {
-      @Override
-      public void execute()
-      {
-        latches.get(segment1.getIdentifier()).countDown();
-      }
-    });
-
-    httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback()
-    {
-      @Override
-      public void execute()
-      {
-        latches.get(segment2.getIdentifier()).countDown();
-      }
-    });
-
-    httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback()
-    {
-      @Override
-      public void execute()
-      {
-        latches.get(segment3.getIdentifier()).countDown();
-      }
-    });
-
-    httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback()
-    {
-      @Override
-      public void execute()
-      {
-        latches.get(segment4.getIdentifier()).countDown();
-      }
-    });
-
+    httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown());
+    httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown());
     latches.get(segment1.getIdentifier()).await();
     latches.get(segment2.getIdentifier()).await();
+    httpLoadQueuePeon.stop();
+    httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown());
+    httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown());
     latches.get(segment3.getIdentifier()).await();
     latches.get(segment4.getIdentifier()).await();
 
-    httpLoadQueuePeon.stop();
   }
 
   private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
@@ -191,12 +211,17 @@ public class HttpLoadQueuePeonTest
       httpResponseHandler.handleResponse(httpResponse);
       try {
        List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue(
-            request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() {}
+            request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>()
+            {
+            }
         );
 
        List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size());
         for (DataSegmentChangeRequest cr : changeRequests) {
-          statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS));
+          statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(
+              cr,
+              SegmentLoadDropHandler.Status.SUCCESS
+          ));
         }
         return (ListenableFuture) Futures.immediateFuture(
             new ByteArrayInputStream(

