So it is also terribly slow.

It does not work with materialized views (quick hack for that below) or with
UDTs; the latter requires more time to fix.

So I used it to retrieve the only built-in type column, the key. To make
the task more time-consuming I extended the dataset a bit, to ~2.5M
records.

My multithreaded dumper, my Spark job using the connector, and
cassandra-unload all retrieve 2.5M records in ~1m20s. Increased concurrency
halves this time, but only in this trivial case of fetching the key alone,
with no actual payload.
Fetching the actual user data for these 2.5M records takes ~2m20s, and that
does not go down with the number of threads or Spark cores.

I feel stupid. More than two minutes to transfer ~10Gb in a highly
distributed system.
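
A quick back-of-the-envelope check of those numbers, as a minimal sketch. It
assumes "~10Gb" means 10 gigabytes (10^9 bytes each) and ignores protocol
overhead; the class and method names are made up for illustration.

```java
public class ScanThroughput {

    // Effective throughput in megabytes per second (1 GB = 1000 MB).
    static double megabytesPerSecond(double gigabytes, double seconds) {
        return gigabytes * 1000.0 / seconds;
    }

    // Seconds needed to push the data through a single link of the given
    // speed in gigabits per second (8 bits per byte, no overhead).
    static double wireSeconds(double gigabytes, double linkGbitPerSec) {
        return gigabytes * 8.0 / linkGbitPerSec;
    }

    public static void main(String[] args) {
        double dataGb = 10.0;    // "~10Gb", read as gigabytes (assumption)
        double elapsed = 140.0;  // "~2m20s"
        System.out.printf("effective throughput: %.1f MB/s%n",
                megabytesPerSecond(dataGb, elapsed));
        System.out.printf("single 1 Gbit/s link: %.0f s%n",
                wireSeconds(dataGb, 1.0));
    }
}
```

Under these assumptions the whole cluster delivers less than what a single
1Gbit link could carry on its own.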


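diff --git a/src/main/java/com/datastax/loader/CqlDelimParser.java b/src/main/java/com/datastax/loader/CqlDelimParser.java
index c570574..e7af28b 100644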
--- a/src/main/java/com/datastax/loader/CqlDelimParser.java
+++ b/src/main/java/com/datastax/loader/CqlDelimParser.java
@@ -20,6 +20,7 @@ import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.AbstractTableMetadata;
 import com.datastax.driver.core.KeyspaceMetadata;
 import com.datastax.driver.core.exceptions.InvalidTypeException;
 import com.datastax.loader.parser.BigDecimalParser;
@@ -205,9 +206,10 @@ public class CqlDelimParser {
             System.err.println("Keyspace " + keyspace + " not found.");
             System.exit(-1);
         }
-        TableMetadata tm = km.getTable(tablename);
+        AbstractTableMetadata tm = km.getTable(tablename);
+        if ( tm == null ) tm = km.getMaterializedView(tablename);
         if (null == tm) {
-            System.err.println("Table " + tablename + " not found.");
+            System.err.println("Table/view " + tablename + " not found.");
             System.exit(-1);
         }
         List<String> inList = new ArrayList<String>();
diff --git a/src/main/java/com/datastax/loader/CqlDelimUnload.java b/src/main/java/com/datastax/loader/CqlDelimUnload.java
index 472e33b..f084ce8 100644
--- a/src/main/java/com/datastax/loader/CqlDelimUnload.java
+++ b/src/main/java/com/datastax/loader/CqlDelimUnload.java
@@ -56,6 +56,7 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManagerFactory;

+import com.datastax.driver.core.AbstractTableMetadata;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.ColumnMetadata;
@@ -547,9 +548,12 @@ public class CqlDelimUnload {
                 table = table.replaceAll("\"", "");
             else
                 table = table.toLowerCase();
-
-            List<ColumnMetadata> lcm = session.getCluster().getMetadata()
-                .getKeyspace(keyspace).getTable(table).getPartitionKey();
+
+            AbstractTableMetadata tm = session.getCluster().getMetadata()
+                .getKeyspace(keyspace).getTable(table);
+            if ( tm == null )
+              tm = session.getCluster().getMetadata().getKeyspace(keyspace).getMaterializedView(table);
+            List<ColumnMetadata> lcm = tm.getPartitionKey();
             String partitionKey = lcm.get(0).getName();
             for (int i = 1; i < lcm.size(); i++) {
                 partitionKey = partitionKey + "," + lcm.get(i).getName();


On 17 August 2017 at 21:11, Jeff Jirsa <jji...@gmail.com> wrote:

> Brian Hess has perhaps the best open source code example of the right way
> to do this:
>
> https://github.com/brianmhess/cassandra-loader/blob/master/src/main/java/com/datastax/loader/CqlDelimUnload.java
>
>
>
> On Thu, Aug 17, 2017 at 10:00 AM, Alex Kotelnikov <
> alex.kotelni...@diginetica.com> wrote:
>
>> Yup, user_id is the primary key.
>>
>> First of all, can you share how to "go to a node directly"?
>>
>> Also, such an approach will retrieve all the data RF times; the coordinator
>> should have enough metadata to avoid that.
>>
>> Shouldn't querying multiple coordinators provide some concurrency?
>>
>> On 17 August 2017 at 19:54, Dor Laor <d...@scylladb.com> wrote:
>>
>>> On Thu, Aug 17, 2017 at 9:36 AM, Alex Kotelnikov <
>>> alex.kotelni...@diginetica.com> wrote:
>>>
>>>> Dor,
>>>>
>>>> I believe I have tried it in many ways, and the result is quite
>>>> disappointing.
>>>> I've run my scans on 3 different clusters, one of which was running on
>>>> VMs, so I was able to scale it up and down (3-5-7 VMs, 8 to 24 cores) to
>>>> see how this affects the performance.
>>>>
>>>> I also generated the load from a Spark cluster, ranging from 4 to 40
>>>> parallel tasks, as well as from a plain multi-threaded client.
>>>>
>>>> The surprise is that a trivial fetch of all records using token ranges
>>>> takes pretty much the same time in all setups.
>>>>
>>>> The only beneficial thing I've learned is that it is much more
>>>> efficient to create a MATERIALIZED VIEW than to filter (even using a
>>>> secondary index).
>>>>
>>>> Say I have a typical dataset, around 3Gb of data, 1M records, and I
>>>> run a trivial scan:
>>>>
>>>> String.format("SELECT token(user_id), user_id, events FROM user_events
>>>> WHERE token(user_id) >= %d ", start) + (end != null ? String.format(" AND
>>>> token(user_id) < %d ", end) : "")
>>>>
>>>
>>> Is user_id the primary key? Looks like this query will just go to the
>>> cluster and access a random coordinator each time.
>>> C* doesn't store subsequent tokens on the same node; the keys are hashed.
>>> The idea of a parallel cluster scan is to go directly to all nodes in
>>> parallel and query them for the hashed keys they own.
>>>
>>>
>>>> I split all tokens into start-end ranges (except for last range, which
>>>> only has start) and query ranges in multiple threads, up to 40.
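
For reference, the range splitting described above can be sketched roughly as
follows. This is a minimal sketch, assuming Murmur3Partitioner (tokens covering
the signed 64-bit range); TokenRangeSplitter, Range, split and query are
hypothetical names, and the code only builds the CQL strings without talking to
a cluster.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class TokenRangeSplitter {

    /** Half-open token range [start, end); a null end means "no upper bound". */
    static final class Range {
        final long start;
        final Long end;

        Range(long start, Long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Splits the full signed 64-bit token space into `parts` contiguous ranges. */
    static List<Range> split(int parts) {
        if (parts < 1) {
            throw new IllegalArgumentException("parts must be >= 1");
        }
        BigInteger min = BigInteger.valueOf(Long.MIN_VALUE);
        BigInteger span = BigInteger.ONE.shiftLeft(64); // 2^64 possible tokens
        BigInteger n = BigInteger.valueOf(parts);
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < parts; i++) {
            long start = boundary(min, span, n, i);
            // The last range is left open-ended, as in the query above.
            Long end = (i == parts - 1) ? null : Long.valueOf(boundary(min, span, n, i + 1));
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    // i-th boundary: min + span * i / parts, computed without long overflow.
    private static long boundary(BigInteger min, BigInteger span, BigInteger parts, int i) {
        return min.add(span.multiply(BigInteger.valueOf(i)).divide(parts)).longValueExact();
    }

    /** Builds the scan query for one range, mirroring the String.format above. */
    static String query(Range r) {
        String cql = String.format(Locale.ROOT,
                "SELECT token(user_id), user_id, events FROM user_events WHERE token(user_id) >= %d",
                r.start);
        if (r.end != null) {
            cql += String.format(Locale.ROOT, " AND token(user_id) < %d", r.end);
        }
        return cql;
    }

    public static void main(String[] args) {
        // Print the queries for a 4-way split; each one could go to its own thread.
        for (Range r : split(4)) {
            System.out.println(query(r));
        }
    }
}
```

Each generated query can then be handed to a separate worker thread; the number
of parts controls the size of each token range.
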
>>>>
>>>> The whole process takes ~40s on a 3-VM cluster (2+2+4 cores, 16Gb RAM and
>>>> 1 virtual disk each), and ~30s on real hardware clusters
>>>> (8 servers * 8 cores * 32Gb). The level of concurrency hardly matters
>>>> at all, unless it is too high or too low.
>>>> The size of the token ranges matters, but here the rule I see is "make it
>>>> larger, but avoid Cassandra timeouts".
>>>> I also tried the Spark connector to validate that my multithreaded test app
>>>> is not the bottleneck. It is not.
>>>>
>>>> I expected some kind of elasticity, but I see none. Feels like I am doing
>>>> something wrong...
>>>>
>>>>
>>>>
>>>> On 17 August 2017 at 00:19, Dor Laor <d...@scylladb.com> wrote:
>>>>
>>>>> Hi Alex,
>>>>>
>>>>> You probably didn't get the parallelism right. A serial scan has
>>>>> a parallelism of one. If the parallelism isn't large enough, perf will
>>>>> be slow.
>>>>> If the parallelism is too large, Cassandra and the disk will thrash and
>>>>> have too many context switches.
>>>>>
>>>>> So you need to find your cluster's sweet spot. We documented the
>>>>> procedure to do it in this blog:
>>>>> http://www.scylladb.com/2017/02/13/efficient-full-table-scans-with-scylla-1-6/
>>>>> and the results are here:
>>>>> http://www.scylladb.com/2017/03/28/parallel-efficient-full-table-scan-scylla/
>>>>> The algorithm should translate to Cassandra but you'll have to use
>>>>> different rules of thumb.
>>>>>
>>>>> Best,
>>>>> Dor
>>>>>
>>>>>
>>>>> On Wed, Aug 16, 2017 at 9:50 AM, Alex Kotelnikov <
>>>>> alex.kotelni...@diginetica.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> we are trying Cassandra as an alternative for storing a huge stream of
>>>>>> data coming from our customers.
>>>>>>
>>>>>> Storing works quite fine, and I started to validate how retrieval
>>>>>> does. We have two types of that: fetching specific records and bulk
>>>>>> retrieval for general analysis.
>>>>>> Fetching a single record works like a charm. But it is not so with bulk
>>>>>> fetch.
>>>>>>
>>>>>> With a moderately small table of ~2 million records, ~10Gb raw data, I
>>>>>> observed very slow operation (using token(partition key) ranges). It
>>>>>> takes minutes to perform a full retrieval. We tried a couple of
>>>>>> configurations using virtual machines and real hardware, and overall it
>>>>>> looks like it is not possible to fetch all table data in a reasonable
>>>>>> time (by reasonable I mean that, since we have a 1Gbit network, 10Gb can
>>>>>> be transferred in a couple of minutes from one server to another, and
>>>>>> when we have 10+ Cassandra servers and 10+ Spark executors the total
>>>>>> time should be even smaller).
>>>>>>
>>>>>> I tried the DataStax Spark connector. I also wrote a simple test case
>>>>>> using the DataStax Java driver and saw that a fetch of 10k records takes
>>>>>> ~10s, so I assume that a "sequential" scan will take 200x more time,
>>>>>> i.e. ~30 minutes.
>>>>>>
>>>>>> Maybe we are totally wrong trying to use Cassandra this way?
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>>
>>>>>> *Alexander Kotelnikov*
>>>>>>
>>>>>> *Team Lead*
>>>>>>
>>>>>> DIGINETICA
>>>>>> Retail Technology Company
>>>>>>
>>>>>> m: +7.921.915.06.28
>>>>>>
>>>>>> *www.diginetica.com <http://www.diginetica.com/>*
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>
>


-- 

Best Regards,


*Alexander Kotelnikov*

*Team Lead*

DIGINETICA
Retail Technology Company

m: +7.921.915.06.28

*www.diginetica.com <http://www.diginetica.com/>*
