Roshan Naik created FLUME-2581:
----------------------------------

             Summary: File Channel replay speed improvement
                 Key: FLUME-2581
                 URL: https://issues.apache.org/jira/browse/FLUME-2581
             Project: Flume
          Issue Type: Bug
    Affects Versions: v1.5.1
            Reporter: Roshan Naik


The following two JIRAs were meant to improve on the File Channel replay
performance of v1.4:

FLUME-2155 committed in flume-1.5 (released)
FLUME-2450 committed in flume-1.6 (currently trunk)

My measurements show that File Channel replay was actually fastest in Flume 1.4
(before these JIRAs were committed). It was significantly slower in Flume 1.5
(due to FLUME-2155), and FLUME-2450 has brought it back to near 1.4 speed. So
the net effect is almost zero.

To measure this, I wrote a unit test that pumped 5 million events into the FC
and drained about half of them: every time a batch of events was inserted,
about half of that batch was drained. At the end the FC was left with about
2.5 million events. At this point I restarted the FC and measured how long it
took to come up.

Here are a couple of readings for each Flume version, in milliseconds (the test
prints the System.currentTimeMillis() difference):

Version   Run 1 (ms)   Run 2 (ms)
1.4           33,709       33,249
1.5          236,470      294,530
1.6           33,934       35,934
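To make the size of the regression explicit, here is a small sketch that averages the two readings per version and expresses each average relative to 1.4. It assumes the readings are the millisecond values printed by the test (System.currentTimeMillis() differences); ReplayStats is just an illustrative name.

{code}
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Summarizes the replay readings above: mean per version and the
// ratio of that mean to the 1.4 baseline.
public class ReplayStats {

  // Arithmetic mean of the readings (milliseconds) for one version.
  static double mean(long[] readingsMs) {
    long sum = 0;
    for (long r : readingsMs) {
      sum += r;
    }
    return (double) sum / readingsMs.length;
  }

  public static void main(String[] args) {
    // Insertion order is preserved so 1.4 prints first.
    Map<String, long[]> readings = new LinkedHashMap<>();
    readings.put("1.4", new long[] {33_709, 33_249});
    readings.put("1.5", new long[] {236_470, 294_530});
    readings.put("1.6", new long[] {33_934, 35_934});

    double baseline = mean(readings.get("1.4"));
    for (Map.Entry<String, long[]> e : readings.entrySet()) {
      double m = mean(e.getValue());
      // Locale.ROOT keeps '.' as the decimal separator.
      System.out.printf(Locale.ROOT, "%s: mean %.1f ms, %.2fx of 1.4%n",
          e.getKey(), m, m / baseline);
    }
  }
}
{code}

Running it prints "1.4: mean 33479.0 ms, 1.00x of 1.4", "1.5: mean 265500.0 ms, 7.93x of 1.4" and "1.6: mean 34934.0 ms, 1.04x of 1.4", i.e. 1.5 replays roughly 8x slower than 1.4, while 1.6 is within about 4% of it.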

Below is the test code I added to TestCheckpointRebuilder:
{code}
  // Starts with an empty FC, pumps and drains data, stops the FC, then
  // measures how long a new FC instance takes to replay on start().
  @Test
  public void testReplaySpeed_WithDrain() throws Exception {
    //1 start FC
    Map<String, String> overrides = Maps.newHashMap();
    overrides.put(FileChannelConfiguration.CAPACITY,
            String.valueOf(6000000));
    overrides.put(FileChannelConfiguration.TRANSACTION_CAPACITY,
            String.valueOf(5000));
    channel = createFileChannel(overrides);
    channel.start();

    //2 pump and drain data
    Assert.assertTrue(channel.isOpen());
    fillAndDrainChannel(channel, "test");

    //3 stop
    channel.stop();
    channel.close();

    // 4 instantiate new fc
    channel = createFileChannel(overrides);

    //5 start FC .. measure replay
    System.out.println("=========== > CLOCK STARTS NOW ");
    long start = System.currentTimeMillis();
    channel.start();
    long end = System.currentTimeMillis();
    // currentTimeMillis() differences are milliseconds, not seconds.
    System.out.println("=========== > Time Taken : " + (end - start) + " ms");
    Assert.assertTrue(channel.isOpen());

    // 6 wrap up
    channel.stop();
    channel.close();
  }

  public static void fillAndDrainChannel(final Channel channel,
          final String prefix) throws Exception {
    int[] batchSizes = new int[] {
            1000, 100, 10, 1
    };
    for (int i = 0; i < batchSizes.length; i++) {
      try {
        while(true) {
          Set<String> batch = putEvents(channel, prefix, batchSizes[i],
                  Integer.MAX_VALUE, true);
          if(batch.isEmpty()) {
            break;
          }
          // Drain about half of the batch that was just put.
          TestUtils.takeEvents(channel, batch.size() / 2, batch.size() / 2);

        }
      } catch (ChannelException e) {
        // Expected once the channel (or the transaction's put queue) is full.
        Assert.assertTrue(("The channel has reached it's capacity. This might "
                + "be the result of a sink on the channel having too low of batch "
                + "size, a downstream system running slower than normal, or that "
                + "the channel capacity is just too low. [channel="
                + channel.getName() + "]").equals(e.getMessage())
                || e.getMessage().startsWith("Put queue for FileBackedTransaction "
                + "of capacity "));
      }
    }
  }
{code}

FYI: TestUtils.createFileChannel seems to set the capacity itself, so I
commented that out for this test.
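The test times replay with System.currentTimeMillis(), which follows the wall clock and can jump if the system time is adjusted during a run. A minimal sketch of an alternative, assuming a monotonic clock is preferred for this kind of measurement, uses System.nanoTime(). ReplayTimer, timeMillis and ThrowingAction are hypothetical names, not Flume APIs; inside the test the timed action would be channel::start.

{code}
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flume's TestUtils: times one action with
// the monotonic System.nanoTime() clock and returns the elapsed milliseconds.
public class ReplayTimer {

  // Lets the timed action throw checked exceptions.
  interface ThrowingAction {
    void run() throws Exception;
  }

  static long timeMillis(ThrowingAction action) throws Exception {
    long start = System.nanoTime();
    action.run();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  public static void main(String[] args) throws Exception {
    // Stand-in for the replay; inside the test this would be
    //   long replayMs = timeMillis(channel::start);
    long ms = timeMillis(() -> Thread.sleep(50));
    System.out.println("=========== > Time Taken : " + ms + " ms");
  }
}
{code}

Because nanoTime() is only meaningful as a difference between two calls, the helper never exposes the raw value, only the elapsed time converted with TimeUnit.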



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
