[ https://issues.apache.org/jira/browse/CASSANDRA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15047094#comment-15047094 ]
Adam Holmberg commented on CASSANDRA-9302: ------------------------------------------ [~Stefania] my standard disclaimer when reviewing with someone new: I write down almost everything I note during a review. Some of it can be nit-picky -- feel free to take it or leave it. I also welcome dialog or feedback about what kind of inputs you want for future reviews I looked at cassandra-2.2...stef1927:9302-2.2 @ 8c52ae93 --------------------------------------- *bin/cqlsh.py* COPY_OPTIONS could be composed of common options and to/from options. To/from options used by name in complete_copy_options. Since COPY_OPTIONS is always converted to a set for completion, I suggest just making it a set. COPY help: move MAXATTEMPTS, REPORTFREQUENCY above TO/FROM groups? JOBS='6' - the number of jobs each process can work on at a time ^ alignment problem It is not clear what "jobs" means in this context, and how the user might want to set this *copy.ImportReader.read_rows* Mostly unnecessary to preallocate the rows list, since this loop is dominated by other things. If you wanted to simplify and get rid of that loop you could write {code} rows = list(next(self.reader) for _ in xrange(self.chunksize)) self.exhausted = len(rows) < chunksize return rows {code} *copy.ImportTask.process_records* "(batches ore retries)" (typo) *copy.ImportTask.receive:* "start_time < 0.1): # 10 millis" incorrect comment *copy.ImportTask.receive:* "except Queue.Empty": what expectation do you have for this queue to be empty if we're testing "not self.inmsg.empty()" in the loop? *copy.ImportTask.receive:* "if err.startswith('ValueError') or err.startswith('TypeError')": may want to add IndexError so this type of error is not retried: {code} cassandra@cqlsh:test> copy t2 (a,b,c,d) FROM 'f.csv'; Starting copy of test.t2 with columns ['a', 'b', 'c', 'd']. Failed to import 7 rows: IndexError - list index out of range - will retry later, attempt 1 of 5 Failed to import 7 rows: IndexError - list index out of range - will retry later, attempt 2 of 5 Failed to import 7 rows: IndexError - list index out of range - will retry later, attempt 3 of 5 Failed to import 7 rows: IndexError - list index out of range - will retry later, attempt 4 of 5 Failed to import 7 rows: IndexError - list index out of range - given up after 5 attempts Failed to process 7 batches Processed 0 rows; Written: 0.000000 rows/s 0 rows imported in 0.096 seconds. {code} *copy.ImportTask.reset_batch:* "return dict([(k, v) if k != 'imported' else (k, 0) for k, v in batch.iteritems()])" --> batch['imported'] = 0 (?) *copy.ImportTask.batch_id* not used: looks like this is only written, never read If this is removed, send_batch can accept rows directly and new_batch and make_batch can go away. *copy.ImportTask.receive_meter*: seems like overkill since it's ultimately just used to track the same total as ImportTask.succeeded *copy.ImportProcess.sessions:* I see that we're starting a whitelist session per local host. This could lead to quite a bit of overhead for larger clusters (sessions = nodes * worker processes). bq. manual replica selection rather than relying on the driver TAR - we get better performance because we can batch by replica, not just by primary key. I understand grouping by replica, but I don't understand why driver TAR is not effective here. To my way of thinking we should be able to have a single session per worker (token-aware, DC-aware), group rows in a batch message by replica (same way it's done presently), and let the driver TAR take it from there. *copy.ImportProcess.get_replica* Since you're randomizing here, doesn't that decimate some of the split batchability within each batch? We might be better off randomizing a start index across processes, then round-robin replicas per batch. *copy.ImportProcess.execute_statement:** "callback=partial(self.result_callback, batch)," no need to make partials because the add_callbacks method accepts callback_args to be passed through. *copy.ImportProcess.split_batches:* "if replica in rows_by_replica:" could eliminate conditional by using rows_by_replica = defaultdict(list) "if len(remaining_rows) > 0:" --> if remaining_rows >From the [style guide|https://www.python.org/dev/peps/pep-0008/]: {quote} For sequences, (strings, lists, tuples), use the fact that empty sequences are false. Yes: if not seq: if seq: {quote} (If you choose to do this there are several call sites to be tweaked in this changeset) *copy.ImportProcess.get_replica:* "TODO: DC locality" leftover todo *General:* Did you think at all about just constructing simple string queries instead of prepared? Python can use a lot of cycles doing conversions like this. Right now we have string-->Python type-->cql serialized. If we built strings from our input (something Python is usually very efficient doing) it would require less conversions for the bulk of the data (we would still need to convert PK columns for getting the token). Obviously there is always the alternative "more workers", but it's something to consider for general efficiency. > Optimize cqlsh COPY FROM, part 3 > -------------------------------- > > Key: CASSANDRA-9302 > URL: https://issues.apache.org/jira/browse/CASSANDRA-9302 > Project: Cassandra > Issue Type: Improvement > Components: Tools > Reporter: Jonathan Ellis > Assignee: Stefania > Priority: Critical > Fix For: 2.1.x > > > We've had some discussion moving to Spark CSV import for bulk load in 3.x, > but people need a good bulk load tool now. One option is to add a separate > Java bulk load tool (CASSANDRA-9048), but if we can match that performance > from cqlsh I would prefer to leave COPY FROM as the preferred option to which > we point people, rather than adding more tools that need to be supported > indefinitely. > Previous work on COPY FROM optimization was done in CASSANDRA-7405 and > CASSANDRA-8225. -- This message was sent by Atlassian JIRA (v6.3.4#6332)