[ 
https://issues.apache.org/jira/browse/STORM-756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15039536#comment-15039536
 ] 

ASF GitHub Bot commented on STORM-756:
--------------------------------------

Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/920#discussion_r46637732
  
    --- Diff: storm-core/test/jvm/backtype/storm/utils/ShellBoltMessageQueueTest.java ---
    @@ -0,0 +1,85 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.utils;
    +
    +import backtype.storm.multilang.BoltMsg;
    +import com.google.common.collect.Lists;
    +import junit.framework.TestCase;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +public class ShellBoltMessageQueueTest extends TestCase {
    +    @Test
    +    public void testPollTaskIdsFirst() throws InterruptedException {
    +        ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
    +
    +        // put bolt message first, then put task ids
    +        queue.putBoltMsg(new BoltMsg());
    +        ArrayList<Integer> taskIds = Lists.newArrayList(1, 2, 3);
    +        queue.putTaskIds(taskIds);
    +
    +        Object msg = queue.poll(10, TimeUnit.SECONDS);
    +
    +        // task ids should be pulled first
    +        assertTrue(msg instanceof List<?>);
    +        assertEquals(msg, taskIds);
    +    }
    +
    +    @Test
    +    public void testPollWhileThereAreNoDataAvailable() throws InterruptedException {
    +        ShellBoltMessageQueue queue = new ShellBoltMessageQueue();
    +
    +        long start = System.currentTimeMillis();
    +        Object msg = queue.poll(1, TimeUnit.SECONDS);
    +        long finish = System.currentTimeMillis();
    +        long waitDuration = finish - start;
    +
    +        assertNull(msg);
    +        assertTrue("wait duration should be equal or greater than 1000, current: " + waitDuration, waitDuration >= 1000);
    --- End diff --
    
    The only difference between #897 and #920 is this line.
    I was previously checking (finish - start) > 1000, which was not accurate:
    the actual elapsed time could be 1000.xxx milliseconds, which makes
    waitDuration exactly 1000, so the strict comparison fails.
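
The boundary case described above can be illustrated with a minimal sketch. It is not part of the pull request: the class and method names are hypothetical, and it uses a nanosecond interval truncated to whole milliseconds to stand in for a measured wait of 1000.xxx ms.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only; not part of the Storm code base.
class WaitDurationCheck {
    // Strict comparison: rejects a measured wait of exactly timeoutMillis.
    static boolean strictCheck(long waitMillis, long timeoutMillis) {
        return waitMillis > timeoutMillis;
    }

    // Inclusive comparison: accepts a measured wait of exactly timeoutMillis.
    static boolean inclusiveCheck(long waitMillis, long timeoutMillis) {
        return waitMillis >= timeoutMillis;
    }

    // Converts a nanosecond interval to whole milliseconds (fraction is dropped).
    static long elapsedMillis(long startNanos, long finishNanos) {
        return TimeUnit.NANOSECONDS.toMillis(finishNanos - startNanos);
    }

    public static void main(String[] args) {
        // An assumed real wait of 1000.4 ms, expressed in nanoseconds.
        long waitMillis = elapsedMillis(0L, 1_000_400_000L);
        System.out.println("measured wait (ms): " + waitMillis);
        System.out.println("strict check passes: " + strictCheck(waitMillis, 1000L));
        System.out.println("inclusive check passes: " + inclusiveCheck(waitMillis, 1000L));
    }
}
```

With a whole-millisecond measurement, a wait that lasted slightly longer than the timeout can still be reported as exactly 1000, which is why the inclusive comparison is the safer assertion here.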


> [multilang] Introduce overflow control mechanism
> ------------------------------------------------
>
>                 Key: STORM-756
>                 URL: https://issues.apache.org/jira/browse/STORM-756
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-multilang
>    Affects Versions: 0.10.0, 0.9.4, 0.11.0
>            Reporter: Jungtaek Lim
>            Assignee: Jungtaek Lim
>
> This comes from a comment on STORM-738: 
> https://issues.apache.org/jira/browse/STORM-738?focusedCommentId=14394106&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14394106
> A. ShellBolt side control
> We can modify ShellBolt to keep a list of sent tuple ids and stop sending 
> tuples when the list exceeds a configured max value. To achieve this, the 
> subprocess should notify ShellBolt when a tuple id is complete.
> * It introduces a new multi-lang command, "proceed" (or a better name).
> * ShellBolt stores the list of tuples that are still being processed.
> * Its overhead could be big, since the subprocess must always notify 
> ShellBolt when any tuple is processed.
> B. subprocess side control
> We can modify the subprocess to check its pending queue after reading a tuple.
> If the queue exceeds a configured max value, the subprocess can request a 
> "delay" from ShellBolt to slow it down.
> When ShellBolt receives "delay", BoltWriterRunnable should stop polling the 
> pending queue and resume polling later.
> How long should ShellBolt wait before it resumes sending? The unit could be 
> "delay time" or "tuple count". I don't know yet which is better.
> * It introduces a new multi-lang command, "delay" (or a better name).
> * I don't think it would be introduced soon, but the subprocess could request 
> a delay based on its own statistics (e.g. pending tuple count * average tuple 
> processing time for the time unit, average pending tuple count for the count unit).
> ** We can leave when and how much "delay" to request up to the user. Users can 
> write their own algorithm to control flooding.
> In my opinion B seems more natural, because the current issue is caused on the 
> subprocess side, so it would be better to let the subprocess overcome it.
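
The two options in the description above could be sketched roughly as follows. This is only an illustration under stated assumptions: `PendingTupleTracker`, `DelayGate`, and all their methods are hypothetical names, not existing Storm APIs, and the sketch leaves out the multi-lang wire protocol entirely. Timestamps are passed in explicitly so the behaviour is deterministic.

```java
import java.util.HashSet;
import java.util.Set;

// Option A (hypothetical): ShellBolt tracks in-flight tuple ids and stops
// sending once the configured cap is reached.
class PendingTupleTracker {
    private final int maxPending;
    private final Set<String> pending = new HashSet<>();

    PendingTupleTracker(int maxPending) {
        this.maxPending = maxPending;
    }

    // True while another tuple may be sent to the subprocess.
    synchronized boolean canSend() {
        return pending.size() < maxPending;
    }

    synchronized void markSent(String tupleId) {
        pending.add(tupleId);
    }

    // Called when the subprocess reports a tuple id as complete
    // (the "proceed" command in the proposal).
    synchronized void markComplete(String tupleId) {
        pending.remove(tupleId);
    }
}

// Option B (hypothetical): the subprocess requests a pause, and the writer
// thread consults the gate before polling the pending queue.
class DelayGate {
    private long resumeAtMillis = 0L;

    // Records a "delay" request received at nowMillis; never shortens a pause
    // that is already in effect.
    synchronized void requestDelay(long nowMillis, long delayMillis) {
        resumeAtMillis = Math.max(resumeAtMillis, nowMillis + delayMillis);
    }

    // True once the requested pause has elapsed.
    synchronized boolean mayPoll(long nowMillis) {
        return nowMillis >= resumeAtMillis;
    }
}

class OverflowControlDemo {
    public static void main(String[] args) {
        PendingTupleTracker tracker = new PendingTupleTracker(2);
        tracker.markSent("t1");
        tracker.markSent("t2");
        System.out.println("A: can send with 2 in flight? " + tracker.canSend());
        tracker.markComplete("t1");
        System.out.println("A: can send after one completes? " + tracker.canSend());

        DelayGate gate = new DelayGate();
        gate.requestDelay(1_000L, 500L);
        System.out.println("B: may poll at t=1200? " + gate.mayPoll(1_200L));
        System.out.println("B: may poll at t=1500? " + gate.mayPoll(1_500L));
    }
}
```

The sketch uses the "delay time" unit for option B; a "tuple count" unit would replace the timestamp with a counter, which the description leaves open.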



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
