[
https://issues.apache.org/jira/browse/CASSANDRA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15047094#comment-15047094
]
Adam Holmberg commented on CASSANDRA-9302:
------------------------------------------
[~Stefania] my standard disclaimer when reviewing with someone new: I write
down almost everything I note during a review. Some of it can be nit-picky --
feel free to take it or leave it. I also welcome dialog or feedback about what
kind of inputs you want for future reviews
I looked at cassandra-2.2...stef1927:9302-2.2 @ 8c52ae93
---------------------------------------
*bin/cqlsh.py*
COPY_OPTIONS could be composed of common options and to/from options. To/from
options used by name in complete_copy_options.
Since COPY_OPTIONS is always converted to a set for completion, I suggest just
making it a set.
COPY help: move MAXATTEMPTS, REPORTFREQUENCY above TO/FROM groups?
JOBS='6' - the number of jobs each process can work on at a time
^ alignment problem
It is not clear what "jobs" means in this context, and how the user might want
to set this
*copy.ImportReader.read_rows*
Mostly unnecessary to preallocate the rows list, since this loop is dominated
by other things. If you wanted to simplify
and get rid of that loop you could write
{code}
rows = list(next(self.reader) for _ in xrange(self.chunksize))
self.exhausted = len(rows) < chunksize
return rows
{code}
*copy.ImportTask.process_records*
"(batches ore retries)" (typo)
*copy.ImportTask.receive:*
"start_time < 0.1): # 10 millis" incorrect comment
*copy.ImportTask.receive:*
"except Queue.Empty": what expectation do you have for this queue to be empty
if we're testing "not self.inmsg.empty()" in the loop?
*copy.ImportTask.receive:*
"if err.startswith('ValueError') or err.startswith('TypeError')": may want to
add IndexError so this type of error is not retried:
{code}
cassandra@cqlsh:test> copy t2 (a,b,c,d) FROM 'f.csv';
Starting copy of test.t2 with columns ['a', 'b', 'c', 'd'].
Failed to import 7 rows: IndexError - list index out of range - will retry
later, attempt 1 of 5
Failed to import 7 rows: IndexError - list index out of range - will retry
later, attempt 2 of 5
Failed to import 7 rows: IndexError - list index out of range - will retry
later, attempt 3 of 5
Failed to import 7 rows: IndexError - list index out of range - will retry
later, attempt 4 of 5
Failed to import 7 rows: IndexError - list index out of range - given up after
5 attempts
Failed to process 7 batches
Processed 0 rows; Written: 0.000000 rows/s
0 rows imported in 0.096 seconds.
{code}
*copy.ImportTask.reset_batch:*
"return dict([(k, v) if k != 'imported' else (k, 0) for k, v in
batch.iteritems()])" --> batch['imported'] = 0 (?)
*copy.ImportTask.batch_id* not used: looks like this is only written, never read
If this is removed, send_batch can accept rows directly and new_batch and
make_batch can go away.
*copy.ImportTask.receive_meter*: seems like overkill since it's ultimately just
used to track the same total as ImportTask.succeeded
*copy.ImportProcess.sessions:*
I see that we're starting a whitelist session per local host. This could lead
to quite a bit of overhead for larger clusters (sessions = nodes * worker
processes).
bq. manual replica selection rather than relying on the driver TAR - we get
better performance because we can batch by replica, not just by primary key.
I understand grouping by replica, but I don't understand why driver TAR is not
effective here. To my way of thinking we should be able to have a single
session per worker (token-aware, DC-aware), group rows in a batch message by
replica (same way it's done presently), and let the driver TAR take it from
there.
*copy.ImportProcess.get_replica*
Since you're randomizing here, doesn't that decimate some of the split
batchability within each batch? We might be better off randomizing a start
index across processes, then round-robin replicas per batch.
*copy.ImportProcess.execute_statement:**
"callback=partial(self.result_callback, batch)," no need to make partials
because the add_callbacks method accepts callback_args
to be passed through.
*copy.ImportProcess.split_batches:*
"if replica in rows_by_replica:" could eliminate conditional by using
rows_by_replica = defaultdict(list)
"if len(remaining_rows) > 0:" --> if remaining_rows
>From the [style guide|https://www.python.org/dev/peps/pep-0008/]:
{quote}
For sequences, (strings, lists, tuples), use the fact that empty sequences are
false.
Yes: if not seq:
if seq:
{quote}
(If you choose to do this there are several call sites to be tweaked in this
changeset)
*copy.ImportProcess.get_replica:*
"TODO: DC locality" leftover todo
*General:*
Did you think at all about just constructing simple string queries instead of
prepared? Python can use a lot of cycles doing conversions like this. Right now
we have string-->Python type-->cql serialized. If we built strings from our
input (something Python is usually very efficient doing) it would require less
conversions for the bulk of the data (we would still need to convert PK columns
for getting the token). Obviously there is always the alternative "more
workers", but it's something to consider for general efficiency.
> Optimize cqlsh COPY FROM, part 3
> --------------------------------
>
> Key: CASSANDRA-9302
> URL: https://issues.apache.org/jira/browse/CASSANDRA-9302
> Project: Cassandra
> Issue Type: Improvement
> Components: Tools
> Reporter: Jonathan Ellis
> Assignee: Stefania
> Priority: Critical
> Fix For: 2.1.x
>
>
> We've had some discussion moving to Spark CSV import for bulk load in 3.x,
> but people need a good bulk load tool now. One option is to add a separate
> Java bulk load tool (CASSANDRA-9048), but if we can match that performance
> from cqlsh I would prefer to leave COPY FROM as the preferred option to which
> we point people, rather than adding more tools that need to be supported
> indefinitely.
> Previous work on COPY FROM optimization was done in CASSANDRA-7405 and
> CASSANDRA-8225.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)