So it is also terribly slow. It does not work with materialized views (a quick hack for that is below) or with UDTs, which will take more time to fix.
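For context: every tool discussed in this thread parallelizes the scan the same way, by splitting the token ring into contiguous ranges. A minimal self-contained sketch of that split (my illustration, not the loader's actual code), assuming the default Murmur3Partitioner whose ring is [-2^63, 2^63-1]:

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;

// Sketch: split the full Murmur3 token ring [-2^63, 2^63-1] into n
// contiguous [start, end) ranges for parallel scanning. BigInteger is
// used because the ring size (2^64) overflows long.
public class TokenSplitter {
    static final BigInteger MIN = BigInteger.valueOf(Long.MIN_VALUE);
    static final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE);

    public static List<long[]> split(int n) {
        BigInteger total = MAX.subtract(MIN).add(BigInteger.ONE); // 2^64 tokens
        BigInteger step = total.divide(BigInteger.valueOf(n));
        List<long[]> ranges = new ArrayList<>();
        BigInteger start = MIN;
        for (int i = 0; i < n; i++) {
            BigInteger end = start.add(step);
            boolean last = (i == n - 1);
            // The last range is clamped to MAX, matching the thread's
            // open-ended final range.
            ranges.add(new long[]{ start.longValueExact(),
                                   last ? Long.MAX_VALUE : end.longValueExact() });
            start = end;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] r : split(4))
            System.out.println(r[0] + " .. " + r[1]);
    }
}
```

Each range then backs one `SELECT ... WHERE token(user_id) >= start AND token(user_id) < end` query.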
So I used it to retrieve the only built-in type column, the key. To make the task more time-consuming I extended the dataset a bit, to ~2.5M records. My multithreaded dumper, my Spark job using the connector, and cassandra-unload all retrieve 2.5M records in ~1m20s. Increasing concurrency halves this time. But that holds only for this trivial case of fetching the key alone, with no actual payload. Fetching the actual user data for these 2.5M records takes ~2m20s, and that does not go down with the number of threads or Spark cores. I feel stupid. More than two minutes to transfer ~10Gb in a highly distributed system.

index c570574..e7af28b 100644
--- a/src/main/java/com/datastax/loader/CqlDelimParser.java
+++ b/src/main/java/com/datastax/loader/CqlDelimParser.java
@@ -20,6 +20,7 @@
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.AbstractTableMetadata;
 import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.exceptions.InvalidTypeException;
 import com.datastax.loader.parser.BigDecimalParser;
@@ -205,9 +206,10 @@ public class CqlDelimParser {
             System.err.println("Keyspace " + keyspace + " not found.");
             System.exit(-1);
         }
-        TableMetadata tm = km.getTable(tablename);
+        AbstractTableMetadata tm = km.getTable(tablename);
+        if ( tm == null ) tm = km.getMaterializedView(tablename);
         if (null == tm) {
-            System.err.println("Table " + tablename + " not found.");
+            System.err.println("Table/view " + tablename + " not found.");
             System.exit(-1);
         }
         List<String> inList = new ArrayList<String>();
diff --git a/src/main/java/com/datastax/loader/CqlDelimUnload.java b/src/main/java/com/datastax/loader/CqlDelimUnload.java
index 472e33b..f084ce8 100644
--- a/src/main/java/com/datastax/loader/CqlDelimUnload.java
+++ b/src/main/java/com/datastax/loader/CqlDelimUnload.java
@@ -56,6 +56,7 @@
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManagerFactory;
+import com.datastax.driver.core.AbstractTableMetadata;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ColumnMetadata;
@@ -547,9 +548,12 @@ public class CqlDelimUnload {
             table = table.replaceAll("\"", "");
         else
             table = table.toLowerCase();
-
-        List<ColumnMetadata> lcm = session.getCluster().getMetadata()
-            .getKeyspace(keyspace).getTable(table).getPartitionKey();
+
+        AbstractTableMetadata tm = session.getCluster().getMetadata()
+            .getKeyspace(keyspace).getTable(table);
+        if ( tm == null )
+            tm = session.getCluster().getMetadata()
+                .getKeyspace(keyspace).getMaterializedView(table);
+        List<ColumnMetadata> lcm = tm.getPartitionKey();
         String partitionKey = lcm.get(0).getName();
         for (int i = 1; i < lcm.size(); i++) {
             partitionKey = partitionKey + "," + lcm.get(i).getName();

On 17 August 2017 at 21:11, Jeff Jirsa <jji...@gmail.com> wrote:

> Brian Hess has perhaps the best open source code example of the right way
> to do this:
>
> https://github.com/brianmhess/cassandra-loader/blob/master/src/main/java/com/datastax/loader/CqlDelimUnload.java
>
> On Thu, Aug 17, 2017 at 10:00 AM, Alex Kotelnikov <
> alex.kotelni...@diginetica.com> wrote:
>
>> Yup, user_id is the primary key.
>>
>> First of all, can you share how to "go to a node directly"?
>>
>> Also, such an approach would retrieve all the data RF times; the
>> coordinator should have enough metadata to avoid that.
>>
>> Shouldn't requesting multiple coordinators provide some concurrency?
>>
>> On 17 August 2017 at 19:54, Dor Laor <d...@scylladb.com> wrote:
>>
>>> On Thu, Aug 17, 2017 at 9:36 AM, Alex Kotelnikov <
>>> alex.kotelni...@diginetica.com> wrote:
>>>
>>>> Dor,
>>>>
>>>> I believe I tried it in many ways and the result is quite
>>>> disappointing.
>>>> I've run my scans on 3 different clusters, one of which was running on
>>>> VMs so I could scale it up and down (3-5-7 VMs, 8 to 24 cores) to see
>>>> how this affects performance.
>>>>
>>>> I also generated the load from a Spark cluster, ranging from 4 to 40
>>>> parallel tasks, as well as from a plain multi-threaded client.
>>>>
>>>> The surprise is that a trivial fetch of all records using token ranges
>>>> takes pretty much the same time in all setups.
>>>>
>>>> The only beneficial thing I've learned is that it is much more
>>>> efficient to create a MATERIALIZED VIEW than to filter (even using a
>>>> secondary index).
>>>>
>>>> Say, I have a typical dataset, around 3Gb of data, 1M records. And I
>>>> have a trivial scan practice:
>>>>
>>>> String.format("SELECT token(user_id), user_id, events FROM user_events
>>>> WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
>>>> token(user_id) < %d ", end) : "")
>>>
>>> Is user_id the primary key? It looks like this query will just go to the
>>> cluster and hit a random coordinator each time.
>>> C* doesn't keep the subsequent token on the same node; it's hashed.
>>> The idea of a parallel cluster scan is to go directly to all nodes in
>>> parallel and query them for the hashed keys they own.
>>>
>>>> I split all tokens into start-end ranges (except for the last range,
>>>> which only has a start) and query the ranges in multiple threads, up
>>>> to 40.
>>>>
>>>> The whole process takes ~40s on a 3-VM cluster (2+2+4 cores, 16Gb RAM
>>>> each, 1 virtual disk). And it takes ~30s on a real hardware cluster of
>>>> 8 servers * 8 cores * 32Gb. The level of concurrency hardly matters at
>>>> all, unless it is too high or too low.
>>>> The size of the token ranges matters, but here I see the rule "make it
>>>> larger, but avoid Cassandra timeouts".
>>>> I also tried the Spark connector to validate that my multithreaded test
>>>> app is not the bottleneck. It is not.
>>>>
>>>> I expected some kind of elasticity, I see none.
>>>> Feels like I'm doing something wrong...
>>>>
>>>> On 17 August 2017 at 00:19, Dor Laor <d...@scylladb.com> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> You probably didn't get the parallelism right. A serial scan has a
>>>>> parallelism of one. If the parallelism isn't large enough, performance
>>>>> will be slow. If parallelism is too large, Cassandra and the disk will
>>>>> thrash and incur too many context switches.
>>>>>
>>>>> So you need to find your cluster's sweet spot. We documented the
>>>>> procedure to do it in this blog:
>>>>> http://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/
>>>>> and the results are here:
>>>>> http://www.scylladb.com/2017/03/28/parallel-efficient-full-table-scan-scylla/
>>>>> The algorithm should translate to Cassandra, but you'll have to use
>>>>> different rules of thumb.
>>>>>
>>>>> Best,
>>>>> Dor
>>>>>
>>>>> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
>>>>> alex.kotelni...@diginetica.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> we are trying Cassandra as an alternative storage for a huge stream
>>>>>> of data coming from our customers.
>>>>>>
>>>>>> Storing works quite fine, and I started to validate how retrieval
>>>>>> does. We have two types of that: fetching specific records and bulk
>>>>>> retrieval for general analysis.
>>>>>> Fetching a single record works like a charm. But it is not so with
>>>>>> bulk fetch.
>>>>>>
>>>>>> With a moderately small table of ~2 million records, ~10Gb of raw
>>>>>> data, I observed very slow operation (using token(partition key)
>>>>>> ranges). It takes minutes to perform a full retrieval.
>>>>>> We tried a couple of configurations
>>>>>> using virtual machines and real hardware, and overall it looks like
>>>>>> it is not possible to fetch all table data in a reasonable time (by
>>>>>> reasonable I mean that since we have a 1Gbit network, 10Gb can be
>>>>>> transferred in a couple of minutes from one server to another, and
>>>>>> when we have 10+ Cassandra servers and 10+ Spark executors the total
>>>>>> time should be even smaller).
>>>>>>
>>>>>> I tried the DataStax Spark connector. I also wrote a simple test case
>>>>>> using the DataStax Java driver and saw that a fetch of 10k records
>>>>>> takes ~10s, so I assume a "sequential" scan would take 200x more
>>>>>> time, i.e. ~30 minutes.
>>>>>>
>>>>>> Maybe we are totally wrong trying to use Cassandra this way?
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>>
>>>>>> *Alexander Kotelnikov*
>>>>>> *Team Lead*
>>>>>>
>>>>>> DIGINETICA
>>>>>> Retail Technology Company
>>>>>>
>>>>>> m: +7.921.915.06.28
>>>>>>
>>>>>> *www.diginetica.com <http://www.diginetica.com/>*

--
Best Regards,

*Alexander Kotelnikov*
*Team Lead*

DIGINETICA
Retail Technology Company

m: +7.921.915.06.28

*www.diginetica.com <http://www.diginetica.com/>*
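Stripped of driver details, the fan-out pattern that the multithreaded dumper, the Spark connector, and cassandra-unload all share can be sketched as follows. This is my illustration, not the thread's actual code: the `rangeFetcher` callback is a stand-in for the driver call behind `SELECT ... WHERE token(user_id) >= ? AND token(user_id) < ?`, and the pool size is the concurrency knob Dor's "sweet spot" advice refers to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongBinaryOperator;

// Sketch: submit one query per token range to a fixed-size thread pool
// and sum the per-range row counts. The fetcher abstracts away the
// actual CQL range query so the skeleton stays self-contained.
public class ParallelScan {
    public static long scan(List<long[]> ranges, int threads,
                            LongBinaryOperator rangeFetcher) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Long>> futures = new ArrayList<>();
        for (long[] r : ranges)
            // Each task fetches one [start, end) token range.
            futures.add(pool.submit(() -> rangeFetcher.applyAsLong(r[0], r[1])));
        long rows = 0;
        for (Future<Long> f : futures)
            rows += f.get(); // block until every range has been fetched
        pool.shutdown();
        return rows;
    }
}
```

With a real driver-backed fetcher, `threads` is the parameter to sweep when looking for the concurrency sweet spot.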