> On Aug. 19, 2012, 10:51 p.m., Brock Noland wrote:
> > Nice patch! This looks to remove many of the probabilistic test failures of 
> > testRestartLogReplayV{1,2} which is exactly what I was hoping for!  A 
> > couple of items to work on below but overall I think the approach is sound.
> > 
> > == Review items ==
> > 
> > 1) OK, this fixes the big TestFileChannel.testRestartLogReplayV{1,2} 
> > failure mode, that is, lost puts and takes. With the fix, replay of the 
> > logs still eventually fails. I believe this is because we can have this 
> > scenario:
> > 
> > put
> > checkpoint (put is written to in flights)
> > commit
> > replay
> > 
> > the put is written out in the inflight puts file and then on replay it's 
> > added to the transaction map and put back into the queue, but it was also 
> > in the queue at checkpoint time. I was able to get the test to pass 170 
> > times in a row by adding a queue remove in the replayLog method:
> > 
> >         transactionMap.put(txnID, FlumeEventPointer.fromLong(eventPointer));
> >         queue.remove(FlumeEventPointer.fromLong(eventPointer));
> > 
> > That is, if it's truly in flight, then a commit has not yet occurred and 
> > the record will be added to the queue when the commit is replayed.
> > 
> > 
> > 2) The failure I got after 170 runs was caused by this scenario:
> > 
> > put
> > checkpoint (put is written to inflights)
> > commit
> > checkpoint (no in-flights, so the inflight files are not updated and thus 
> > still hold old data)
> > replay
> > 
> > After commenting out:
> > 
> >         if(values.isEmpty()){
> >           return;
> >         }
> > 
> > in the serializeAndWrite method, the test ran 306 times in a row without 
> > failing.
> > 
> > 3) I am a little unsure of the in-flight take logic.
> > 
> > take
> > checkpoint
> > commit
> > 
> > On replay, we put the take back in the queue and then skip ahead to the 
> > checkpoint. At that point we replay the commit, but the commit has not 
> > seen the takes, so it will not remove them from the queue?
> > 
> > 
> > 
> > == Wishlist ==
> > 
> > Since we are planning on making the rest of the file format more 
> > extensible, would you be opposed to using protocol buffers for these two 
> > files? That way we wouldn't have to upgrade when we integrate this with 
> > FLUME-1487. Basically you could copy the protocol buffers generation code 
> > from FLUME-1487. In that change we stop doing random writes to files, so 
> > we'd have two files:
> > 
> > inflighttakes and inflighttakes.meta, where the meta file would hold the 
> > checksum.
> > 
> > A .proto file like this might work:
> > 
> > message InFlightTransactions {
> >   repeated InFlightTransaction transactions = 1;
> > }
> > 
> > message InFlightTransaction {
> >   required sfixed64 transactionID = 1;
> >   repeated sfixed64 pointers = 3;
> > }
> > 
> > message InFlightTransactionsMetaData {
> >   required bytes checksum = 1;
> > }
> > 
> > 
> > 
> > with changes
> > v1 9.5%
> > fail 20
> > success 211
> > 
> > v2 7.5%
> > fail 7
> > success 93
> > 
> > without
> > v1 3.9%
> > fail 5
> > success 127
> > 
> > v2 5.2%
> > fail 5
> > success 95
> >
> 
> Brock Noland wrote:
>     oops, ignore "with changes" and below.
> 
> Brock Noland wrote:
>     Regarding the scenario in #3, I believe this test encapsulates the 
> problem (with the patch applied) 
>     
>       @Test
>       public void testTakeTransactionCrossingCheckpoint() throws Exception {
>         Map<String, String> overrides = Maps.newHashMap();
>         overrides.put(FileChannelConfiguration.CHECKPOINT_INTERVAL,
>             String.valueOf(1000L));
>         channel = createFileChannel(overrides);
>         channel.start();
>         Assert.assertTrue(channel.isOpen());
>         List<String> in = Lists.newArrayList();
>         try {
>           while(true) {
>             in.addAll(putEvents(channel, "restart", 1, 1));
>           }
>         } catch (ChannelException e) {
>           Assert.assertEquals("Cannot acquire capacity. [channel="
>               +channel.getName()+"]", e.getMessage());
>         }
>         List<String> out = Lists.newArrayList();
>         // now take one item off the channel
>         Transaction tx = channel.getTransaction();
>         tx.begin();
>         Event e = channel.take();
>         Assert.assertNotNull(e);
>         String s = new String(e.getBody(), Charsets.UTF_8);
>         out.add(s);
>         LOG.info("Slow take got " + s);
>         Thread.sleep(2000L); // sleep so a checkpoint occurs. take is before
>         // and commit is after the checkpoint
>         tx.commit();
>         tx.close();
>         channel.stop();
>         channel = createFileChannel(overrides);
>         channel.start();
>         Assert.assertTrue(channel.isOpen());
>         // we should not get the item we took off the queue above
>         out.addAll(takeEvents(channel, 1, Integer.MAX_VALUE));
>         Collections.sort(in);
>         Collections.sort(out);
>         if(!out.equals(in)) {
>           List<String> difference = new ArrayList<String>();
>           if(in.size() > out.size()) {
>             LOG.info("The channel shorted us");
>             difference.addAll(in);
>             difference.removeAll(out);
>           } else {
>             LOG.info("We got more events than expected, perhaps dups");
>             difference.addAll(out);
>             difference.removeAll(in);
>           }
>           LOG.error("difference = " + difference +
>               ", in.size = " + in.size() + ", out.size = " + out.size());
>           Assert.fail();
>         }
>       }
> 
> Hari Shreedharan wrote:
>     Thanks for the test. Do you mind if I add this to the unit test file?
>     
>     Thanks for such a detailed review! 
>     
>     Regarding 1 and 2:
>     * Puts are added to the queue only when a commit happens. If the commit 
> is not seen, the event is never added to the queue, so the checkpoint will 
> not have the puts if a commit did not happen (hence the need for inflight 
> puts in the first place). During replay, we actually put an event into the 
> queue only if a commit for its transaction is seen. Also, if a put commit 
> happens between 2 checkpoints, that causes the elements.syncRequired() method 
> to return true, which forces a new inflights file to get written, provided 
> values has some content to be written. The problem is that when 
> values.isEmpty() returns true, we still need to truncate the old file so 
> that we don't leave stale data as is. Nice catch!
>     
>     3 is interesting. This was just me being lazy. I wanted to put the take 
> replays in the replay logic too, but thought there was no need. It turns out 
> there is! Will do that.

Yes please add the unit test.

Also, can you expand on #1?  I got a log file like so:

  1345405182865, 1345405184951, 2, 72894, Put, FlumeEventPointer [fileID=2, 
offset=72894]
                              1345405184952 -- checkpoint
  1345405182865, 1345405184953, 2, 72974, Commit

which was resolved by adding the:
  queue.remove(FlumeEventPointer.fromLong(eventPointer));
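To illustrate why that remove matters, here is a minimal sketch (made-up names, not the actual FlumeEventQueue code): the checkpointed queue already contains the in-flight put's pointer, so replaying the inflight-puts entry without removing the checkpointed copy first would leave a duplicate once the commit is replayed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class InflightReplaySketch {
    // Models the queue state after replaying an in-flight put and its commit.
    // removeFirst mirrors the proposed queue.remove(...) in replayLog.
    static int sizeAfterReplay(boolean removeFirst) {
        Deque<Long> queue = new ArrayDeque<>();
        Long pointer = 42L;   // stand-in for a packed FlumeEventPointer
        queue.add(pointer);   // the put was already in the queue at checkpoint time

        // Replaying the inflight-puts file:
        if (removeFirst) {
            queue.remove(pointer);  // drop the checkpointed copy
        }
        // Replaying the commit re-adds the pointer to the queue:
        queue.add(pointer);
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println("without remove: " + sizeAfterReplay(false));
        System.out.println("with remove: " + sizeAfterReplay(true));
    }
}
```

Without the remove the pointer ends up in the queue twice, which matches the duplicated-event failure mode seen in the log above.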


> On Aug. 19, 2012, 10:51 p.m., Brock Noland wrote:
> > flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java,
> >  line 647
> > <https://reviews.apache.org/r/6683/diff/3/?file=142790#file142790line647>
> >
> >     If we have no inflights this time, we will *not* overwrite the 
> > previous files, which means they will get replayed later down the line.
> 
> Hari Shreedharan wrote:
>     No, we don't actually clear the inflights map on every write, so the 
> previous set of values will get written out as is. That said, if there is a 
> take commit, then syncRequired() returns false, and a checkpoint will not be 
> forced. So I added a method to make sure take commits cause in-flights to 
> get written out.

So as long as we truncate the file, it's fine. Reading the code, it does 
truncate the file if you comment that check out, and it works in practice as well.
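The invariant being discussed can be sketched like this (a hypothetical method, not the real serializeAndWrite): truncate unconditionally before writing, so a checkpoint with an empty in-flight set still wipes whatever stale data the previous checkpoint left behind.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InflightTruncateSketch {
    // Truncates before writing, so an empty set of values still clears
    // whatever the previous checkpoint left in the file.
    static void serializeAndWrite(File file, List<Long> values) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
            raf.setLength(0);            // truncate unconditionally; no early return
            for (long v : values) {
                raf.writeLong(v);        // 8 bytes per pointer
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("inflightputs", null);
        f.deleteOnExit();
        serializeAndWrite(f, Arrays.asList(1L, 2L));    // checkpoint with in-flights
        System.out.println("after write: " + f.length());
        serializeAndWrite(f, Collections.emptyList());  // next checkpoint, none in flight
        System.out.println("after empty write: " + f.length());
    }
}
```

The early `return` on an empty set is exactly what this sketch avoids: skipping the write also skips the truncate, leaving the old entries to be replayed later.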


> On Aug. 19, 2012, 10:51 p.m., Brock Noland wrote:
> > flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java,
> >  line 638
> > <https://reviews.apache.org/r/6683/diff/3/?file=142790#file142790line638>
> >
> >     future.get is never called so any exception will not be propagated
> 
> Hari Shreedharan wrote:
>     Actually exceptions are not what we are looking for. Any exceptions will 
> cause a bad checksum and we won't use the files. This is only meant to make 
> sure there is only one write at a time.

Then I think we should log the exception in the runnable and not propagate.
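Something along these lines (hypothetical names, with java.util.logging standing in for the project's logger): the runnable catches and logs, so nothing is silently swallowed when the Future is never consulted, and a failed write simply leaves a file that fails its checksum on replay.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

public class InflightWriterSketch {
    private static final Logger LOG =
        Logger.getLogger(InflightWriterSketch.class.getName());
    // Single-threaded executor: at most one in-flight write at a time.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    void writeAsync(Runnable write) {
        worker.execute(() -> {
            try {
                write.run();
            } catch (RuntimeException e) {
                // The Future is never checked, so log instead of propagating;
                // a partial write fails its checksum and is ignored on replay.
                LOG.log(Level.WARNING, "Inflight file write failed; will fail checksum", e);
            }
        });
    }

    // Waits for queued writes to finish; returns true on clean shutdown.
    boolean shutdownAndWait() throws InterruptedException {
        worker.shutdown();
        return worker.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        InflightWriterSketch s = new InflightWriterSketch();
        s.writeAsync(() -> { throw new RuntimeException("disk full"); });
        System.out.println("clean shutdown: " + s.shutdownAndWait());
    }
}
```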


- Brock


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6683/#review10513
-----------------------------------------------------------


On Aug. 18, 2012, 8:40 a.m., Hari Shreedharan wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6683/
> -----------------------------------------------------------
> 
> (Updated Aug. 18, 2012, 8:40 a.m.)
> 
> 
> Review request for Flume and Brock Noland.
> 
> 
> Description
> -------
> 
> Flume Event Queue now keeps a copy of the event pointers to uncommitted puts 
> and takes. It serializes these on every checkpoint, deserializes them on 
> replay, and reinserts them into either the event queue (for takes) or the 
> replay queue (for puts). 
> 
> I could have used the PutList and TakeList of the transaction for this, but I 
> didn't really like the approach. I don't want to be sharing this kind of data 
> between multiple layers, since that makes it complex to change the 
> FlumeEventQueue implementation without causing major changes in 
> FileBackedTransaction. Also it would lead to a number of cross layer calls to 
> read data - which makes the approach less clean.
> With my current approach, by localizing most changes to the FlumeEventQueue 
> class, only a couple of function calls would need to be removed/modified. 
> Agreed that this adds some memory overhead, but it is insignificant compared 
> to the event queue size itself. This would be hardly a few MB extra in 
> memory, and if that gives me a cleaner implementation, I would prefer that.
> 
> 
> This addresses bug FLUME-1437.
>     https://issues.apache.org/jira/browse/FLUME-1437
> 
> 
> Diffs
> -----
> 
>   
> flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
>  e7735e8 
>   
> flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
>  9bfee2d 
>   
> flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
>  11f1e1f 
>   
> flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
>  bbca62c 
>   
> flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
>  7ec5916 
>   
> flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannel.java
>  1d5a0f9 
>   
> flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
>  569b7c7 
>   
> flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
>  e0b5e3f 
> 
> Diff: https://reviews.apache.org/r/6683/diff/
> 
> 
> Testing
> -------
> 
> Added 4 new unit tests (2 to TestFileChannel.java to test the actual use 
> case, and 2 to TestFlumeEventQueue.java to test the actual functionality of 
> serialization/deserialization).
> 
> 
> Thanks,
> 
> Hari Shreedharan
> 
>
