Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 c7f9c8d21 -> d01250d63
  refs/heads/trunk bf52074f6 -> b31787435
(cqlsh): Show progress of COPY operations. patch by Mikhail Stepura; reviewed by Tyler Hobbs for CASSANDRA-7789 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d01250d6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d01250d6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d01250d6 Branch: refs/heads/cassandra-2.1 Commit: d01250d63acf08c354bc400c957572bbd68f7ea6 Parents: c7f9c8d Author: Mikhail Stepura <mish...@apache.org> Authored: Wed Aug 20 11:55:50 2014 -0700 Committer: Mikhail Stepura <mish...@apache.org> Committed: Wed Aug 27 15:43:42 2014 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh | 27 +++++++++-------- pylib/cqlshlib/async_insert.py | 13 ++++---- pylib/cqlshlib/meter.py | 59 +++++++++++++++++++++++++++++++++++++ 4 files changed, 79 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0f44c91..4fb773d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.1 + * (cqlsh): Show progress of COPY operations (CASSANDRA-7789) * Add syntax to remove multiple elements from a map (CASSANDRA-6599) * Support non-equals conditions in lightweight transactions (CASSANDRA-6839) * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/bin/cqlsh ---------------------------------------------------------------------- diff --git a/bin/cqlsh b/bin/cqlsh index c055771..dfce885 100755 --- a/bin/cqlsh +++ b/bin/cqlsh @@ -64,8 +64,6 @@ except ImportError: pass CQL_LIB_PREFIX = 'cassandra-driver-internal-only-' -FUTURES_LIB_PREFIX = 'futures-' -SIX_LIB_PREFIX = 'six-' CASSANDRA_PATH = 
os.path.join(os.path.dirname(os.path.realpath(__file__)), '..') @@ -89,12 +87,13 @@ cql_zip = find_zip(CQL_LIB_PREFIX) if cql_zip: ver = os.path.splitext(os.path.basename(cql_zip))[0][len(CQL_LIB_PREFIX):] sys.path.insert(0, os.path.join(cql_zip, 'cassandra-driver-' + ver)) -futures_zip = find_zip(FUTURES_LIB_PREFIX) -if futures_zip: - sys.path.insert(0, futures_zip) -six_zip = find_zip(SIX_LIB_PREFIX) -if six_zip: - sys.path.insert(0, six_zip) + +third_parties = ('futures-', 'six-') + +for lib in third_parties: + lib_zip = find_zip(lib) + if lib_zip: + sys.path.insert(0, lib_zip) warnings.filterwarnings("ignore", r".*blist.*") try: @@ -118,7 +117,7 @@ cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib') if os.path.isdir(cqlshlibdir): sys.path.insert(0, cqlshlibdir) -from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling, async_insert +from cqlshlib import cqlhandling, cql3handling, pylexotron, sslhandling, async_insert, meter from cqlshlib.displaying import (RED, BLUE, CYAN, ANSI_RESET, COLUMN_NAME_COLORS, FormattedValue, colorme) from cqlshlib.formatting import format_by_type, formatter_for, format_value_utype @@ -1361,7 +1360,7 @@ class Shell(cmd.Cmd): linesource.close() elif self.tty: print - return rownum-1 + return rownum def create_insert_statement(self, columns, nullval, table_meta, row): @@ -1437,23 +1436,25 @@ class Shell(cmd.Cmd): except IOError, e: self.printerr("Can't open %r for writing: %s" % (fname, e)) return 0 + wmeter = meter.Meter() try: + dump = self.prep_export_dump(ks, cf, columns) writer = csv.writer(csvdest, **dialect_options) if header: writer.writerow(columns) - rows = 0 for row in dump: fmt = lambda v: \ format_value(v, output_encoding=encoding, nullval=nullval, time_format=self.display_time_format, float_precision=self.display_float_precision).strval writer.writerow(map(fmt, row.values())) - rows += 1 + wmeter.mark_written() + wmeter.done() finally: if do_close: csvdest.close() - return rows + return 
wmeter.num_finished() def prep_export_dump(self, ks, cf, columns): if columns is None: http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/pylib/cqlshlib/async_insert.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/async_insert.py b/pylib/cqlshlib/async_insert.py index a4adcd2..d325716 100644 --- a/pylib/cqlshlib/async_insert.py +++ b/pylib/cqlshlib/async_insert.py @@ -14,11 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from itertools import count from threading import Event, Condition +from . import meter import sys - class _CountDownLatch(object): def __init__(self, counter=1): self._count = counter @@ -47,10 +46,10 @@ class _ChainedWriter(object): self._session = session self._cancellation_event = Event() self._first_error = None - self._num_finished = count(start=1) self._task_counter = _CountDownLatch(self.CONCURRENCY) self._enumerated_reader = enumerated_reader self._statement_func = statement_func + self._meter = meter.Meter() def insert(self): if not self._enumerated_reader: @@ -65,8 +64,9 @@ class _ChainedWriter(object): self._cancellation_event.set() sys.stdout.write('Aborting due to keyboard interrupt\n') self._task_counter.await() + self._meter.done() + return self._meter.num_finished(), self._first_error - return next(self._num_finished), self._first_error def _abort(self, error, failed_record): if not self._first_error: @@ -83,10 +83,7 @@ class _ChainedWriter(object): return if result is not self._sentinel: - finished = next(self._num_finished) - if not finished % 1000: - sys.stdout.write('Imported %s rows\r' % finished) - sys.stdout.flush() + self._meter.mark_written() try: (current_record, row) = next(self._enumerated_reader) http://git-wip-us.apache.org/repos/asf/cassandra/blob/d01250d6/pylib/cqlshlib/meter.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/meter.py 
b/pylib/cqlshlib/meter.py new file mode 100644 index 0000000..e1a6bfc --- /dev/null +++ b/pylib/cqlshlib/meter.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from time import time +import sys +from threading import RLock + + +class Meter(object): + + def __init__(self): + self._num_finished = 0 + self._last_checkpoint_time = None + self._current_rate = 0.0 + self._lock = RLock() + + def mark_written(self): + with self._lock: + if not self._last_checkpoint_time: + self._last_checkpoint_time = time() + self._num_finished += 1 + + if self._num_finished % 10000 == 0: + previous_checkpoint_time = self._last_checkpoint_time + self._last_checkpoint_time = time() + new_rate = 10000.0 / (self._last_checkpoint_time - previous_checkpoint_time) + if self._current_rate == 0.0: + self._current_rate = new_rate + else: + self._current_rate = (self._current_rate + new_rate) / 2.0 + + if self._num_finished % 1000 != 0: + return + output = 'Processed %s rows; Write: %.2f rows/s\r' % \ + (self._num_finished, self._current_rate) + sys.stdout.write(output) + sys.stdout.flush() + + def num_finished(self): + with self._lock: + return self._num_finished + + def done(self): + print "" + +