[ https://issues.apache.org/jira/browse/CASSANDRA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15047094#comment-15047094 ]

Adam Holmberg commented on CASSANDRA-9302:
------------------------------------------

[~Stefania] my standard disclaimer when reviewing with someone new: I write 
down almost everything I note during a review. Some of it can be nit-picky -- 
feel free to take it or leave it. I also welcome dialogue or feedback about 
what kind of input you want in future reviews.

I looked at cassandra-2.2...stef1927:9302-2.2 @ 8c52ae93
---------------------------------------
*bin/cqlsh.py*
COPY_OPTIONS could be composed of common options plus to/from-specific 
options, with the to/from options referenced by name in complete_copy_options.

Since COPY_OPTIONS is always converted to a set for completion, I suggest just 
making it a set.
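For example, something like this (option names below are illustrative, not the complete set from cqlsh.py):

```python
# Illustrative option names only, not the full list from cqlsh.py.
COMMON_COPY_OPTIONS = {'DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL'}
COPY_FROM_OPTIONS = {'CHUNKSIZE', 'MAXATTEMPTS', 'REPORTFREQUENCY'}
COPY_TO_OPTIONS = {'ENCODING', 'PAGESIZE'}

# Already a set, so completion needs no list-to-set conversion.
COPY_OPTIONS = COMMON_COPY_OPTIONS | COPY_FROM_OPTIONS | COPY_TO_OPTIONS

def complete_copy_options(direction, already_given):
    """Suggest options valid for this COPY direction and not yet supplied."""
    valid = COMMON_COPY_OPTIONS | (
        COPY_FROM_OPTIONS if direction == 'FROM' else COPY_TO_OPTIONS)
    return valid - set(already_given)
```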

COPY help: move MAXATTEMPTS, REPORTFREQUENCY above TO/FROM groups?

JOBS='6'               - the number of jobs each process can work on at a time
                       ^ alignment problem
It is not clear what "jobs" means in this context, or how the user might want 
to set it.

*copy.ImportReader.read_rows*
Mostly unnecessary to preallocate the rows list, since this loop is dominated 
by other things. If you wanted to simplify and get rid of that loop, you could 
write:
{code}
rows = list(next(self.reader) for _ in xrange(self.chunksize))
self.exhausted = len(rows) < self.chunksize
return rows
{code}
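Relatedly, itertools.islice expresses the same chunked read without leaning on StopIteration escaping the generator expression. A sketch, with the instance attributes turned into parameters:

```python
from itertools import islice

def read_rows(reader, chunksize):
    # islice stops cleanly at the end of the underlying iterator,
    # so no StopIteration handling is needed.
    rows = list(islice(reader, chunksize))
    exhausted = len(rows) < chunksize
    return rows, exhausted
```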

*copy.ImportTask.process_records*
"(batches ore retries)" (typo)

*copy.ImportTask.receive:*
"start_time < 0.1):  # 10 millis" incorrect comment (0.1 s is 100 millis)

*copy.ImportTask.receive:*
"except Queue.Empty": what expectation do you have for this queue to be empty 
if we're testing "not self.inmsg.empty()" in the loop?

*copy.ImportTask.receive:*
"if err.startswith('ValueError') or err.startswith('TypeError')": may want to 
add IndexError so this type of error is not retried:
{code}
cassandra@cqlsh:test> copy t2 (a,b,c,d) FROM 'f.csv';

Starting copy of test.t2 with columns ['a', 'b', 'c', 'd'].
Failed to import 7 rows: IndexError - list index out of range -  will retry 
later, attempt 1 of 5
Failed to import 7 rows: IndexError - list index out of range -  will retry 
later, attempt 2 of 5
Failed to import 7 rows: IndexError - list index out of range -  will retry 
later, attempt 3 of 5
Failed to import 7 rows: IndexError - list index out of range -  will retry 
later, attempt 4 of 5
Failed to import 7 rows: IndexError - list index out of range -  given up after 
5 attempts
Failed to process 7 batches
Processed 0 rows; Written: 0.000000 rows/s
0 rows imported in 0.096 seconds.
{code}
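str.startswith also accepts a tuple, so the check stays compact as error types are added (the error strings below just mimic the format above):

```python
# Parse-type errors come from bad input rows; retrying cannot fix them.
NON_RETRYABLE_PREFIXES = ('ValueError', 'TypeError', 'IndexError')

def is_non_retryable(err):
    return err.startswith(NON_RETRYABLE_PREFIXES)
```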

*copy.ImportTask.reset_batch:*
"return dict([(k, v) if k != 'imported' else (k, 0) for k, v in 
batch.iteritems()])" --> batch['imported'] = 0 (?)
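i.e. something like this (Python 3 spelling here for brevity):

```python
def reset_batch_copy(batch):
    # current approach: rebuilds the whole dict just to zero one key
    return {k: (0 if k == 'imported' else v) for k, v in batch.items()}

def reset_batch_inplace(batch):
    # suggested approach: same result, no per-retry copy
    batch['imported'] = 0
    return batch
```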

*copy.ImportTask.batch_id* not used: looks like this is only written, never 
read. If it is removed, send_batch can accept rows directly and new_batch and 
make_batch can go away.

*copy.ImportTask.receive_meter*: seems like overkill since it's ultimately just 
used to track the same total as ImportTask.succeeded

*copy.ImportProcess.sessions:*
I see that we're starting a whitelist session per local host. This could lead 
to quite a bit of overhead for larger clusters (sessions = nodes * worker 
processes).
bq. manual replica selection rather than relying on the driver TAR - we get 
better performance because we can batch by replica, not just by primary key.
I understand grouping by replica, but I don't understand why driver TAR is not 
effective here. To my way of thinking we should be able to have a single 
session per worker (token-aware, DC-aware), group rows in a batch message by 
replica (same way it's done presently), and let the driver TAR take it from 
there.

*copy.ImportProcess.get_replica*
Since you're randomizing here, doesn't that reduce how well rows can be 
batched by replica within each split? We might be better off randomizing a 
start index across processes, then round-robining replicas per batch.

*copy.ImportProcess.execute_statement:*
"callback=partial(self.result_callback, batch)," no need to make partials 
because the add_callbacks method accepts callback_args
to be passed through.
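To illustrate the call shape with a stub (the stub is a stand-in, not the driver; note the extra args arrive after the response, so the callback signature flips relative to the partial version):

```python
class StubResponseFuture:
    """Stand-in for the driver's ResponseFuture, for illustration only."""
    def add_callbacks(self, callback, errback,
                      callback_args=(), errback_args=()):
        self._cb, self._cb_args = callback, callback_args

    def deliver(self, response):
        # The driver invokes callback(response, *callback_args).
        self._cb(response, *self._cb_args)

results = []

def result_callback(response, batch):  # batch now comes after the response
    results.append((batch['id'], response))

future = StubResponseFuture()
future.add_callbacks(result_callback, errback=None,
                     callback_args=({'id': 1},))
future.deliver('ok')
```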

*copy.ImportProcess.split_batches:*
"if replica in rows_by_replica:" could eliminate conditional by using 
rows_by_replica = defaultdict(list)
"if len(remaining_rows) > 0:" --> if remaining_rows
From the [style guide|https://www.python.org/dev/peps/pep-0008/]:
{quote}
For sequences, (strings, lists, tuples), use the fact that empty sequences are 
false.

Yes: if not seq:
     if seq:
{quote}
(If you choose to do this there are several call sites to be tweaked in this 
changeset)
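For concreteness, the defaultdict shape (replica_of is a hypothetical stand-in for the replica lookup in split_batches):

```python
from collections import defaultdict

def group_rows_by_replica(rows, replica_of):
    # defaultdict(list) removes the "if replica in rows_by_replica" branch.
    rows_by_replica = defaultdict(list)
    for row in rows:
        rows_by_replica[replica_of(row)].append(row)
    return rows_by_replica
```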

*copy.ImportProcess.get_replica:*
"TODO: DC locality" leftover todo

*General:*
Did you think at all about just constructing simple string queries instead of 
prepared? Python can use a lot of cycles doing conversions like this. Right now 
we have string-->Python type-->cql serialized. If we built strings from our 
input (something Python is usually very efficient doing) it would require less 
conversions for the bulk of the data (we would still need to convert PK columns 
for getting the token). Obviously there is always the alternative "more 
workers", but it's something to consider for general efficiency.
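Roughly the shape I have in mind (hypothetical helper; it assumes the raw CSV fields are already valid CQL literals, which is the point -- most columns never round-trip through Python types, and only the PK columns still need conversion for the token):

```python
def build_insert(keyspace, table, columns, csv_values):
    # String assembly only: no Python-type conversion and no cql
    # serialization for the non-PK columns.
    return "INSERT INTO %s.%s (%s) VALUES (%s)" % (
        keyspace, table, ', '.join(columns), ', '.join(csv_values))
```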

> Optimize cqlsh COPY FROM, part 3
> --------------------------------
>
>                 Key: CASSANDRA-9302
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-9302
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Tools
>            Reporter: Jonathan Ellis
>            Assignee: Stefania
>            Priority: Critical
>             Fix For: 2.1.x
>
>
> We've had some discussion moving to Spark CSV import for bulk load in 3.x, 
> but people need a good bulk load tool now.  One option is to add a separate 
> Java bulk load tool (CASSANDRA-9048), but if we can match that performance 
> from cqlsh I would prefer to leave COPY FROM as the preferred option to which 
> we point people, rather than adding more tools that need to be supported 
> indefinitely.
> Previous work on COPY FROM optimization was done in CASSANDRA-7405 and 
> CASSANDRA-8225.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
