Hi, I'm Russell, a Software Engineer at DataStax, and I work on the Spark Cassandra Connector. I am excited about Apex as a great streaming solution, so I took a look at the integration with C* and I have a few comments.

https://github.com/apache/incubator-apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java

This behavior is a bit scary to me. Building large batches like this (especially ones that are not partition specific) can lead to stability problems over time. Hint buildup can also be a concern, since prior to C* 3.0 hints are stored in C* as well. Originally the Spark C* connector used batches of 64 KB, but this caused a large number of problems on clusters with a high RF or poorly provisioned setups. Some method to lock the total batch size down may be useful.

The other issue is that the "atomicity" of the batch is a point of serious contention within the C* community. One of the biggest issues is that, due to the nature of repair and entropy in the system, the atomicity of a batch cannot be guaranteed in the traditional database sense. The guarantee breaks completely in multi-DC environments, for example. All this said, it is probably sufficient from a data loss perspective if the CL is high enough and the batches are small enough.
https://issues.apache.org/jira/browse/CASSANDRA-10701

There are some other caveats to batches that you should also be aware of. For example, a batch containing INSERT (1, 2, 1) and INSERT (1, 1, 2) will treat these inserts as having occurred at the same timestamp (unless the timestamps are manually adjusted). This will end up with a row (1, 2, 2), because the greatest value wins for each colliding column.

I also don't see a way to adjust the consistency level here?

https://github.com/apache/incubator-apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java

The metadata for any given table can be retrieved without running a query, via the metadata object on the driver's Cluster. That may be better for future proofing? This class may also want to allow the pushdown of projections to C* to limit the columns retrieved or, if ambitious, the pushdown of clustering column predicates.
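
To make the batch-size suggestion for the transactionable output operator a little more concrete, here is a rough sketch of one way a cap could work. It is not Apex or driver code: `BoundedBatcher`, `maxBatchBytes`, and the `sink` callback are made-up names for illustration, and statement size is approximated by the UTF-8 length of the CQL string, which is only an assumption about how size might be measured. Statements are grouped by partition key, and a group is flushed before it would exceed the cap.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical helper: groups statements per partition key and caps batch size. */
final class BoundedBatcher {
    private final int maxBatchBytes;
    // Receives one finished batch per call; a real operator would execute it here.
    private final Consumer<List<String>> sink;
    private final Map<String, List<String>> pending = new HashMap<>();
    private final Map<String, Integer> pendingBytes = new HashMap<>();

    BoundedBatcher(int maxBatchBytes, Consumer<List<String>> sink) {
        if (maxBatchBytes <= 0) {
            throw new IllegalArgumentException("maxBatchBytes must be positive");
        }
        this.maxBatchBytes = maxBatchBytes;
        this.sink = sink;
    }

    /** Adds a statement, flushing that partition's batch first if it would overflow. */
    void add(String partitionKey, String statement) {
        int size = statement.getBytes(StandardCharsets.UTF_8).length;
        int current = pendingBytes.getOrDefault(partitionKey, 0);
        if (current > 0 && current + size > maxBatchBytes) {
            flush(partitionKey);
            current = 0;
        }
        // A single statement larger than the cap still goes out, alone in its batch.
        pending.computeIfAbsent(partitionKey, k -> new ArrayList<>()).add(statement);
        pendingBytes.put(partitionKey, current + size);
    }

    /** Emits the pending batch for one partition key, if there is one. */
    void flush(String partitionKey) {
        List<String> batch = pending.remove(partitionKey);
        pendingBytes.remove(partitionKey);
        if (batch != null && !batch.isEmpty()) {
            sink.accept(batch);
        }
    }

    /** Emits every pending batch, e.g. at the end of a streaming window. */
    void flushAll() {
        for (String key : new ArrayList<>(pending.keySet())) {
            flush(key);
        }
    }
}
```

In an operator, the sink would presumably turn each list into a driver batch, set the desired consistency level on it, and execute it; that part is left out because it depends on how the operator manages its session.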

https://github.com/apache/incubator-apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java

Same metadata comment as with the input operator, and the same CL comment.

https://github.com/apache/incubator-apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
https://github.com/apache/incubator-apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraTransactionalStore.java

I still haven't gotten to the transactional store, but hopefully I can give it a good read later.

Thanks for your time,
Russ
