> On Aug. 19, 2012, 10:51 p.m., Brock Noland wrote:
> > Nice patch! This appears to remove many of the intermittent test failures
> > of testRestartLogReplayV{1,2}, which is exactly what I was hoping for! There
> > are a couple of items to work on below, but overall I think the approach is sound.
> > 
> > == Review items ==
> > 
> > 1)  OK, this fixes the big TestFileChannel.testRestartLogReplayV{1,2} 
> > failure mode, that is, lost puts and takes. With the fix, it still
> > eventually fails to replay the logs. I believe this is because we can
> > have this scenario:
> > 
> > put
> > checkpoint (put is written to in flights)
> > commit
> > replay
> > 
> > The put is written out to the inflight puts file, and then on replay it's
> > added to the transaction map and put back into the queue, but it was also
> > in the queue at checkpoint time. I was able to get the test to pass 170
> > times in a row by adding a queue remove in the replayLog method:
> > 
> >         transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer));
> >         queue.remove(FlumeEventPointer.fromLong(eventPointer));
> > 
> > That is, if it's truly inflight, then a commit has not occurred, and the
> > record will be added to the queue when the commit is replayed.
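A minimal sketch of scenario #1 under stated assumptions: event pointers are modeled as plain longs, the queue as a `List`, and the class and method names are hypothetical (this is not Flume's ReplayHandler or FlumeEventQueue API). It assumes, as the scenario above does, that the same pointer is both in the checkpointed queue and in the inflight puts file, and shows that replaying the commit adds it a second time unless the inflight put is first removed from the queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of scenario #1; not Flume's ReplayHandler or FlumeEventQueue.
// Event pointers are plain longs and the event queue is a List.
class InflightPutReplaySketch {

  // checkpointQueue: pointers in the queue when the checkpoint was written
  // inflightPuts:    txnId -> pointers read back from the inflight puts file
  // committedTxns:   txnIds whose Commit records appear after the checkpoint
  // applyFix:        whether to remove inflight puts from the queue first
  static List<Long> replay(List<Long> checkpointQueue,
                           Map<Long, List<Long>> inflightPuts,
                           List<Long> committedTxns,
                           boolean applyFix) {
    List<Long> queue = new ArrayList<>(checkpointQueue);
    Map<Long, List<Long>> transactionMap = new HashMap<>();
    for (Map.Entry<Long, List<Long>> entry : inflightPuts.entrySet()) {
      for (Long pointer : entry.getValue()) {
        // Same idea as transactionMap.put(txnID, ...) in replayLog.
        transactionMap.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
            .add(pointer);
        if (applyFix) {
          // Truly inflight means no commit yet, so it must not be queued yet.
          queue.remove(pointer); // remove(Object): drops the first match
        }
      }
    }
    for (Long txnId : committedTxns) {
      // Replaying the commit is what adds the transaction's puts to the queue.
      List<Long> puts = transactionMap.remove(txnId);
      if (puts != null) {
        queue.addAll(puts);
      }
    }
    return queue;
  }
}
```

With `applyFix` set to false, replaying one commit for the transaction leaves the pointer in the queue twice; with it set to true, the pointer appears once.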
> > 
> > 
> > 2) The failure I got after 170 runs was caused by this scenario:
> > 
> > put
> > checkpoint (put is written to inflights)
> > commit
> > checkpoint (no inflights, so the inflight files are not updated and
> > still hold old data)
> > replay
> > 
> > After commenting out:
> > 
> >         if(values.isEmpty()){
> >           return;
> >         }
> > 
> > in the serializeAndWrite method, the test ran 306 times in a row without 
> > failing.
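A sketch of the fix for scenario #2, assuming a hypothetical inflights file layout of an int count followed by one long per pointer (not the patch's actual format). The point is that serializeAndWrite has no early return for an empty collection, so a checkpoint with no inflight events still truncates the file and a later replay cannot pick up stale pointers.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical inflights file writer for scenario #2. The layout (an int count
// followed by one long per event pointer) is an assumption, not the patch's format.
class InflightsFileSketch {

  static void serializeAndWrite(Path file, Collection<Long> values) throws IOException {
    // No "if (values.isEmpty()) return;" here. With no options,
    // Files.newOutputStream creates the file or truncates an existing one,
    // so an empty collection still replaces any stale pointers.
    try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(file))) {
      out.writeInt(values.size());
      for (long value : values) {
        out.writeLong(value);
      }
    }
  }

  static List<Long> read(Path file) throws IOException {
    try (DataInputStream in = new DataInputStream(Files.newInputStream(file))) {
      int count = in.readInt();
      List<Long> values = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        values.add(in.readLong());
      }
      return values;
    }
  }
}
```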
> > 
> > 3) I'm a little unsure about the inflight take logic.
> > 
> > take
> > checkpoint
> > commit
> > 
> > On replay, we put the take back in the queue and then skip ahead to the
> > checkpoint. At that point we replay the commit, but the commit has not
> > seen the takes, so won't it fail to remove them from the queue?
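One way to picture scenario #3 and a possible fix, again as a simplified, hypothetical model with long pointers and a list-based queue rather than Flume's code. An inflight take is put back into the queue on replay (so it is not lost if the transaction never commits), and it is also registered against its transaction ID so that a commit replayed after the checkpoint can remove it again. Without that registration the commit has nothing to remove and the event is delivered twice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of scenario #3 (take, checkpoint, commit); not Flume's code.
class InflightTakeReplaySketch {

  // checkpointQueue: pointers in the queue at checkpoint time (taken events
  //                  are already gone from it)
  // inflightTakes:   txnId -> pointers read back from the inflight takes file
  // committedTxns:   txnIds whose Commit records appear after the checkpoint
  // trackTakes:      whether replayed inflight takes are visible to commits
  static List<Long> replay(List<Long> checkpointQueue,
                           Map<Long, List<Long>> inflightTakes,
                           List<Long> committedTxns,
                           boolean trackTakes) {
    List<Long> queue = new ArrayList<>(checkpointQueue);
    Map<Long, List<Long>> takeMap = new HashMap<>();
    for (Map.Entry<Long, List<Long>> entry : inflightTakes.entrySet()) {
      // Uncommitted take: restore the event so it is not lost if no commit follows.
      queue.addAll(entry.getValue());
      if (trackTakes) {
        // Let a later Commit record "see" takes that happened before the checkpoint.
        takeMap.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
            .addAll(entry.getValue());
      }
    }
    for (Long txnId : committedTxns) {
      List<Long> takes = takeMap.remove(txnId);
      if (takes != null) {
        for (Long pointer : takes) {
          queue.remove(pointer); // the take is committed: drop the event again
        }
      }
    }
    return queue;
  }
}
```

If no commit follows the checkpoint, the restored take simply stays in the queue, which is the behaviour the inflight takes file exists to provide.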
> > 
> > 
> > 
> > == Wishlist ==
> > 
> > Since we are planning on making the rest of the file format more 
> > extensible, would you be opposed to using protocol buffers for these two 
> > files? That way we wouldn't need a format upgrade when we integrate this
> > with FLUME-1487. You could basically copy the protocol buffers generation
> > code from FLUME-1487. That change stops doing random writes to files, so
> > we'd have two files:
> > 
> > inflighttakes and inflighttakes.meta, where the meta file would hold the
> > checksum.
> > 
> > A .proto file like this might work:
> > 
> > message InFlightTransactions {
> >   repeated InFlightTransaction transactions = 1;
> > }
> > 
> > message InFlightTransaction {
> >   required sfixed64 transactionID = 1;
> >   repeated sfixed64 pointers = 3;
> > }
> > 
> > message InFlightTransactionsMetaData {
> >   required bytes checksum = 1;
> > }
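If the meta file holds a checksum of the data file, writing and verifying it could look like the sketch below. The serialized bytes would be the encoded InFlightTransactions message; here they are just a byte array so the sketch has no protobuf dependency. MD5 is an assumed choice (the proposal only says "checksum"), and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: a data file plus a ".meta" file holding its checksum.
// MD5 is an assumed algorithm; the proposal does not specify one.
class ChecksummedFileSketch {

  static byte[] checksum(byte[] data) {
    try {
      return MessageDigest.getInstance("MD5").digest(data);
    } catch (NoSuchAlgorithmException e) {
      // Java SE platforms are required to provide MD5, so this should not happen.
      throw new IllegalStateException(e);
    }
  }

  static Path metaFile(Path dataFile) {
    // e.g. inflighttakes -> inflighttakes.meta in the same directory
    return dataFile.resolveSibling(dataFile.getFileName() + ".meta");
  }

  static void write(Path dataFile, byte[] serialized) throws IOException {
    Files.write(dataFile, serialized);                     // data first
    Files.write(metaFile(dataFile), checksum(serialized)); // then its checksum
  }

  static byte[] readVerified(Path dataFile) throws IOException {
    byte[] data = Files.readAllBytes(dataFile);
    byte[] expected = Files.readAllBytes(metaFile(dataFile));
    if (!MessageDigest.isEqual(expected, checksum(data))) {
      throw new IOException("Checksum mismatch for " + dataFile);
    }
    return data;
  }
}
```

Because the data file is written before the meta file, a crash between the two writes shows up as a checksum mismatch on replay rather than as silently wrong inflight data.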
> > 
> > 
> > 
> > with changes
> > v1: 9.5% (fail 20, success 211)
> > v2: 7.5% (fail 7, success 93)
> > 
> > without
> > v1: 3.9% (fail 5, success 127)
> > v2: 5.2% (fail 5, success 95)
> >
> 
> Brock Noland wrote:
>     oops, ignore "with changes" and below.
> 
> Brock Noland wrote:
>     Regarding the scenario in #3, I believe this test encapsulates the
>     problem (with the patch applied):
>     
>       @Test
>       public void testTakeTransactionCrossingCheckpoint() throws Exception {
>         Map<String, String> overrides = Maps.newHashMap();
>         overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL,
>             String.valueOf(1000L));
>         channel = createFileChannel(overrides);
>         channel.start();
>         Assert.assertTrue(channel.isOpen());
>         List<String> in = Lists.newArrayList();
>         try {
>           while(true) {
>             in.addAll(putEvents(channel, "restart", 1, 1));
>           }
>         } catch (ChannelException e) {
>           Assert.assertEquals("Cannot acquire capacity. [channel="
>               +channel.getName()+"]", e.getMessage());
>         }
>         List<String> out = Lists.newArrayList();
>         // now take one item off the channel
>         Transaction tx = channel.getTransaction();
>         tx.begin();
>         Event e = channel.take();
>         Assert.assertNotNull(e);
>         String s = new String(e.getBody(), Charsets.UTF_8);
>         out.add(s);
>         LOG.info("Slow take got " + s);
>         Thread.sleep(2000L); // sleep so a checkpoint occurs. take is before
>         // and commit is after the checkpoint
>         tx.commit();
>         tx.close();
>         channel.stop();
>         channel = createFileChannel(overrides);
>         channel.start();
>         Assert.assertTrue(channel.isOpen());
>         // we should not get the item we took off the queue above
>         out.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
>         Collections.sort(in);
>         Collections.sort(out);
>         if(!out.equals(in)) {
>           List<String> difference = new ArrayList<String>();
>           if(in.size() > out.size()) {
>             LOG.info("The channel shorted us");
>             difference.addAll(in);
>             difference.removeAll(out);
>           } else {
>             LOG.info("We got more events than expected, perhaps dups");
>             difference.addAll(out);
>             difference.removeAll(in);
>           }
>           LOG.error("difference = " + difference +
>               ", in.size = " + in.size() + ", out.size = " + out.size());
>           Assert.fail();
>         }
>       }
> 
> Hari Shreedharan wrote:
>     Thanks for the test. Do you mind if I add it to the unit test file?
>     
>     Thanks for such a detailed review! 
>     
>     Regarding 1 and 2:
>     * Puts are added to the queue only when their transaction commits; if the
>       commit is never seen, the event is never added to the queue. So the
>       checkpoint will not contain the puts of a transaction that has not
>       committed (hence the need for inflight puts in the first place).
>       Likewise, during replay an event is put into the queue only if a
>       commit for its transaction is seen. Also, if a put commit happens
>       between two checkpoints, elements.syncRequired() returns true, which
>       forces a new inflights file to be written if values has content to
>       write. The problem is that when values.isEmpty() returns true, we
>       still need to truncate the old file so that we don't leave stale data
>       behind. Nice catch!
>     
>     #3 is interesting. That was me being lazy: I had considered putting the
>     take replays into the replay logic as well, but thought there was no
>     need. It turns out there is! Will do that.
> 
> Brock Noland wrote:
>     Yes, please add the unit test.
>     
>     Also, can you expand on #1? I got a log file like this:
>     
>       1345405182865, 1345405184951, 2, 72894, Put, FlumeEventPointer [fileID=2, offset=72894]
>                                     1345405184952 -- checkpoint
>       1345405182865, 1345405184953, 2, 72974, Commit
>     
>     which was resolved by adding:
>       queue.remove(FlumeEventPointer.fromLong(eventPointer));

Brock:
A checkpoint is not written until at least one put has been committed
(until then, elements.syncRequired() returns false). I just wrote a unit
test that waits for a checkpoint after a put, and it times out because the
checkpoint never comes.
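A simplified, hypothetical model of the behaviour described above, where syncRequired() only becomes true once a commit has changed the queue. The class and method names are illustrative, not Flume's checkpoint code, and the model deliberately considers only puts and commits in a single thread.

```java
// Hypothetical single-threaded model: the checkpointer writes only when
// syncRequired() is true, and only a commit sets that flag. Not Flume's code.
class CheckpointGateSketch {
  private boolean changedSinceCheckpoint = false;
  private int checkpointsWritten = 0;

  // In this model an uncommitted put leaves the queue unchanged.
  void put(long pointer) {
  }

  // A commit changes the queue, so the next checkpoint must be written.
  void commit() {
    changedSinceCheckpoint = true;
  }

  boolean syncRequired() {
    return changedSinceCheckpoint;
  }

  // Called on each checkpoint interval; returns true if a checkpoint was written.
  boolean maybeCheckpoint() {
    if (!syncRequired()) {
      return false; // nothing committed since the last checkpoint: skip
    }
    changedSinceCheckpoint = false;
    checkpointsWritten++;
    return true;
  }

  int checkpointsWritten() {
    return checkpointsWritten;
  }
}
```

In this model a lone uncommitted put never triggers a checkpoint, which matches the timing-out test; a checkpoint can only land between a put and its commit if some other transaction committed since the previous checkpoint.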


- Hari


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6683/#review10513
-----------------------------------------------------------


On Aug. 18, 2012, 8:40 a.m., Hari Shreedharan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6683/
> -----------------------------------------------------------
> 
> (Updated Aug. 18, 2012, 8:40 a.m.)
> 
> 
> Review request for Flume and Brock Noland.
> 
> 
> Description
> -------
> 
> FlumeEventQueue now keeps a copy of the event pointers of uncommitted puts
> and takes. It serializes them on every checkpoint, deserializes them on
> replay, and reinserts them into either the event queue (for takes) or the
> replay queue (for puts).
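A minimal sketch of the kind of bookkeeping described above: uncommitted event pointers grouped by transaction ID, serialized at checkpoint time and read back on replay. The class name, methods, and binary layout (transaction count, then per transaction its ID, pointer count, and pointers) are assumptions for illustration, not the patch's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the patch: uncommitted event pointers grouped by txnId.
class InflightPointersSketch {
  private final Map<Long, Set<Long>> inflight = new LinkedHashMap<>();

  // Called when a put or take happens inside a transaction.
  void addEvent(long txnId, long pointer) {
    inflight.computeIfAbsent(txnId, k -> new LinkedHashSet<>()).add(pointer);
  }

  // Called on commit or rollback: the transaction is no longer inflight.
  void completeTransaction(long txnId) {
    inflight.remove(txnId);
  }

  Map<Long, Set<Long>> asMap() {
    return Collections.unmodifiableMap(inflight);
  }

  // Written at checkpoint time, even when there is nothing inflight.
  byte[] serialize() throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(inflight.size());
      for (Map.Entry<Long, Set<Long>> entry : inflight.entrySet()) {
        out.writeLong(entry.getKey());
        out.writeInt(entry.getValue().size());
        for (long pointer : entry.getValue()) {
          out.writeLong(pointer);
        }
      }
    }
    return bytes.toByteArray();
  }

  // Read back on replay.
  static InflightPointersSketch deserialize(byte[] data) throws IOException {
    InflightPointersSketch result = new InflightPointersSketch();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      int transactions = in.readInt();
      for (int i = 0; i < transactions; i++) {
        long txnId = in.readLong();
        int count = in.readInt();
        for (int j = 0; j < count; j++) {
          result.addEvent(txnId, in.readLong());
        }
      }
    }
    return result;
  }
}
```

One instance for puts and one for takes would match the description above; keeping the structure inside the queue class is what lets the transaction code stay unaware of it.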
> 
> I could have used the PutList and TakeList of the transaction for this, but I
> didn't really like that approach. I don't want to share this kind of data
> between multiple layers, since that makes it hard to change the
> FlumeEventQueue implementation without major changes in
> FileBackedTransaction. It would also lead to a number of cross-layer calls to
> read data, which makes the approach less clean.
> With my current approach, most changes are localized to the FlumeEventQueue
> class, so only a couple of function calls would need to be removed or
> modified. Admittedly there is some memory overhead, but it is insignificant
> compared to the size of the event queue itself: hardly a few MB extra in
> memory. If that buys a cleaner implementation, I would prefer it.
> 
> 
> This addresses bug FLUME-1437.
>     https://issues.apache.org/jira/browse/FLUME-1437
> 
> 
> Diffs
> -----
> 
>   flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java e7735e8 
>   flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java 9bfee2d 
>   flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java 11f1e1f 
>   flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java bbca62c 
>   flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java 7ec5916 
>   flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java 1d5a0f9 
>   flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java 569b7c7 
>   flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java e0b5e3f 
> 
> Diff: https://reviews.apache.org/r/6683/diff/
> 
> 
> Testing
> -------
> 
> Added 4 new unit tests: 2 in TestFileChannel.java that test the actual use
> case, and 2 in TestFlumeEventQueue.java that test the
> serialization/deserialization itself.
> 
> 
> Thanks,
> 
> Hari Shreedharan
> 
>
