I think Madhusudan's proposal does not involve reading the whole contents
of the memcached cluster - it's applied to a PCollection<byte[]> of keys.
So I'd suggest calling it MemcachedIO.lookup() rather than
MemcachedIO.read(). It also won't involve the questions of splitting -
however, it *will* involve snapshot consistency (looking up the same key at
different times may yield different results, including a null result).
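
For example, usage could look roughly like this (a sketch only - lookup()
and its configuration methods are hypothetical names, not a final API):

    PCollection<byte[]> keys = ...;
    PCollection<KV<byte[], byte[]>> values =
        keys.apply(MemcachedIO.lookup().withServers(servers));
    // a key that is missing or evicted may yield a null value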

Concur with others - please take a look at
https://beam.apache.org/documentation/io/authoring-overview/ and
https://beam.apache.org/contribute/ptransform-style-guide/ , as well as the
code of other IO transforms. The proposed API contradicts several best
practices described in these documents, but that is easily fixable.

I also recommend considering how you plan to extend this to support other
commands - and which commands you expect to ever support.
Also, I'm unsure about the usefulness of MemcachedIO.lookup(). What's an
example real-world use case for such a bulk lookup operation, where you
transform a PCollection of keys into a PCollection of key/value pairs? I
suppose such a use case exists, but I'd like to know more about it, to see
whether this is the best API for it.

On Mon, Jul 10, 2017 at 9:18 AM Lukasz Cwik <lc...@google.com.invalid>
wrote:

> Splitting on slabs should allow you to split more finely than per
> server, since each server itself maintains this information. If you take a
> look at the memcached protocol, you can see that lru_crawler supports a
> metadump command which will enumerate all the keys for a given set of
> slabs, or for all slabs.
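>
> For example, a minimal sketch of driving that command over a raw
> text-protocol connection (inside a method that throws IOException; error
> handling omitted):
>
> // imports: java.io.*, java.net.Socket, java.nio.charset.StandardCharsets
> try (Socket s = new Socket("localhost", 11211)) {
>   s.getOutputStream().write(
>       "lru_crawler metadump all\r\n".getBytes(StandardCharsets.US_ASCII));
>   BufferedReader in = new BufferedReader(
>       new InputStreamReader(s.getInputStream(), StandardCharsets.US_ASCII));
>   for (String line = in.readLine();
>       line != null && !line.equals("END");
>       line = in.readLine()) {
>     // each line looks like:
>     // key=<url-encoded-key> exp=... la=... cas=... fetch=... cls=... size=...
>     System.out.println(line);
>   }
> }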
>
> For the consistency part, you can get a snapshot-like effect (snapshot-like
> since it's per server and not across the server farm) by combining
> the "watch mutations evictions" command on one connection with the
> "lru_crawler metadump all" command on another connection to the same
> memcached server. By first connecting a watcher and then performing a dump,
> you can create two logical streams of data that can be joined to get a
> snapshot per server. If the amount of data/mutations/evictions is small,
> you can perform all of this within a DoFn; otherwise you can treat each as
> two different outputs which you join, performing the same logical operation
> to rebuild the snapshot on a per-key basis.
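>
> A rough single-server sketch of that join, assuming the dump fits in
> memory (send, readLines and keyOf are hypothetical helpers around the
> text protocol):
>
> Set<String> dirty = ConcurrentHashMap.newKeySet();
> Socket watcher = new Socket(host, 11211);
> send(watcher, "watch mutations evictions\r\n");
> // buffer every key that changes while the dump runs
> new Thread(() -> readLines(watcher).forEach(ev -> dirty.add(keyOf(ev)))).start();
>
> Socket dump = new Socket(host, 11211);
> send(dump, "lru_crawler metadump all\r\n");
> List<String> keys = new ArrayList<>();
> for (String line : readLines(dump)) {  // up to the terminating END
>   keys.add(keyOf(line));               // parse the key=... field
> }
> keys.removeIf(dirty::contains);  // drop keys mutated/evicted mid-dump
> // fetching the remaining keys now approximates a per-server snapshot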
>
> Interestingly, the "watch mutations" command would allow one to build a
> streaming memcache IO which shows all changes occurring underneath.
>
> memcached protocol:
> https://github.com/memcached/memcached/blob/master/doc/protocol.txt
>
> On Mon, Jul 10, 2017 at 2:41 AM, Ismaël Mejía <ieme...@gmail.com> wrote:
>
> > Hello,
> >
> > Thanks Lukasz for bringing up some of these subjects. I have briefly
> > discussed this with the people working on it; they are the same team who
> > did HCatalogIO (Hive).
> >
> > We analyzed the different libraries that would allow us to develop this
> > integration from Java, and decided that the most complete implementation
> > was spymemcached. One thing I really didn't like about its API is that
> > there is no abstraction for a Mutation (like in Bigtable/HBase) but
> > rather a separate method for each operation, so to make things easier we
> > agreed to focus first on read/write.
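> >
> > For example, with spymemcached every operation is its own method:
> >
> > MemcachedClient client =
> >     new MemcachedClient(new InetSocketAddress("server", 11211));
> > client.set("k", 3600, "v");   // separate set()/add()/replace()/delete()
> > client.add("k", 3600, "v");   // methods rather than a single Mutation
> > client.delete("k");           // abstraction as in Bigtable/HBase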
> >
> > @Lukasz, for the enumeration part, I am not sure I follow. We had just
> > discussed a naive approach of splitting by server: given that Memcached
> > is not a cluster but a server farm (which means every server stands on
> > its own), we thought this would be the easiest way to partition. Is there
> > any technical issue that prevents this (creating a BoundedSource and just
> > reading from each server)? Or would partitioning by slabs bring us a
> > better optimization? (Note that I am far from an expert on Memcached.)
> >
> > For the consistency part, I assumed reads would be inconsistent because
> > I didn't know how to take the snapshot, but if you can give us more
> > details on how to do this, and on why it is worth the effort (vs. the
> > cost of the snapshot), it would be something interesting to integrate.
> >
> > Thanks,
> > Ismaël
> >
> >
> > On Sun, Jul 9, 2017 at 7:39 PM, Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> > > For the source:
> > > Do you plan to support enumerating all the keys via cachedump /
> > > lru_crawler metadump / ...?
> > > If there is an option which doesn't require enumerating the keys, how
> > > will splitting be done (no splitting / splitting on slab ids / ...)?
> > > Can the cache be read while it's still being modified (will a snapshot
> > > effectively be made using a watcher, or is it expected that the cache
> > > will be read-only or inconsistent while being read)?
> > >
> > > Also, as a usability point, all PTransforms are meant to be applied to
> > > PCollections and not vice versa.
> > > e.g.
> > > PCollection<byte[]> keys = ...;
> > > keys.apply(MemCacheIO.withConfig());
> > >
> > > This makes it so that people can write:
> > > PCollection<...> output =
> > > input.apply(ptransform1).apply(ptransform2).apply(...);
> > > It also makes it so that a PTransform can be applied to multiple
> > > PCollections.
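> > >
> > > For example, the transform itself could be declared roughly like this
> > > (a sketch; LookupFn is a hypothetical DoFn that performs the lookups):
> > >
> > > class MemCacheIO {
> > >   public static Read withConfig() { return new Read(); }
> > >
> > >   static class Read extends
> > >       PTransform<PCollection<byte[]>, PCollection<KV<byte[], byte[]>>> {
> > >     @Override
> > >     public PCollection<KV<byte[], byte[]>> expand(PCollection<byte[]> keys) {
> > >       return keys.apply(ParDo.of(new LookupFn()));
> > >     }
> > >   }
> > > }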
> > >
> > > If you haven't already, I would also suggest that you take a look at the
> > > Pipeline I/O guide: https://beam.apache.org/documentation/io/io-toc/
> > > It talks about various usability points and how to write a good I/O
> > > connector.
> > >
> > >
> > > On Sat, Jul 8, 2017 at 9:31 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> Great job!
> > >>
> > >> I'm looking forward to the PR reviews.
> > >>
> > >> Regards
> > >> JB
> > >>
> > >>
> > >> On 07/08/2017 09:50 AM, Madhusudan Borkar wrote:
> > >>
> > >>> Hi,
> > >>> We are proposing to build a connector for memcache first, and then use
> > >>> it for Couchbase. The memcache connector will be built as an IO
> > >>> transform, and it can then be used for other memcache implementations,
> > >>> including Couchbase.
> > >>>
> > >>> 1. As Source
> > >>>
> > >>>     input will be a key (String / byte[]), output will be a KV<key, value>
> > >>>
> > >>>     where key - String / byte[]
> > >>>
> > >>>     value - String / byte[]
> > >>>
> > >>> Spymemcached supports a multi-get operation which takes a bunch of
> > >>> keys and retrieves the associated values; the input PCollection<key>
> > >>> can be bundled into multiple batches, and each batch can be submitted
> > >>> via the multi-get operation (see the DoFn sketch after the usage
> > >>> example below).
> > >>>
> > >>> PCollection<KV<byte[], byte[]>> values =
> > >>>     MemCacheIO
> > >>>     .withConfig()
> > >>>     .read()
> > >>>     .withKey(PCollection<byte[]>);
> > >>>
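> > >>> A rough sketch of what the batched multi-get could look like inside a
> > >>> DoFn (String keys for simplicity, since spymemcached's getBulk takes a
> > >>> Collection<String>; flushing the final partial batch in @FinishBundle
> > >>> is elided):
> > >>>
> > >>> class MultiGetFn extends DoFn<String, KV<String, Object>> {
> > >>>   private transient MemcachedClient client;
> > >>>   private final List<String> batch = new ArrayList<>();
> > >>>
> > >>>   @Setup
> > >>>   public void setup() throws IOException {
> > >>>     client = new MemcachedClient(new InetSocketAddress("server", 11211));
> > >>>   }
> > >>>
> > >>>   @ProcessElement
> > >>>   public void process(ProcessContext c) {
> > >>>     batch.add(c.element());
> > >>>     if (batch.size() >= 100) {
> > >>>       // one network round trip for the whole batch
> > >>>       for (Map.Entry<String, Object> e : client.getBulk(batch).entrySet()) {
> > >>>         c.output(KV.of(e.getKey(), e.getValue()));
> > >>>       }
> > >>>       batch.clear();
> > >>>     }
> > >>>   }
> > >>> }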
> > >>>
> > >>> 2. As Sink
> > >>>
> > >>>     input will be a KV<key, value>, output will be none, or probably a
> > >>>     boolean indicating the outcome of the operation
> > >>>
> > >>> //write
> > >>>
> > >>>     MemCacheIO
> > >>>     .withConfig()
> > >>>     .write()
> > >>>     .withEntries(PCollection<KV<byte[],byte[]>>);
> > >>>
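> > >>> A corresponding write sketch (spymemcached's set() is asynchronous, so
> > >>> the OperationFuture is blocked on here to surface failures; expiration
> > >>> 0 means no expiry):
> > >>>
> > >>> class WriteFn extends DoFn<KV<String, byte[]>, Void> {
> > >>>   private transient MemcachedClient client;
> > >>>
> > >>>   @Setup
> > >>>   public void setup() throws IOException {
> > >>>     client = new MemcachedClient(new InetSocketAddress("server", 11211));
> > >>>   }
> > >>>
> > >>>   @ProcessElement
> > >>>   public void process(ProcessContext c) throws Exception {
> > >>>     boolean ok = client.set(c.element().getKey(), 0,
> > >>>         c.element().getValue()).get();
> > >>>     // 'ok' could become the boolean outcome mentioned above
> > >>>   }
> > >>> }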
> > >>>
> > >>> Implementation plan
> > >>>
> > >>> 1. Develop the Memcache connector with 'set' and 'add' operations
> > >>>
> > >>> 2. Then develop other operations
> > >>>
> > >>> 3. Use Memcache connector for Couchbase
> > >>>
> > >>>
> > >>> Thanks @Ismael for the help
> > >>>
> > >>> Please let us know your views.
> > >>>
> > >>> Madhu Borkar
> > >>>
> > >>>
> > >> --
> > >> Jean-Baptiste Onofré
> > >> jbono...@apache.org
> > >> http://blog.nanthrax.net
> > >> Talend - http://www.talend.com
> > >>
> >
>
