This is an automated email from the ASF dual-hosted git repository. gian pushed a commit to branch 0.12.2 in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.12.2 by this push: new 4ad7e96 add 'stopped' check and handling to HttpLoadQueuePeon load and drop segment methods (#5555) (#5960) 4ad7e96 is described below commit 4ad7e965c5b66ccf9bcca18202803ca73b6d8383 Author: Jihoon Son <jihoon...@apache.org> AuthorDate: Mon Jul 9 11:23:21 2018 -0700 add 'stopped' check and handling to HttpLoadQueuePeon load and drop segment methods (#5555) (#5960) * add stopped check and handling to HttpLoadQueuePeon load and drop segment methods * fix unrelated timeout :( * revert unintended change * PR feedback: change logging * fix dumb --- .../server/coordinator/HttpLoadQueuePeon.java | 50 ++++--- .../coordinator/CuratorDruidCoordinatorTest.java | 2 +- .../server/coordinator/HttpLoadQueuePeonTest.java | 151 ++++++++++++--------- 3 files changed, 121 insertions(+), 82 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java index dbeeb73..ece1d48 100644 --- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java @@ -28,16 +28,16 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.RE; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.http.client.HttpClient; import io.druid.java.util.http.client.Request; import io.druid.java.util.http.client.io.AppendableByteArrayInputStream; import io.druid.java.util.http.client.response.ClientResponse; import io.druid.java.util.http.client.response.InputStreamResponseHandler; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.server.coordination.DataSegmentChangeCallback; import io.druid.server.coordination.DataSegmentChangeHandler; import io.druid.server.coordination.DataSegmentChangeRequest; @@ -61,7 +61,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; @@ -261,6 +260,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon public void onFailure(Throwable t) { try { + responseHandler.description = t.toString(); logRequestFailure(t); } finally { @@ -333,20 +333,15 @@ public class HttpLoadQueuePeon extends LoadQueuePeon ScheduledExecutors.scheduleAtFixedRate( processingExecutor, new Duration(config.getHttpLoadQueuePeonRepeatDelay()), - new Callable<ScheduledExecutors.Signal>() - { - @Override - public ScheduledExecutors.Signal call() - { - if (!stopped) { - doSegmentManagement(); - } + () -> { + if (!stopped) { + doSegmentManagement(); + } - if (stopped) { - return ScheduledExecutors.Signal.STOP; - } else { - return ScheduledExecutors.Signal.REPEAT; - } + if (stopped) { + return ScheduledExecutors.Signal.STOP; + } else { + return ScheduledExecutors.Signal.REPEAT; } } ); @@ -364,11 +359,11 @@ public class HttpLoadQueuePeon extends LoadQueuePeon stopped = true; for (SegmentHolder holder : segmentsToDrop.values()) { - holder.requestSucceeded(); + holder.requestFailed("Stopping load queue peon."); } for (SegmentHolder holder : segmentsToLoad.values()) { - holder.requestSucceeded(); + holder.requestFailed("Stopping load queue peon."); } segmentsToDrop.clear(); @@ -382,6 +377,16 @@ public class HttpLoadQueuePeon extends LoadQueuePeon public void loadSegment(DataSegment segment, LoadPeonCallback callback) { synchronized (lock) { + if (stopped) { + log.warn( + "Server[%s] cannot load segment[%s] because load queue peon is stopped.", + serverId, + segment.getIdentifier() + ); + callback.execute(); + return; + } + SegmentHolder holder = segmentsToLoad.get(segment); if (holder == null) { @@ -398,6 +403,15 @@ public class HttpLoadQueuePeon extends LoadQueuePeon public void dropSegment(DataSegment segment, LoadPeonCallback callback) { synchronized (lock) { + if (stopped) { + log.warn( + "Server[%s] cannot drop segment[%s] because load queue peon is stopped.", + serverId, + segment.getIdentifier() + ); + callback.execute(); + return; + } SegmentHolder holder = segmentsToDrop.get(segment); if (holder == null) { diff --git a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 9447c7b..0ed48ac 100644 --- a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -233,7 +233,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase tearDownServerAndCurator(); } - @Test(timeout = 5_000) + @Test(timeout = 10_000) public void testMoveSegment() throws Exception { segmentViewInitLatch = new CountDownLatch(1); diff --git a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java index 72fb9a3..c238835 100644 --- a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -24,14 +24,14 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.druid.java.util.http.client.HttpClient; -import io.druid.java.util.http.client.Request; -import io.druid.java.util.http.client.response.HttpResponseHandler; import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscovery; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.RE; import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.HttpResponseHandler; import io.druid.server.ServerTestHelper; import io.druid.server.coordination.DataSegmentChangeRequest; import io.druid.server.coordination.SegmentLoadDropHandler; @@ -57,40 +57,92 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class HttpLoadQueuePeonTest { + final DataSegment segment1 = new DataSegment( + "test1", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment2 = new DataSegment( + "test2", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment3 = new DataSegment( + "test3", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment4 = new DataSegment( + "test4", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig( + null, + null, + null, + null, + null, + null, + 10, + null, + false, + false, + Duration.ZERO + ) + { + @Override + public int getHttpLoadQueuePeonBatchSize() + { + return 2; + } + }; + @Test(timeout = 10000) public void testSimple() throws Exception { - final DataSegment segment1 = new DataSegment( - "test1", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 + HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon( + "http://dummy:4000", + ServerTestHelper.MAPPER, + new TestHttpClient(), + config, + Executors.newScheduledThreadPool( + 2, + Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s") + ), + Execs.singleThreaded("HttpLoadQueuePeonTest") ); - final DataSegment segment2 = new DataSegment( - "test2", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + httpLoadQueuePeon.start(); - final DataSegment segment3 = new DataSegment( - "test3", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 + Map<String, CountDownLatch> latches = ImmutableMap.of( + segment1.getIdentifier(), new CountDownLatch(1), + segment2.getIdentifier(), new CountDownLatch(1), + segment3.getIdentifier(), new CountDownLatch(1), + segment4.getIdentifier(), new CountDownLatch(1) ); - final DataSegment segment4 = new DataSegment( - "test4", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown()); + httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown()); + + latches.get(segment1.getIdentifier()).await(); + latches.get(segment2.getIdentifier()).await(); + latches.get(segment3.getIdentifier()).await(); + latches.get(segment4.getIdentifier()).await(); + httpLoadQueuePeon.stop(); + } + + @Test(timeout = 10000) + public void testLoadDropAfterStop() throws Exception + { HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon( "http://dummy:4000", ServerTestHelper.MAPPER, new TestHttpClient(), - new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) { - @Override - public int getHttpLoadQueuePeonBatchSize() - { - return 2; - } - }, + config, Executors.newScheduledThreadPool( 2, Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s") @@ -107,48 +159,16 @@ public class HttpLoadQueuePeonTest segment4.getIdentifier(), new CountDownLatch(1) ); - httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment1.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment2.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment3.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment4.getIdentifier()).countDown(); - } - }); - + httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown()); latches.get(segment1.getIdentifier()).await(); latches.get(segment2.getIdentifier()).await(); + httpLoadQueuePeon.stop(); + httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown()); latches.get(segment3.getIdentifier()).await(); latches.get(segment4.getIdentifier()).await(); - httpLoadQueuePeon.stop(); } private static class TestDruidNodeDiscovery implements DruidNodeDiscovery @@ -191,12 +211,17 @@ public class HttpLoadQueuePeonTest httpResponseHandler.handleResponse(httpResponse); try { List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue( - request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() {} + request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() + { + } ); List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { - statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS)); + statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus( + cr, + SegmentLoadDropHandler.Status.SUCCESS + )); } return (ListenableFuture) Futures.immediateFuture( new ByteArrayInputStream( --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@druid.apache.org For additional commands, e-mail: dev-h...@druid.apache.org