> On Aug. 19, 2012, 10:51 p.m., Brock Noland wrote:
> > Nice patch! This looks like it removes many of the probabilistic test 
> > failures of testRestartLogReplayV{1,2}, which is exactly what I was hoping 
> > for! A couple of items to work on below, but overall I think the approach 
> > is sound.
> > 
> > == Review items ==
> > 
> > 1) OK, this fixes the big TestFileChannel.testRestartLogReplayV{1,2} 
> > failure mode, that is, lost puts and takes. With the fix, replay of the 
> > logs still fails eventually. The reason, I believe, is that we can have 
> > this scenario:
> > 
> > put
> > checkpoint (put is written to in flights)
> > commit
> > replay
> > 
> > The put is written out to the inflight puts file, and then on replay it's 
> > added to the transaction map and put back into the queue, but it was also 
> > in the queue at checkpoint time. I was able to get the test to pass 170 
> > times in a row by adding a queue remove in the replayLog method:
> > 
> >         transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer));
> >         queue.remove(FlumeEventPointer.fromLong(eventPointer));
> > 
> > That is, if it's truly inflight, then a commit has not occurred and the 
> > record will be added to the queue when the commit is replayed.
> > 
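> > For context, here is roughly where that remove sits in the inflight put 
> > replay (a sketch only; the loop and variable names are illustrative, while 
> > the two calls inside it are the ones shown above):
> > 
> >         // Sketch: while replaying the inflight puts file, re-register each
> >         // pointer with its transaction, and drop it from the queue in case
> >         // the checkpoint already captured it. The replayed commit will add
> >         // it back exactly once.
> >         for (Map.Entry<Long, Long> entry : inflightPuts.entries()) {
> >           long txnID = entry.getKey();
> >           long eventPointer = entry.getValue();
> >           transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer));
> >           queue.remove(FlumeEventPointer.fromLong(eventPointer));
> >         }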
> > 
> > 2) The failure I got after 170 runs was caused by this scenario:
> > 
> > put
> > checkpoint (put is written to inflights)
> > commit
> > checkpoint (no inflights, so the inflight files are not updated and thus 
> > still hold old data)
> > replay
> > 
> > After commenting out:
> > 
> >         if(values.isEmpty()){
> >           return;
> >         }
> > 
> > in the serializeAndWrite method, the test ran 306 times in a row without 
> > failing.
> > 
> > 3) I'm a little unsure of the inflight take logic.
> > 
> > take
> > checkpoint
> > commit
> > 
> > On replay, we put the take back in the queue and then skip ahead to the 
> > checkpoint. At that point we replay the commit, but the commit has not 
> > seen the takes, so it will not remove them from the queue?
> > 
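> > If that's right, I'd expect the inflight take replay to also need to 
> > register the takes against their transaction, so the replayed commit can 
> > remove them again, something like this (purely a sketch, with made-up 
> > names, just to illustrate the idea):
> > 
> >         // Sketch: an inflight take goes back on the queue (it was removed
> >         // before the restart), but we also remember it per transaction so
> >         // that a commit seen later in the replay knows which pointers to
> >         // take off the queue again.
> >         for (Map.Entry<Long, Long> entry : inflightTakes.entries()) {
> >           long txnID = entry.getKey();
> >           FlumeEventPointer pointer = FlumeEventPointer.fromLong(entry.getValue());
> >           queue.addHead(pointer);
> >           inflightTakesByTxn.put(txnID, pointer);
> >         }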
> > 
> > 
> > == Wishlist ==
> > 
> > Since we are planning on making the rest of the file format more 
> > extensible, would you be opposed to using protocol buffers for these two 
> > files? That way we wouldn't have to upgrade when we integrate this with 
> > FLUME-1487. Basically you could copy the protocol buffers generation code 
> > from FLUME-1487. In that change we stop doing random writes to files so 
> > we'd have two files:
> > 
> > inflighttakes and inflighttakes.meta, where the meta file would have the 
> > checksum.
> > 
> > Something like this .proto file might work:
> > 
> > message InFlightTransactions {
> >   repeated InFlightTransaction transactions = 1;
> > }
> > 
> > message InFlightTransaction {
> >   required sfixed64 transactionID = 1;
> >   repeated sfixed64 pointers = 3;
> > }
> > 
> > message InFlightTransactionsMetaData {
> >   required bytes checksum = 1;
> > }
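> > 
> > A rough sketch of the writer side, assuming the classes generated from the 
> > .proto above (the file names and the MD5 checksum are just illustrative):
> > 
> > import java.io.File;
> > import java.security.MessageDigest;
> > import com.google.common.io.Files;
> > import com.google.protobuf.ByteString;
> > 
> > // Sketch: serialize the inflight transactions and write a sibling .meta
> > // file carrying a checksum of the payload, so replay can detect a torn or
> > // stale inflighttakes file.
> > void writeInflightTakes(InFlightTransactions transactions) throws Exception {
> >   byte[] payload = transactions.toByteArray();
> >   byte[] checksum = MessageDigest.getInstance("MD5").digest(payload);
> >   InFlightTransactionsMetaData meta = InFlightTransactionsMetaData.newBuilder()
> >       .setChecksum(ByteString.copyFrom(checksum))
> >       .build();
> >   Files.write(payload, new File("inflighttakes"));
> >   Files.write(meta.toByteArray(), new File("inflighttakes.meta"));
> > }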
> > 
> > 
> > 
> > with changes
> > v1 9.5%
> > fail 20
> > success 211
> > 
> > v2 7.5%
> > fail 7
> > success 93
> > 
> > without
> > v1 3.9%
> > fail 5
> > success 127
> > 
> > v2 5.2%
> > fail 5
> > success 95
> >
> 
> Brock Noland wrote:
>     oops, ignore "with changes" and below.
> 
> Brock Noland wrote:
>     Regarding the scenario in #3, I believe this test encapsulates the 
> problem (with the patch applied) 
>     
>       @Test
>       public void testTakeTransactionCrossingCheckpoint() throws Exception {
>         Map<String, String> overrides = Maps.newHashMap();
>         overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL,
>             String.valueOf(1000L));
>         channel = createFileChannel(overrides);
>         channel.start();
>         Assert.assertTrue(channel.isOpen());
>         List<String> in = Lists.newArrayList();
>         try {
>           while(true) {
>             in.addAll(putEvents(channel, "restart", 1, 1));
>           }
>         } catch (ChannelException e) {
>           Assert.assertEquals("Cannot acquire capacity. [channel="
>               +channel.getName()+"]", e.getMessage());
>         }
>         List<String> out = Lists.newArrayList();
>         // now take one item off the channel
>         Transaction tx = channel.getTransaction();
>         tx.begin();
>         Event e = channel.take();
>         Assert.assertNotNull(e);
>         String s = new String(e.getBody(), Charsets.UTF_8);
>         out.add(s);
>         LOG.info("Slow take got " + s);
>         Thread.sleep(2000L); // sleep so a checkpoint occurs. take is before
>         // and commit is after the checkpoint
>         tx.commit();
>         tx.close();
>         channel.stop();
>         channel = createFileChannel(overrides);
>         channel.start();
>         Assert.assertTrue(channel.isOpen());
>         // we should not get the item we took off the queue above
>         out.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
>         Collections.sort(in);
>         Collections.sort(out);
>         if(!out.equals(in)) {
>           List<String> difference = new ArrayList<String>();
>           if(in.size() > out.size()) {
>             LOG.info("The channel shorted us");
>             difference.addAll(in);
>             difference.removeAll(out);
>           } else {
>             LOG.info("We got more events than expected, perhaps dups");
>             difference.addAll(out);
>             difference.removeAll(in);
>           }
>           LOG.error("difference = " + difference +
>               ", in.size = " + in.size() + ", out.size = " + out.size());
>           Assert.fail();
>         }
>       }
> 
> Hari Shreedharan wrote:
>     Thanks for the test. Do you mind if I add it to the unit test file?
>     
>     Thanks for such a detailed review! 
>     
>     Regarding 1 and 2:
>     * Puts are added to the queue only when a commit happens. If the commit 
> is not seen, the event is never added to the queue, so the checkpoint will 
> not have the puts if a commit did not happen (hence the need for inflight 
> puts in the first place). Likewise, during replay we only put an event into 
> the queue if a commit for its transaction is seen. Also, if a put commit 
> happens between 2 checkpoints, that causes the elements.syncRequired() 
> method to return true, which forces a new inflights file to be written, 
> provided values has some content to write. The problem is that when 
> values.isEmpty() returns true, we still need to truncate the old file so 
> that we don't leave the stale data as is. Nice catch!
>     
>     3 is interesting. This was just me being lazy. I wanted to handle the 
> take replays in the replay logic as well, but thought there was no need; it 
> turns out there is! Will do that.
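>     
>     For the stale-data case, roughly what I plan to do in serializeAndWrite 
> (a sketch; the actual file handling in the patch may differ, and "file" here 
> stands for whatever handle the method writes through):
>     
>       // Sketch: even when nothing is in flight, clear out the file so a
>       // stale inflight list from an older checkpoint cannot be replayed.
>       if (values.isEmpty()) {
>         file.setLength(0L);
>         file.getFD().sync();
>         return;
>       }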
> 
> Brock Noland wrote:
>     Yes please add the unit test.
>     
>     Also, can you expand on #1?  I got a log file like so:
>     
>       1345405182865, 1345405184951, 2, 72894, Put, FlumeEventPointer 
> [fileID=2, offset=72894]
>                                   1345405184952 -- checkpoint
>       1345405182865, 1345405184953, 2, 72974, Commit
>     
>     which was resolved by adding the:
>       queue.remove(FlumeEventPointer.fromLong(eventPointer));
> 
> Hari Shreedharan wrote:
>     Brock: 
>     A checkpoint would not get written until at least one put gets 
> committed (since elements.syncRequired() returns false until then). I just 
> wrote a unit test that waits for a checkpoint after a put, and it simply 
> times out since the checkpoint never comes. 
>
> 
> Brock Noland wrote:
>     Right. But there is a race condition between:
>     
>             synchronized (queue) {
>               while(!putList.isEmpty()) {
>                 if(!queue.addTail(putList.removeFirst())) {
>             }
>     
>     and the 
>     
>               queue.completeTransaction(transactionID);
>     
>     which is in log.commit. So it's possible for a checkpoint to happen 
> between the time we add the event to the queue (via addTail) and the time we 
> complete the transaction. If a checkpoint happens during this time and we 
> crash/restart, that event will be added to the queue twice, no? Once because 
> it was added to the queue and written to disk, and once because it was 
> written out to the inflightputs file.
>     
>     Seems like queue.completeTransaction(transactionID); should be done 
> inside that queue lock.
> 
> Hari Shreedharan wrote:
>     You are right. There are 2 ways in which we could resolve this:
>     1. Move the queue.completeTxn() calls to the transaction level, and put 
> them in synchronized(queue) blocks, so that the log and the queue are 
> written atomically. This will reduce the concurrency of take commits and 
> rollbacks from the current levels, though it is unlikely to affect put 
> commits that much (considering the amount of work already being done).
>     2. Let the race condition be, and let replay handle the case where the 
> events are both pending and in the queue (by calling queue.remove()).
>     
>     I prefer 2 over 1 because that will not affect the performance of the 
> channel for take commits and rollbacks. Thoughts?

I can confirm both 1 and 2 worked in my testing. I prefer 1. The overhead 
should be trivial since it's an in-memory operation on a very small data 
structure. It's also more obvious: it took me a while to figure out that the 
queue methods were being called in the Log class and not the Transaction 
class, since that is where the rest of the operations are.

I'd also prefer we move any of the queue operations (like the one in the put 
path) up to the Transaction level. In the Log class, other than calling 
checkpoint and a few getters, we don't do any queue operations; they are all 
up in the Transaction class.
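
Roughly what I have in mind for option 1, i.e. completing the transaction 
under the same queue lock that adds the events (treat this as a sketch from 
memory rather than the exact code):

    // Sketch: do the queue mutation and the in-flight bookkeeping atomically
    // with respect to checkpoints, which synchronize on the same queue.
    synchronized (queue) {
      while (!putList.isEmpty()) {
        if (!queue.addTail(putList.removeFirst())) {
          throw new IllegalStateException(
              "Queue add failed, this shouldn't be able to happen");
        }
      }
      // Previously this ran later in Log.commit, outside the lock; doing it
      // here closes the window where a checkpoint sees the event on the
      // queue while the transaction still looks in flight.
      queue.completeTransaction(transactionID);
    }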


- Brock


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6683/#review10513
-----------------------------------------------------------


On Aug. 18, 2012, 8:40 a.m., Hari Shreedharan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6683/
> -----------------------------------------------------------
> 
> (Updated Aug. 18, 2012, 8:40 a.m.)
> 
> 
> Review request for Flume and Brock Noland.
> 
> 
> Description
> -------
> 
> Flume Event Queue now keeps a copy of the event pointers to uncommitted puts 
> and takes. It serializes these on every checkpoint, deserializes them on 
> replay, and reinserts them into either the event queue (for takes) or the 
> replay queue (for puts). 
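> 
> In rough terms, the bookkeeping inside FlumeEventQueue looks like this (a 
> simplified sketch using Guava multimaps; the actual field and method names 
> in the diff may differ):
> 
>   // Sketch: track uncommitted puts/takes per transaction ID; serialized on
>   // every checkpoint and read back on replay.
>   private final SetMultimap<Long, Long> inflightPuts = HashMultimap.create();
>   private final SetMultimap<Long, Long> inflightTakes = HashMultimap.create();
> 
>   void addWithoutCommit(long transactionID, FlumeEventPointer pointer) {
>     inflightPuts.put(transactionID, pointer.toLong());
>   }
> 
>   void removeWithoutCommit(long transactionID, FlumeEventPointer pointer) {
>     inflightTakes.put(transactionID, pointer.toLong());
>   }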
> 
> I could have used the PutList and TakeList of the transaction for this, but I 
> didn't really like that approach. I don't want to share this kind of data 
> between multiple layers, since that makes it complex to change the 
> FlumeEventQueue implementation without causing major changes in 
> FileBackedTransaction. It would also lead to a number of cross-layer calls 
> to read data, which makes the approach less clean.
> With my current approach, by localizing most changes to the FlumeEventQueue 
> class, only a couple of function calls would need to be removed/modified. 
> Agreed, this adds some memory overhead, but it is insignificant compared to 
> the event queue size itself. It would be hardly a few MB extra in memory, 
> and if that gives me a cleaner implementation, I would prefer it.
> 
> 
> This addresses bug FLUME-1437.
>     https://issues.apache.org/jira/browse/FLUME-1437
> 
> 
> Diffs
> -----
> 
>   
> flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
>  e7735e8 
>   
> flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
>  9bfee2d 
>   
> flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
>  11f1e1f 
>   
> flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
>  bbca62c 
>   
> flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
>  7ec5916 
>   
> flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
>  1d5a0f9 
>   
> flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
>  569b7c7 
>   
> flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
>  e0b5e3f 
> 
> Diff: https://reviews.apache.org/r/6683/diff/
> 
> 
> Testing
> -------
> 
> Added 4 new unit tests (2 to TestFileChannel.java to test the actual use 
> case, and 2 to TestFlumeEventQueue.java to test the actual functionality of 
> serialization/deserialization).
> 
> 
> Thanks,
> 
> Hari Shreedharan
> 
>
