viongpanzi opened a new pull request #10362:
URL: https://github.com/apache/druid/pull/10362


   # Description
   
   This PR tries to fix the negative queuedSize problem. After we upgraded our 
clusters to version 0.18.0, the queuedSize of the LoadQueuePeon became 
negative after running for a while.
   
   This can be reproduced by adding the following code to the end of the 
testFailAssign method in LoadQueuePeonTest.
   
   ```
       curator.delete().guaranteed().forPath(loadRequestPath);
       while (null != curator.checkExists().forPath(loadRequestPath)) {
         Thread.sleep(5);
       }
       Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
       Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
   ```
   
   The full testFailAssign method then becomes:
   ```
   public void testFailAssign() throws Exception
     {
       final DataSegment segment = dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
   
       final CountDownLatch loadRequestSignal = new CountDownLatch(1);
       final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);
   
       loadQueuePeon = new CuratorLoadQueuePeon(
           curator,
           LOAD_QUEUE_PATH,
           jsonMapper,
           Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
           Execs.singleThreaded("test_load_queue_peon-%d"),
           // set time-out to 1 ms so that LoadQueuePeon will fail the assignment quickly
           new TestDruidCoordinatorConfig(
               null,
               null,
               null,
               new Duration(1),
               null,
               null,
               10,
               new Duration("PT1s")
           )
       );
   
       loadQueuePeon.start();
   
       loadQueueCache.getListenable().addListener(
           new PathChildrenCacheListener()
           {
             @Override
             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
             {
               if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                 loadRequestSignal.countDown();
               }
             }
           }
       );
       loadQueueCache.start();
   
       loadQueuePeon.loadSegment(
           segment,
           new LoadPeonCallback()
           {
             @Override
             public void execute()
             {
               segmentLoadedSignal.countDown();
             }
           }
       );
   
       String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getId().toString());
       Assert.assertTrue(timing.forWaiting().awaitLatch(loadRequestSignal));
       Assert.assertNotNull(curator.checkExists().forPath(loadRequestPath));
       Assert.assertEquals(
           segment,
           ((SegmentChangeRequestLoad) jsonMapper
               .readValue(curator.getData().decompressed().forPath(loadRequestPath), DataSegmentChangeRequest.class))
               .getSegment()
       );
   
       // don't simulate completion of load request here
       Assert.assertTrue(timing.forWaiting().awaitLatch(segmentLoadedSignal));
       Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
       Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
       curator.delete().guaranteed().forPath(loadRequestPath);
       while (null != curator.checkExists().forPath(loadRequestPath)) {
         Thread.sleep(5);
       }
       Assert.assertEquals(0, loadQueuePeon.getSegmentsToLoad().size());
       Assert.assertEquals(0L, loadQueuePeon.getLoadQueueSize());
     }
   ```
   After running the above method, we got the following logs:
   
   ```
   ...
   2020-09-06T22:22:38,377 DEBUG [main] 
org.apache.druid.server.coordinator.CuratorLoadQueuePeon - Asking server 
peon[/druid/loadqueue/localhost:1234] to load 
segment[test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z]
   2020-09-06T22:22:38,519 DEBUG [test_load_queue_peon_scheduled-0] 
org.apache.druid.server.coordinator.CuratorLoadQueuePeon - ZKNode created for 
server to [/druid/loadqueue/localhost:1234] load 
[test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z]
   2020-09-06T22:22:38,529 ERROR [test_load_queue_peon_scheduled-0] 
org.apache.druid.server.coordinator.CuratorLoadQueuePeon - 
Server[/druid/loadqueue/localhost:1234], throwable caught when submitting 
[SegmentChangeRequestLoad{segment=DataSegment{binaryVersion=9, 
id=test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z,
 loadSpec={}, dimensions=[], metrics=[], shardSpec=NoneShardSpec, 
lastCompactionState=null, size=1200}}].
   org.apache.druid.java.util.common.ISE: 
/druid/loadqueue/localhost:1234/test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z
 was never removed! Failing this operation!
        at 
org.apache.druid.server.coordinator.CuratorLoadQueuePeon$SegmentChangeProcessor.lambda$scheduleNodeDeletedCheck$1(CuratorLoadQueuePeon.java:285)
 ~[classes/:?]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_212]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
[?:1.8.0_212]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_212]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_212]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_212]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_212]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]
   2020-09-06T22:22:38,627 DEBUG [main-EventThread] 
org.apache.druid.server.coordinator.CuratorLoadQueuePeon - 
Server[/druid/loadqueue/localhost:1234] done processing load of segment 
[/druid/loadqueue/localhost:1234/test_load_queue_peon_2014-10-22T00:00:00.000Z_2014-10-23T00:00:00.000Z_2015-05-27T03:38:35.683Z]
   2020-09-06T22:22:38,628 INFO [Curator-Framework-0] 
org.apache.curator.framework.imps.CuratorFrameworkImpl - 
backgroundOperationsLoop exiting
   2020-09-06T22:22:38,632 INFO [ProcessThread(sid:0 cport:59826):] 
org.apache.zookeeper.server.PrepRequestProcessor - Processed session 
termination for sessionid: 0x1003a64174e0000
   2020-09-06T22:22:38,632 INFO [main] org.apache.zookeeper.ZooKeeper - 
Session: 0x1003a64174e0000 closed
   2020-09-06T22:22:38,632 INFO [main-EventThread] 
org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 
0x1003a64174e0000
   2020-09-06T22:22:38,633 INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:59826] 
org.apache.zookeeper.server.NIOServerCnxn - Closed socket connection for client 
/127.0.0.1:59843 which had sessionid 0x1003a64174e0000
   2020-09-06T22:22:38,636 INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:59826] 
org.apache.zookeeper.server.NIOServerCnxnFactory - NIOServerCnxn factory exited 
run method
   2020-09-06T22:22:38,638 INFO [main] 
org.apache.zookeeper.server.ZooKeeperServer - shutting down
   2020-09-06T22:22:38,638 INFO [main] 
org.apache.zookeeper.server.SessionTrackerImpl - Shutting down
   2020-09-06T22:22:38,638 INFO [main] 
org.apache.zookeeper.server.PrepRequestProcessor - Shutting down
   2020-09-06T22:22:38,638 INFO [main] 
org.apache.zookeeper.server.SyncRequestProcessor - Shutting down
   2020-09-06T22:22:38,639 INFO [ProcessThread(sid:0 cport:59826):] 
org.apache.zookeeper.server.PrepRequestProcessor - PrepRequestProcessor exited 
loop!
   2020-09-06T22:22:38,639 INFO [SyncThread:0] 
org.apache.zookeeper.server.SyncRequestProcessor - SyncRequestProcessor exited!
   2020-09-06T22:22:38,639 INFO [main] 
org.apache.zookeeper.server.FinalRequestProcessor - shutdown of request 
processor complete
   
   java.lang.AssertionError: 
   Expected :0
   Actual   :-1200
   <Click to see difference>
   
   
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.junit.Assert.assertEquals(Assert.java:631)
        at 
org.apache.druid.server.coordinator.LoadQueuePeonTest.testFailAssign(LoadQueuePeonTest.java:344)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
   
   
   Process finished with exit code 255
   ```
   
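   We found that the cause is that the actionCompleted method in 
CuratorLoadQueuePeon may be executed twice when a load fails. In the logs 
above it runs once when the load request times out and again when the 
ZooKeeper node is deleted, so queuedSize is decremented by the segment size 
twice.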
   
   ```
   private void actionCompleted(SegmentHolder segmentHolder)
     {
       switch (segmentHolder.getType()) {
         case LOAD:
           segmentsToLoad.remove(segmentHolder.getSegment());
           queuedSize.addAndGet(-segmentHolder.getSegmentSize());
           break;
         case DROP:
           segmentsToDrop.remove(segmentHolder.getSegment());
           break;
         default:
           throw new UnsupportedOperationException();
       }
       executeCallbacks(segmentHolder);
     }
   ```
   
   We need to check the value returned by segmentsToLoad.remove: if it returns 
null, the holder has already been removed, and 
`queuedSize.addAndGet(-segmentHolder.getSegmentSize())` must be skipped.
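
The effect of that check can be illustrated with a minimal, self-contained sketch. This is a simplified model and not the actual CuratorLoadQueuePeon code: the class name `QueuedSizeSketch`, the `String` segment ids, and both `actionCompleted*` method names are hypothetical stand-ins, and the map holds sizes instead of SegmentHolder objects. It only shows how relying on the return value of `remove` makes a second completion a no-op.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the peon's bookkeeping; NOT the actual Druid class.
class QueuedSizeSketch
{
  // Hypothetical model: segment id -> segment size in bytes.
  private final ConcurrentSkipListMap<String, Long> segmentsToLoad = new ConcurrentSkipListMap<>();
  private final AtomicLong queuedSize = new AtomicLong(0);

  void loadSegment(String segmentId, long size)
  {
    // Count the segment only if it was not already queued.
    if (segmentsToLoad.putIfAbsent(segmentId, size) == null) {
      queuedSize.addAndGet(size);
    }
  }

  // Unguarded variant: decrements on every call, even if the entry is already gone.
  void actionCompletedUnguarded(String segmentId, long size)
  {
    segmentsToLoad.remove(segmentId);
    queuedSize.addAndGet(-size);
  }

  // Guarded variant: decrements only when remove() actually removed an entry.
  void actionCompletedGuarded(String segmentId, long size)
  {
    if (segmentsToLoad.remove(segmentId) != null) {
      queuedSize.addAndGet(-size);
    }
  }

  long getLoadQueueSize()
  {
    return queuedSize.get();
  }

  public static void main(String[] args)
  {
    QueuedSizeSketch unguarded = new QueuedSizeSketch();
    unguarded.loadSegment("seg", 1200L);
    unguarded.actionCompletedUnguarded("seg", 1200L); // e.g. the timeout path
    unguarded.actionCompletedUnguarded("seg", 1200L); // e.g. the node-deleted path
    System.out.println(unguarded.getLoadQueueSize()); // prints -1200

    QueuedSizeSketch guarded = new QueuedSizeSketch();
    guarded.loadSegment("seg", 1200L);
    guarded.actionCompletedGuarded("seg", 1200L);
    guarded.actionCompletedGuarded("seg", 1200L); // second call is a no-op
    System.out.println(guarded.getLoadQueueSize()); // prints 0
  }
}
```

With the segment size of 1200 used in the test, the unguarded variant reproduces the `-1200` seen in the assertion failure, while the guarded variant stays at 0.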
   
   <hr>
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency 
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
 (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.
   
   <hr>
   
   ##### Key changed/added classes in this PR
    * `CuratorLoadQueuePeon`
    * `LoadQueuePeonTest`
   

