[
https://issues.apache.org/jira/browse/CASSANDRA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15048561#comment-15048561
]
Stefania commented on CASSANDRA-9302:
-------------------------------------
Thank you for your review! I applied your suggestions to the 2.1 branch since I
believe we still need this in 2.1. However pleae continue reviewing the 2.2
branch so we detect any merge errors as well.
bq. COPY_OPTIONS could be composed of common options and to/from options.
To/from options used by name in complete_copy_options.
OK
bq. Since COPY_OPTIONS is always converted to a set for completion, I suggest
just making it a set.
OK
bq. COPY help: move MAXATTEMPTS, REPORTFREQUENCY above TO/FROM groups?
I already introduced groups in CASSANDRA-9303 which should be committed shortly
after this ticket. I prefer not to add
groups here to avoid rebase problems.
{quote}
JOBS='6' - the number of jobs each process can work on at a time
^ alignment problem
It is not clear what "jobs" means in this context, and how the user might want
to set this
{quote}
I changed it to MAXREQUESTS, I hope it's clearer.
bq. copy.ImportReader.read_rows: Mostly unnecessary to preallocate the rows
list, since this loop is dominated by other things.
OK
bq. copy.ImportTask.process_records: "(batches ore retries)" (typo)
OK
bq. copy.ImportTask.receive: "start_time < 0.1): # 10 millis" incorrect comment
OK
bq. copy.ImportTask.receive: "except Queue.Empty": what expectation do you have
for this queue to be empty if we're testing "not self.inmsg.empty()" in the
loop?
Queue.empty() is not very reliable so we must check for exceptions (tests
started failing when I remove the exception check). To avoi confusion, I
removed empty() from the loop condition and added a break on exception instead.
bq. copy.ImportTask.receive: "if err.startswith('ValueError') or
err.startswith('TypeError')": may want to add IndexError so this type of error
is not retried:
Added
bq. copy.ImportTask.reset_batch: "return dict(\[(k, v) if k != 'imported' else
(k, 0) for k, v in batch.iteritems()\]) --> batch\['imported'\] = 0
OK
bq. copy.ImportTask.batch_id not used: looks like this is only written, never
read. If this is removed, send_batch can accept rows directly and new_batch and
make_batch can go away.
batch\['id'\] is used by the worker processes for failure injection. We need to
set it in the ImportTask because the ids must be unique across processes.
bq. copy.ImportTask.receive_meter: seems like overkill since it's ultimately
just used to track the same total as ImportTask.succeeded
It's required to report statistics of imported rows to the user not so much for
tracking the total, see {{RateMeter.log_message()}}. Logging rows as they are
sent with send_meter would be misleading IMO.
bq. copy.ImportProcess.sessions: I see that we're starting a whitelist session
per local host. This could lead to quite a bit of overhead for larger clusters
(sessions = nodes * worker processes). I understand grouping by replica, but I
don't understand why driver TAR is not effective here. To my way of thinking we
should be able to have a single session per worker (token-aware, DC-aware),
group rows in a batch message by replica (same way it's done presently), and
let the driver TAR take it from there.
I had a single session initially and I tried again today. Perhaps I'm doing
something wrong but a single session created with the code below returns lots
of timeouts to the point that we cannot even complete importing 1M rows in over
a minute, whereas by contacting hosts directly we can complete 1M records in
about 27-28 seconds on my laptop.
{code}
cluster = Cluster(
contact_points=(self.hostname,),
port=self.port,
cql_version=self.cql_version,
protocol_version=self.protocol_version,
auth_provider=self.auth_provider,
load_balancing_policy=TokenAwarePolicy(WhiteListRoundRobinPolicy([self.hostname])),
ssl_options=ssl_settings(self.hostname, self.config_file) if
self.ssl else None,
default_retry_policy=ExpBackoffRetryPolicy(self),
compression=None,
connect_timeout=self.connect_timeout)
{code}
I tested on 2.1 which uses the driver version 2.7.2. Do you have any
suggestions?
bq. copy.ImportProcess.get_replica Since you're randomizing here, doesn't that
decimate some of the split batchability within each batch? We might be better
off randomizing a start index across processes, then round-robin replicas per
batch.
OK - randomising was not the best idea here so I've introduced a randomized
index per worker process as you suggested.
bq. copy.ImportProcess.execute_statement:
"callback=partial(self.result_callback, batch)," no need to make partials
because the add_callbacks method accepts callback_args to be passed through.
I could not get it to work (the callbacks where no longer called, perhaps the
self parameter?). I did not spend too much time investigating however.
bq. copy.ImportProcess.split_batches: "if replica in rows_by_replica:" could
eliminate conditional by using rows_by_replica = defaultdict(list)
OK
bq. "if len(remaining_rows) > 0:" --> if remaining_rows (If you choose to do
this there are several call sites to be tweaked in this changeset)
OK - I hope I caught them all
bq. copy.ImportProcess.get_replica: "TODO: DC locality" leftover todo
OK
bq. Did you think at all about just constructing simple string queries instead
of prepared? Python can use a lot of cycles doing conversions like this. Right
now we have string->Python type->cql serialized. If we built strings from our
input (something Python is usually very efficient doing) it would require less
conversions for the bulk of the data (we would still need to convert PK columns
for getting the token). Obviously there is always the alternative "more
workers", but it's something to consider for general efficiency.
It was one of the suggestions of this ticket to introduce prepared statements
but just to be sure I compared the performance of strings vs prepared
statements again today and I encountered lots of timeouts making the import
much slower when using strings.
bq. Not related to this change, but I noticed copy.ExportProcess.get_session:
executor_threads=max(2, self.csv_options\['jobs'\] / 2)). I can't think of a
reason to configure executor threads here. That setting only applies to a
thread pool for processing async events inside the client (should not have
bearing on request execution).
Removed, thank you.
> Optimize cqlsh COPY FROM, part 3
> --------------------------------
>
> Key: CASSANDRA-9302
> URL: https://issues.apache.org/jira/browse/CASSANDRA-9302
> Project: Cassandra
> Issue Type: Improvement
> Components: Tools
> Reporter: Jonathan Ellis
> Assignee: Stefania
> Priority: Critical
> Fix For: 2.1.x
>
>
> We've had some discussion moving to Spark CSV import for bulk load in 3.x,
> but people need a good bulk load tool now. One option is to add a separate
> Java bulk load tool (CASSANDRA-9048), but if we can match that performance
> from cqlsh I would prefer to leave COPY FROM as the preferred option to which
> we point people, rather than adding more tools that need to be supported
> indefinitely.
> Previous work on COPY FROM optimization was done in CASSANDRA-7405 and
> CASSANDRA-8225.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)