gianm closed pull request #5960: [Backport] add 'stopped' check and handling to HttpLoadQueuePeon load and drop segment methods URL: https://github.com/apache/incubator-druid/pull/5960
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java index dbeeb738601..ece1d4884fa 100644 --- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java @@ -28,16 +28,16 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.RE; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.http.client.HttpClient; import io.druid.java.util.http.client.Request; import io.druid.java.util.http.client.io.AppendableByteArrayInputStream; import io.druid.java.util.http.client.response.ClientResponse; import io.druid.java.util.http.client.response.InputStreamResponseHandler; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.server.coordination.DataSegmentChangeCallback; import io.druid.server.coordination.DataSegmentChangeHandler; import io.druid.server.coordination.DataSegmentChangeRequest; @@ -61,7 +61,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; @@ -261,6 +260,7 @@ public void onSuccess(InputStream result) public void onFailure(Throwable t) { try { + responseHandler.description = t.toString(); logRequestFailure(t); } finally { @@ -333,20 +333,15 @@ public void start() ScheduledExecutors.scheduleAtFixedRate( processingExecutor, new Duration(config.getHttpLoadQueuePeonRepeatDelay()), - new Callable<ScheduledExecutors.Signal>() - { - @Override - public ScheduledExecutors.Signal call() - { - if (!stopped) { - doSegmentManagement(); - } + () -> { + if (!stopped) { + doSegmentManagement(); + } - if (stopped) { - return ScheduledExecutors.Signal.STOP; - } else { - return ScheduledExecutors.Signal.REPEAT; - } + if (stopped) { + return ScheduledExecutors.Signal.STOP; + } else { + return ScheduledExecutors.Signal.REPEAT; } } ); @@ -364,11 +359,11 @@ public void stop() stopped = true; for (SegmentHolder holder : segmentsToDrop.values()) { - holder.requestSucceeded(); + holder.requestFailed("Stopping load queue peon."); } for (SegmentHolder holder : segmentsToLoad.values()) { - holder.requestSucceeded(); + holder.requestFailed("Stopping load queue peon."); } segmentsToDrop.clear(); @@ -382,6 +377,16 @@ public void stop() public void loadSegment(DataSegment segment, LoadPeonCallback callback) { synchronized (lock) { + if (stopped) { + log.warn( + "Server[%s] cannot load segment[%s] because load queue peon is stopped.", + serverId, + segment.getIdentifier() + ); + callback.execute(); + return; + } + SegmentHolder holder = segmentsToLoad.get(segment); if (holder == null) { @@ -398,6 +403,15 @@ public void loadSegment(DataSegment segment, LoadPeonCallback callback) public void dropSegment(DataSegment segment, LoadPeonCallback callback) { synchronized (lock) { + if (stopped) { + log.warn( + "Server[%s] cannot drop segment[%s] because load queue peon is stopped.", + serverId, + segment.getIdentifier() + ); + callback.execute(); + return; + } SegmentHolder holder = segmentsToDrop.get(segment); if (holder == null) { diff --git a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 9447c7b0895..0ed48ac726d 100644 --- a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -233,7 +233,7 @@ public void tearDown() throws Exception tearDownServerAndCurator(); } - @Test(timeout = 5_000) + @Test(timeout = 10_000) public void testMoveSegment() throws Exception { segmentViewInitLatch = new CountDownLatch(1); diff --git a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java index 72fb9a36a5d..c2388359a1e 100644 --- a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -24,14 +24,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.druid.java.util.http.client.HttpClient; -import io.druid.java.util.http.client.Request; -import io.druid.java.util.http.client.response.HttpResponseHandler; import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscovery; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.RE; import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.HttpResponseHandler; import io.druid.server.ServerTestHelper; import io.druid.server.coordination.DataSegmentChangeRequest; import io.druid.server.coordination.SegmentLoadDropHandler; @@ -57,40 +57,92 @@ */ public class HttpLoadQueuePeonTest { + final DataSegment segment1 = new DataSegment( + "test1", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment2 = new DataSegment( + "test2", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment3 = new DataSegment( + "test3", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment4 = new DataSegment( + "test4", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig( + null, + null, + null, + null, + null, + null, + 10, + null, + false, + false, + Duration.ZERO + ) + { + @Override + public int getHttpLoadQueuePeonBatchSize() + { + return 2; + } + }; + @Test(timeout = 10000) public void testSimple() throws Exception { - final DataSegment segment1 = new DataSegment( - "test1", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 + HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon( + "http://dummy:4000", + ServerTestHelper.MAPPER, + new TestHttpClient(), + config, + Executors.newScheduledThreadPool( + 2, + Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s") + ), + Execs.singleThreaded("HttpLoadQueuePeonTest") ); - final DataSegment segment2 = new DataSegment( - "test2", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + httpLoadQueuePeon.start(); - final DataSegment segment3 = new DataSegment( - "test3", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 + Map<String, CountDownLatch> latches = ImmutableMap.of( + segment1.getIdentifier(), new CountDownLatch(1), + segment2.getIdentifier(), new CountDownLatch(1), + segment3.getIdentifier(), new CountDownLatch(1), + segment4.getIdentifier(), new CountDownLatch(1) ); - final DataSegment segment4 = new DataSegment( - "test4", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown()); + httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown()); + + latches.get(segment1.getIdentifier()).await(); + latches.get(segment2.getIdentifier()).await(); + latches.get(segment3.getIdentifier()).await(); + latches.get(segment4.getIdentifier()).await(); + httpLoadQueuePeon.stop(); + } + + @Test(timeout = 10000) + public void testLoadDropAfterStop() throws Exception + { HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon( "http://dummy:4000", ServerTestHelper.MAPPER, new TestHttpClient(), - new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) { - @Override - public int getHttpLoadQueuePeonBatchSize() - { - return 2; - } - }, + config, Executors.newScheduledThreadPool( 2, Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s") @@ -107,48 +159,16 @@ public int getHttpLoadQueuePeonBatchSize() segment4.getIdentifier(), new CountDownLatch(1) ); - httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment1.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment2.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment3.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment4.getIdentifier()).countDown(); - } - }); - + httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown()); latches.get(segment1.getIdentifier()).await(); latches.get(segment2.getIdentifier()).await(); + httpLoadQueuePeon.stop(); + httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown()); latches.get(segment3.getIdentifier()).await(); latches.get(segment4.getIdentifier()).await(); - httpLoadQueuePeon.stop(); } private static class TestDruidNodeDiscovery implements DruidNodeDiscovery @@ -191,12 +211,17 @@ public void registerListener(Listener listener) httpResponseHandler.handleResponse(httpResponse); try { List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue( - request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() {} + request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() + { + } ); List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { - statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS)); + statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus( + cr, + SegmentLoadDropHandler.Status.SUCCESS + )); } return (ListenableFuture) Futures.immediateFuture( new ByteArrayInputStream( ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org