[ 
https://issues.apache.org/jira/browse/FLUME-2581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14281142#comment-14281142
 ] 

Hari Shreedharan commented on FLUME-2581:
-----------------------------------------

Thanks for the tests, Roshan. It looks like the indexing code we added to fix one 
special case (where the log files may not have the latest checkpoint offsets) 
may have caused a slowdown in the average case. So I am considering this: how 
about we make the indexing conditional? If a user is seeing really slow replay 
due to that one special case, they could turn the indexing on. Maybe we can 
check all the files and then decide whether to index? But as a first step, 
let's make the indexing optional. 
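
To make the idea concrete, here is a rough sketch of what a conditional 
check could look like. Everything in it is an assumption for illustration 
only: the property name "replayIndex.enabled", the ReplayIndexPolicy and 
LogFileInfo classes, and the notion of comparing a per-file checkpoint id 
against the latest one are hypothetical and are not existing Flume code or 
configuration.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flume: decides whether replay should
// build the index, based on an opt-in flag plus a scan of the log files.
final class ReplayIndexPolicy {
  // Assumed property name; no such key exists in FileChannelConfiguration.
  static final String USE_REPLAY_INDEX = "replayIndex.enabled";

  // Assumed per-file summary: the checkpoint id last recorded in a log file.
  static final class LogFileInfo {
    final int fileId;
    final long recordedCheckpointId;

    LogFileInfo(int fileId, long recordedCheckpointId) {
      this.fileId = fileId;
      this.recordedCheckpointId = recordedCheckpointId;
    }
  }

  // Returns true only if the user opted in AND at least one log file is
  // behind the latest checkpoint (the special case the index was added for).
  static boolean shouldIndex(Map<String, String> conf,
                             List<LogFileInfo> files,
                             long latestCheckpointId) {
    // First step: indexing is optional and off unless explicitly enabled.
    boolean enabled = Boolean.parseBoolean(
        conf.getOrDefault(USE_REPLAY_INDEX, "false"));
    if (!enabled) {
      return false;
    }
    // Second step: check all the files, index only if one is stale.
    for (LogFileInfo file : files) {
      if (file.recordedCheckpointId < latestCheckpointId) {
        return true;
      }
    }
    return false;
  }
}
```

With the flag left unset this always returns false, so the average case 
would skip the index entirely; users hitting the slow special case would set 
the flag and pay the indexing cost only when a stale file is found.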

> File Channel replay speed improvement
> -------------------------------------
>
>                 Key: FLUME-2581
>                 URL: https://issues.apache.org/jira/browse/FLUME-2581
>             Project: Flume
>          Issue Type: Bug
>    Affects Versions: v1.5.1
>            Reporter: Roshan Naik
>         Attachments: replayspeed.patch
>
>
> The following two jiras were meant to address file channel replay performance 
> in v1.4:
> FLUME-2155 committed in flume-1.5 (released)
> FLUME-2450 committed in flume-1.6 (currently trunk)
> My measurements show that file channel replay was actually fastest in 
> Flume 1.4 (before these jiras were committed). It was significantly slower 
> in Flume 1.5 (due to FLUME-2155). 
> FLUME-2450 has brought it back to near 1.4 speed, so the net effect is almost 0. 
> To measure, I wrote a unit test that pumps in 5 million events and drains 
> about half of them: every time a batch of events is inserted, about half of 
> the batch is drained. At the end the FC is left with about 2.5 million events. 
> At this point I restart the FC and measure how long it takes to come up.
> Here are a couple readings for each flume version:
> 1.4 :  34 sec,   34 sec
> 1.5 : 236 sec, 295 sec
> 1.6 :  34 sec,    36 sec
> Below is the test code that I added to TestCheckpointRebuilder:
> {code}
>   // Starts with empty FC, pumps and drains some data, does a chkPt,
>   // again pumps and drains some data, stops fc, measures replay speed
>   @Test
>   public void testReplaySpeed_WithDrain() throws Exception {
>     //1 start FC
>     Map<String, String> overrides = Maps.newHashMap();
>     overrides.put(FileChannelConfiguration.CAPACITY,
>             String.valueOf(6000000));
>     overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
>             String.valueOf(5000));
>     channel = createFileChannel(overrides);
>     channel.start();
>     //2 pump and drain data
>     Assert.assertTrue(channel.isOpen());
>     fillAndDrainChannel(channel, "test");
>     //3 stop
>     channel.stop();
>     channel.close();
>     // 4 instantiate new fc
>     channel = createFileChannel(overrides);
>     //5 start FC .. measure replay
>     System.out.println("=========== > CLOCK STARTS NOW ");
>     long start = System.currentTimeMillis();
>     channel.start();
>     long end = System.currentTimeMillis();
>     // currentTimeMillis() returns milliseconds; convert before labelling as seconds
>     System.out.println("=========== > Time Taken : " + (end - start) / 1000 + " sec");
>     Assert.assertTrue(channel.isOpen());
>     // 6 wrap up
>     channel.stop();
>     channel.close();
>   }
>   public static void fillAndDrainChannel(final Channel channel, final String prefix)
>           throws Exception {
>     int[] batchSizes = new int[] {
>             1000, 100, 10, 1
>     };
>     for (int i = 0; i < batchSizes.length; i++) {
>       try {
>         while(true) {
>           Set<String> batch = putEvents(channel, prefix, batchSizes[i],
>                   Integer.MAX_VALUE, true);
>           if(batch.isEmpty()) {
>             break;
>           }
>           TestUtils.takeEvents(channel, batch.size() / 2, batch.size() / 2);
>         }
>       } catch (ChannelException e) {
>         Assert.assertTrue(("The channel has reached it's capacity. This might "
>                 + "be the result of a sink on the channel having too low of batch "
>                 + "size, a downstream system running slower than normal, or that "
>                 + "the channel capacity is just too low. [channel="
>                 + channel.getName() + "]").equals(e.getMessage())
>                 || e.getMessage().startsWith("Put queue for FileBackedTransaction " +
>                 "of capacity "));
>       }
>     }
>   }
> {code}
> FYI: TestUtils.createFileChannel seems to set the capacity itself, so I 
> commented that out for this test.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
