I think Madhusudan's proposal does not involve reading the whole contents of the memcached cluster - it's applied to a PCollection<byte[]> of keys. So I'd suggest calling it MemcachedIO.lookup() rather than MemcachedIO.read(). It will not involve questions of splitting; however, it *will* involve snapshot consistency (looking up the same key at different times may yield different results, including a null result).
Concur with others - please take a look at https://beam.apache.org/documentation/io/authoring-overview/ and https://beam.apache.org/contribute/ptransform-style-guide/ , as well as at the code of other IO transforms. The proposed API contradicts several best practices described in these documents, but is easily fixable. I also recommend considering how you plan to extend this to support other commands - and which commands you expect to ever support.

Also, I'm unsure about the usefulness of MemcachedIO.lookup(). What's an example real-world use case for such a bulk lookup operation, where you transform a PCollection of keys into a PCollection of key/value pairs? I suppose such a use case exists, but I'd like to know more about it, to see whether this is the best API for it.

On Mon, Jul 10, 2017 at 9:18 AM Lukasz Cwik <lc...@google.com.invalid> wrote:

> Splitting on slabs should allow you to split more finely grained than per
> server, since each server itself maintains this information. If you take a
> look at the memcached protocol, you can see that lru_crawler supports a
> metadump command which will enumerate all the keys for a set of given
> slabs or for all the slabs.
>
> For the consistency part, you can get a snapshot-like effect
> (snapshot-like since it's per server and not across the server farm) by
> combining the "watch mutations evictions" command on one connection with
> "lru_crawler metadump all" on another connection to the same memcached
> server. By first connecting using a watcher and then performing a dump,
> you can create two logical streams of data that can be joined to get a
> snapshot per server. If the amount of data/mutations/evictions is small,
> you can perform all of this within a DoFn; otherwise you can treat each as
> two different outputs which you join and perform the same logical
> operation to rebuild the snapshot on a per-key basis.
>
> Interestingly, the "watch mutations" command would allow one to build a
> streaming memcache IO which shows all changes occurring underneath.
>
> memcached protocol:
> https://github.com/memcached/memcached/blob/master/doc/protocol.txt
>
> On Mon, Jul 10, 2017 at 2:41 AM, Ismaël Mejía <ieme...@gmail.com> wrote:
>
> > Hello,
> >
> > Thanks Lukasz for bringing up some of these subjects. I have briefly
> > discussed this with the people working on it; they are the same team
> > who did HCatalogIO (Hive).
> >
> > We analyzed the different libraries that allow developing this
> > integration from Java and decided that the most complete implementation
> > was spymemcached. One thing I really didn’t like about their API is
> > that there is no abstraction for Mutation (as in Bigtable/HBase) but a
> > separate method for each operation, so to make things easier we agreed
> > to focus first on read/write.
> >
> > @Lukasz, for the enumeration part, I am not sure I follow. We had just
> > discussed a naive approach of splitting by server, given that Memcached
> > is not a cluster but a server farm ‘every server is its own’, so we
> > thought this would be the easiest way to partition. Is there any
> > technical issue that prevents this (creating a BoundedSource and just
> > reading per server)? Or would partitioning by slabs give us a better
> > optimization? (Note I am far from an expert on Memcached.)
> >
> > For the consistency part I assumed reads would be inconsistent, because
> > I didn’t know how to do the snapshot, but if you can give us more
> > details on how to do this, and why it is worth the effort (vs. the cost
> > of the snapshot), this would be something interesting to integrate.
> >
> > Thanks,
> > Ismaël
> >
> >
> > On Sun, Jul 9, 2017 at 7:39 PM, Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> > > For the source:
> > > Do you plan to support enumerating all the keys via cachedump /
> > > lru_crawler metadump / ...?
> > > If there is an option which doesn't require enumerating the keys,
> > > how will splitting be done (no splitting / splitting on slab ids /
> > > ...)?
> > > Can the cache be read while it's still being modified (will a
> > > snapshot effectively be made using a watcher, or is it expected that
> > > the cache will be read-only or inconsistent when reading)?
> > >
> > > Also, as a usability point, all PTransforms are meant to be applied
> > > to PCollections and not vice versa.
> > > e.g.
> > > PCollection<byte[]> keys = ...;
> > > keys.apply(MemCacheIO.withConfig());
> > >
> > > This makes it so that people can write:
> > > PCollection<...> output =
> > >     input.apply(ptransform1).apply(ptransform2).apply(...);
> > > It also makes it so that a PTransform can be applied to multiple
> > > PCollections.
> > >
> > > If you haven't already, I would also suggest that you take a look at
> > > the Pipeline I/O guide: https://beam.apache.org/documentation/io/io-toc/
> > > It talks about various usability points and how to write a good I/O
> > > connector.
> > >
> > >
> > > On Sat, Jul 8, 2017 at 9:31 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> Great job!
> > >>
> > >> I'm looking forward to the PR reviews.
> > >>
> > >> Regards
> > >> JB
> > >>
> > >>
> > >> On 07/08/2017 09:50 AM, Madhusudan Borkar wrote:
> > >>
> > >>> Hi,
> > >>> We are proposing to build connectors for memcache first and then
> > >>> use them for Couchbase. The connector for memcache will be built as
> > >>> an IOTransform, and then it can be used for other memcache
> > >>> implementations, including Couchbase.
> > >>>
> > >>> 1. As Source
> > >>>
> > >>> Input will be a key (String / byte[]); output will be a
> > >>> KV<key, value>, where
> > >>>   key - String / byte[]
> > >>>   value - String / byte[]
> > >>>
> > >>> Spymemcached supports a multi-get operation where it takes a bunch
> > >>> of keys and retrieves the associated values; the input
> > >>> PCollection<key> can be bundled into multiple batches, and each
> > >>> batch can be submitted via the multi-get operation.
> > >>>
> > >>> PCollection<KV<byte[], byte[]>> values =
> > >>>     MemCacheIO
> > >>>         .withConfig()
> > >>>         .read()
> > >>>         .withKey(PCollection<byte[]>);
> > >>>
> > >>> 2. As Sink
> > >>>
> > >>> Input will be a KV<key, value>; output will be none, or probably a
> > >>> boolean indicating the outcome of the operation.
> > >>>
> > >>> // write
> > >>> MemCacheIO
> > >>>     .withConfig()
> > >>>     .write()
> > >>>     .withEntries(PCollection<KV<byte[],byte[]>>);
> > >>>
> > >>> Implementation plan
> > >>>
> > >>> 1. Develop Memcache connector with 'set' and 'add' operations
> > >>> 2. Then develop other operations
> > >>> 3. Use Memcache connector for Couchbase
> > >>>
> > >>> Thanks @Ismael for the help
> > >>>
> > >>> Please, let us know your views.
> > >>>
> > >>> Madhu Borkar
> > >>>
> > >>
> > >> -- 
> > >> Jean-Baptiste Onofré
> > >> jbono...@apache.org
> > >> http://blog.nanthrax.net
> > >> Talend - http://www.talend.com
> > >>
> > >
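[Editor's note: the thread proposes bundling the input PCollection of keys into batches, with one spymemcached multi-get per batch. The batching step itself can be sketched without any Beam or memcached dependency; the `BatchKeys` class, the `partition` helper, and the batch size below are all illustrative, not part of the proposed API. In the real connector this grouping would happen inside a DoFn before calling the client.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: group keys into fixed-size batches, the way a
// MemcachedIO read/lookup might before issuing one multi-get per batch.
public class BatchKeys {
    static <T> List<List<T>> partition(List<T> keys, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += batchSize) {
            // subList is a view over the backing list; copy it so each
            // batch is an independent list.
            batches.add(new ArrayList<>(
                    keys.subList(i, Math.min(i + batchSize, keys.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3", "k4", "k5");
        System.out.println(partition(keys, 2)); // [[k1, k2], [k3, k4], [k5]]
    }
}
```

A reasonable batch size would come from the connector's configuration rather than being hard-coded; the last batch is simply shorter when the key count is not a multiple of the batch size.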
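[Editor's note: the watcher-plus-metadump join that Lukasz describes can be illustrated with a toy, in-memory reconciliation. The `Event` record shape and the `rebuild` method below are invented for this sketch and are not the memcached wire format: the idea is only to show joining a dump with the mutation/eviction stream observed on the watch connection to rebuild a per-server snapshot keyed by key.]

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of the per-server snapshot idea from the thread: join an
// lru_crawler-style dump with events observed on a concurrent
// "watch mutations evictions" connection. Record shapes are invented.
public class SnapshotJoin {
    static final class Event {
        final String key;
        final String value; // null models an eviction/deletion

        Event(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    static Map<String, String> rebuild(Map<String, String> dump, List<Event> watched) {
        Map<String, String> snapshot = new HashMap<>(dump);
        for (Event e : watched) {
            if (e.value == null) {
                snapshot.remove(e.key);       // evicted while dumping
            } else {
                snapshot.put(e.key, e.value); // mutated while dumping
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        Map<String, String> dump = new HashMap<>();
        dump.put("a", "1");
        dump.put("b", "2");
        List<Event> watched = List.of(new Event("b", "3"), new Event("a", null));
        System.out.println(rebuild(dump, watched)); // {b=3}
    }
}
```

As the thread notes, this is per server only: a farm-wide snapshot would still require running the same join independently against each memcached server.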