> On Sept. 6, 2012, 7:30 a.m., Mike Percy wrote:
> > Hey, thanks for this contribution! I agree with the overall direction of
> > the approach here. However since by doing this, we are exposing these APIs
> > as public APIs, I think now would be a great time to take a good hard look
> > at FlumeFormatter itself...
> >
> > FlumeFormatter is not a very good interface, in my opinion. It is actually
> > two interfaces merged into one... someone would either call the
> > SequenceFile-related stuff, or would call getBytes(). I don't really think
> > that makes much sense and I think getBytes() should be removed from
> > FlumeFormatter. The only class that calls it is HDFSCompressedDataStream,
> > which should really just be using EventSerializer (which is what
> > HDFSDataStream uses).
> >
> > One additional thing we should consider is whether it would be doable to
> > allow a FlumeFormatter (which, sans getBytes(), is pretty targeted for
> > SequenceFiles) to write multiple key/value pairs from a single event. The
> > desire for that feature came up on the list a couple weeks ago, and if we
> > are committing to a public API then I'd like to give it a shot. It's easy
> > with EventSerializer, for example.
> >
> > Any thoughts on this? I think it's acceptable to expose the
> > SequenceFile.Writer in the interface if we need to do that to make this
> > possible. How about changing it to something like this?
> >
> > interface FlumeFormatter {
> > Class getKeyClass();
> > Class getValueClass();
> > void write(Event e, SequenceFile.Writer out);
> >
> > interface Builder {
> > FlumeFormatter build(Context context);
> > }
> > }
> >
>
> Chris Birchall wrote:
> Mike,
>
> Thanks for the review. I agree with your suggested interface with
> FlumeFormatter. I think it's reasonable to assume that anybody writing their
> own FlumeFormatter will know how to handle a SequenceFile.Writer directly.
> This gives users a lot of flexibility (e.g. to call sync() to finish a
> compression block when they want to, or to call syncFs() in order to ensure
> their data has been replicated).
>
> With these changes FlumeFormatter will become dependent on Hadoop
> classes, so I guess it should be moved from flume-ng-core to flume-hdfs-sink.
> I am also tempted to change the name, as the interface is now
> SequenceFile-specific and is now in charge of writing, rather than just
> extracting a key and value. Maybe HDFSSeqFileWriter?
>
> I will have a play around and attach another patch, probably next week.
Chris, ya know, maybe my API suggestion is not great, being low level and
allowing a "serializer" access a lot of the implementation. I was just trying
to allow an API that lets people write multiple keys & values from one Event.
:) We have to be a bit careful not too expose too much implementation, because
people will be tempted to do things that depend on the implementation instead
of the interfaces.
For example, HDFSSequenceFile.sync(), which calls SequenceFile.Writer.sync(),
is automatically called by the BucketWriter (via append > flush > doFlush) at
the end of each append batch, before the transaction is committed. That is the
general contract of Flume - we are durable right before the end of each
Transaction. There should never be a reason to have to call it manually. But
wait... I think that is wrong...
So, actually there is a bug in the HDFSSequenceFile code it looks like! I think
the original author thought that sync() provides durability. But it looks like
we should be calling syncFs() instead. Yikes! That is something that needs to
be fixed! It looks like sync() just writes a sync marker, which should be
unnecessary since append() automatically calls checkAndWriteSync() every time
(from looking at src/core/org/apache/hadoop/io/SequenceFile.java from Hadoop
1.0.3). In the SequenceFile.Writer, append() also calls finish() on the
compression buffer, so I don't think we need to worry about that. Maybe we
should expose the syncInterval as a config option somehow, though.
Regarding the interface, here is another proposal that provides limited scope
of the plugins while allowing multiple keyvals:
interface FlumeFormatter {
Class getKeyClass();
Class getValueClass();
public Iterable<Map.Entry<Object key, Object value>> serialize(Event e);
interface Builder {
FlumeFormatter build(Context context);
}
}
Do you think that's too awkward or complex?
Let me know what you think of my argument here. :) BTW, changing the interface
name is fine IMO if you want to make it clearer.
- Mike
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/6918/#review11085
-----------------------------------------------------------
On Sept. 5, 2012, 5:51 a.m., Chris Birchall wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/6918/
> -----------------------------------------------------------
>
> (Updated Sept. 5, 2012, 5:51 a.m.)
>
>
> Review request for Flume.
>
>
> Description
> -------
>
> This patch allows users to customise the format of HDFS SequenceFiles by
> providing a custom FlumeFormatter implementation.
>
> Currently, the user can set hdfs.writeFormat to either "Text" or "Writable",
> corresponding to HDFSTextFormatter and HDFSWritableFormatter respectively.
>
> With this patch, hdfs.writeFormat can also be set to the full class name of a
> Builder implementation, e.g.:
>
> agent_foo.sinks.hdfs-sink.writeFormat=com.mycompany.flume.MyCustomFormatter$Builder
>
> They can also pass custom configuration params to the builder, e.g.:
> agent_foo.sinks.hdfs-sink.writeFormat.ignoreHeaders=foo,bar
>
> These params will be passed to the Builder's build() method as a Context
> object.
>
> I've tried to be as consistent as possible with the design of
> EventSerializerFactory:
> * Use an enum for the different formatter types, rather than static strings.
> * Use a Builder, rather than constructing a FlumeFormatter directly.
>
>
> This addresses bug FLUME-1100.
> https://issues.apache.org/jira/browse/FLUME-1100
>
>
> Diffs
> -----
>
> flume-ng-core/src/main/java/org/apache/flume/sink/FlumeFormatter.java
> 05d60b7
>
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
> 9a76ecb
>
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterFactory.java
> 7b47942
>
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSFormatterType.java
> PRE-CREATION
>
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSTextFormatter.java
> 5839dbb
>
> flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSWritableFormatter.java
> 9f03389
>
> flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MyCustomFormatter.java
> PRE-CREATION
>
> flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSFormatterFactory.java
> PRE-CREATION
>
> Diff: https://reviews.apache.org/r/6918/diff/
>
>
> Testing
> -------
>
> Unit tests included in patch.
>
> Using a patched build of Flume in an internal project (not in production).
>
>
> Thanks,
>
> Chris Birchall
>
>