viongpanzi opened a new pull request #10362:
URL: https://github.com/apache/druid/pull/10362
# Description
<!-- Describe the goal of this PR, what problem are you fixing. If there is
a corresponding issue (referenced above), it's not necessary to repeat the
description here, however, you may choose to keep one summary sentence. -->
<!-- Describe your patch: what did you change in code? How did you fix the
problem? -->
<!-- If there are several relatively logically separate changes in this PR,
create a mini-section for each of them. For example: -->
This PR try to fix the negative queueSize problem. When we upgrade our
clusters to version 0.18.0, after running for a while, queuedSize of the
loadQueuePeon will be negative.
This can be reproduce by adding the following codes to the tail of
testFailAssign method in LoadQueuePeonTest.
```
curator.delete().guaranteed().forPath(loadRequestPath);
while (null != curator.checkExists().forPath(loadRequestPath)) {
Thread.sleep(5);
}
Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
```
And the full testFailAssign method will be:
```
public void testFailAssign() throws Exception
{
final DataSegment segment =
dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
final CountDownLatch loadRequestSignal = new CountDownLatch(1);
final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
loadQueuePeon = new CuratorLoadQueuePeon(
curator,
LOAD_QUEUE_PATH,
jsonMapper,
Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
Execs.singleThreaded("test_load_queue_peon-%d"),
// set time-out to 1 ms so that LoadQueuePeon will fail the
assignment quickly
new TestDruidCoordinatorConfig(
null,
null,
null,
new Duration(1),
null,
null,
10,
new Duration("PT1s")
)
);
loadQueuePeon.start();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework client,
PathChildrenCacheEvent event)
{
if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
loadRequestSignal.countDown();
}
}
}
);
loadQueueCache.start();
loadQueuePeon.loadSegment(
segment,
new LoadPeonCallback()
{
@Override
public void execute()
{
segmentLoadedSignal.countDown();
}
}
);
String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH,
segment.getId().toString());
Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal));
Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
Assert.assertEquals(
segment,
((SegmentChangeRequestLoad) jsonMapper
.readValue(curator.getData().decompressed().forPath(loadRequestPath),
DataSegmentChangeRequest.class))
.getSegment()
);
// don't simulate completion of load request here
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
curator.delete().guaranteed().forPath(loadRequestPath);
while (null != curator.checkExists().forPath(loadRequestPath)) {
Thread.sleep(5);
}
Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
}
```
After running the above method, we got the following logs:
```
...
2020-09-06T22:22:38,377 DEBUG [main]
org.apache.druid.server.coordinator.CuratorLoadQueuePeon - Asking server
peon[/druid/loadqueue/localhost:1234] to load
segment[test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z]
2020-09-06T22:22:38,519 DEBUG [test_load_queue_peon_scheduled-0]
org.apache.druid.server.coordinator.CuratorLoadQueuePeon - ZKNode created for
server to [/druid/loadqueue/localhost:1234] load
[test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z]
2020-09-06T22:22:38,529 ERROR [test_load_queue_peon_scheduled-0]
org.apache.druid.server.coordinator.CuratorLoadQueuePeon -
Server[/druid/loadqueue/localhost:1234], throwable caught when submitting
[SegmentChangeRequestLoad{segment=DataSegment{binaryVersion=9,
id=test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z,
loadSpec={}, dimensions=[], metrics=[], shardSpec=NoneShardSpec,
lastCompactionState=null, size=1200}}].
org.apache.druid.java.util.common.ISE:
/druid/loadqueue/localhost:1234/test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z
was never removed! Failing this operation!
at
org.apache.druid.server.coordinator.CuratorLoadQueuePeon$SegmentChangeProcessor.lambda$scheduleNodeDeletedCheck$1(CuratorLoadQueuePeon.java:285)
~[classes/:?]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[?:1.8.0_212]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_212]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_212]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_212]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
2020-09-06T22:22:38,627 DEBUG [main-EventThread]
org.apache.druid.server.coordinator.CuratorLoadQueuePeon -
Server[/druid/loadqueue/localhost:1234] done processing load of segment
[/druid/loadqueue/localhost:1234/test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z]
2020-09-06T22:22:38,628 INFO [Curator-Framework-0]
org.apache.curator.framework.imps.CuratorFrameworkImpl -
backgroundOperationsLoop exiting
2020-09-06T22:22:38,632 INFO [ProcessThread(sid:0 cport:59826):]
org.apache.zookeeper.server.PrepRequestProcessor - Processed session
termination for sessionid: 0x1003a64174e0000
2020-09-06T22:22:38,632 INFO [main] org.apache.zookeeper.ZooKeeper -
Session: 0x1003a64174e0000 closed
2020-09-06T22:22:38,632 INFO [main-EventThread]
org.apache.zookeeper.ClientCnxn - EventThread shut down for session:
0x1003a64174e0000
2020-09-06T22:22:38,633 INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:59826]
org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client
/127.0.0.1:59843 which had sessionid 0x1003a64174e0000
2020-09-06T22:22:38,636 INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:59826]
org.apache.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited
run method
2020-09-06T22:22:38,638 INFO [main]
org.apache.zookeeper.server.ZooKeeperServer - shutting down
2020-09-06T22:22:38,638 INFO [main]
org.apache.zookeeper.server.SessionTrackerImpl - Shutting down
2020-09-06T22:22:38,638 INFO [main]
org.apache.zookeeper.server.PrepRequestProcessor - Shutting down
2020-09-06T22:22:38,638 INFO [main]
org.apache.zookeeper.server.SyncRequestProcessor - Shutting down
2020-09-06T22:22:38,639 INFO [ProcessThread(sid:0 cport:59826):]
org.apache.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited
loop!
2020-09-06T22:22:38,639 INFO [SyncThread:0]
org.apache.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
2020-09-06T22:22:38,639 INFO [main]
org.apache.zookeeper.server.FinalRequestProcessor - shutdown of request
processor complete
java.lang.AssertionError:
Expected :0
Actual :-1200
<Click to see difference>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at org.junit.Assert.assertEquals(Assert.java:631)
at
org.apache.druid.server.coordinator.LoadQueuePeonTest.testFailAssign(LoadQueuePeonTest.java:344)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Process finished with exit code 255
```
Finally, we found that the cause is actionCompleted method in
CuratorLoadQueuePeon may be executed twice when failed loading happened and
queuedSize will minus 2 * segment size.
```
private void actionCompleted(SegmentHolder segmentHolder)
{
switch (segmentHolder.getType()) {
case LOAD:
segmentsToLoad.remove(segmentHolder.getSegment());
queuedSize.addAndGet(-segmentHolder.getSegmentSize());
break;
case DROP:
segmentsToDrop.remove(segmentHolder.getSegment());
break;
default:
throw new UnsupportedOperationException();
}
executeCallbacks(segmentHolder);
}
```
We need to check the returned value of segmentsToLoad.remove method, if null
returned `queuedSize.addAndGet(-segmentHolder.getSegmentSize())` will skipped!
<!--
In each section, please describe design decisions made, including:
- Choice of algorithms
- Behavioral aspects. What configuration values are acceptable? How are
corner cases and error conditions handled, such as when there are insufficient
resources?
- Class organization and design (how the logic is split between classes,
inheritance, composition, design patterns)
- Method organization and design (how the logic is split between methods,
parameters and return types)
- Naming (class, method, API, configuration, HTTP endpoint, names of
emitted metrics)
-->
<!-- It's good to describe an alternative design (or mention an alternative
name) for every design (or naming) decision point and compare the alternatives
with the designs that you've implemented (or the names you've chosen) to
highlight the advantages of the chosen designs and names. -->
<!-- If there was a discussion of the design of the feature implemented in
this PR elsewhere (e. g. a "Proposal" issue, any other issue, or a thread in
the development mailing list), link to that discussion from this PR description
and explain what have changed in your final design compared to your original
proposal or the consensus version in the end of the discussion. If something
hasn't changed since the original discussion, you can omit a detailed
discussion of those aspects of the design here, perhaps apart from brief
mentioning for the sake of readability of this PR description. -->
<!-- Some of the aspects mentioned above may be omitted for simple and small
changes. -->
<hr>
This PR has:
- [x] been self-reviewed.
- [ ] using the [concurrency
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
(Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in
[licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
- [ ] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [] added integration tests.
- [x] been tested in a test Druid cluster.
<!-- Check the items by putting "x" in the brackets for the done things. Not
all of these items apply to every PR. Remove the items which are not done or
not relevant to the PR. None of the items from the checklist above are strictly
necessary, but it would be very helpful if you at least self-review the PR. -->
<hr>
##### Key changed/added classes in this PR
* `CuratorLoadQueuePeon`
* `LoadQueuePeonTest`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]