Match cassandra-loader options in COPY FROM (2.2 version)

patch by Stefania; reviewed by pauloricardomg for CASSANDRA-9303


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/202cf9b0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/202cf9b0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/202cf9b0

Branch: refs/heads/cassandra-3.2
Commit: 202cf9b0bed8bbff41318f1f10043aabf3a7cd4d
Parents: 078aabe
Author: Stefania Alborghetti <stefania.alborghe...@datastax.com>
Authored: Wed Jan 6 12:10:13 2016 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Wed Jan 6 17:56:30 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |    1 +
 NEWS.txt                                        |    7 +
 bin/cqlsh.py                                    |  135 +-
 conf/cqlshrc.sample                             |   17 +-
 pylib/cqlshlib/copyutil.py                      | 1260 +++++++++++++-----
 pylib/cqlshlib/formatting.py                    |   96 +-
 .../cql3/statements/BatchStatement.java         |   28 +-
 .../cassandra/service/ClientWarningsTest.java   |    5 +-
 tools/bin/cassandra-stress.bat                  |    2 +-
 9 files changed, 1134 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/202cf9b0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fc87c7d..b12f593 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Verify tables in pseudo-system keyspaces at startup (CASSANDRA-10761)
 Merged from 2.1:
+ * Match cassandra-loader options in COPY FROM (CASSANDRA-9303)
  * Fix binding to any address in CqlBulkRecordWriter (CASSANDRA-9309)
  * cqlsh fails to decode utf-8 characters for text typed columns 
(CASSANDRA-10875)
  * Log error when stream session fails (CASSANDRA-9294)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/202cf9b0/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 57e321e..8cbe4f7 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -269,6 +269,13 @@ Upgrading
      to exclude data centers when the global status is enabled, see 
CASSANDRA-9035 for details.
 
 
+2.1.13
+======
+
+New features
+------------
+    - New options for cqlsh COPY FROM and COPY TO, see CASSANDRA-9303 for 
details.
+
 2.1.10
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/202cf9b0/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 8469836..c38bc2e 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -41,7 +41,6 @@ import optparse
 import os
 import platform
 import sys
-import time
 import traceback
 import warnings
 import webbrowser
@@ -151,7 +150,8 @@ cqlshlibdir = os.path.join(CASSANDRA_PATH, 'pylib')
 if os.path.isdir(cqlshlibdir):
     sys.path.insert(0, cqlshlibdir)
 
-from cqlshlib import cql3handling, cqlhandling, copyutil, pylexotron, 
sslhandling
+from cqlshlib import cql3handling, cqlhandling, pylexotron, sslhandling
+from cqlshlib.copyutil import ExportTask, ImportTask
 from cqlshlib.displaying import (ANSI_RESET, BLUE, COLUMN_NAME_COLORS, CYAN,
                                  RED, FormattedValue, colorme)
 from cqlshlib.formatting import (DEFAULT_DATE_FORMAT, DEFAULT_NANOTIME_FORMAT,
@@ -456,10 +456,12 @@ def complete_copy_column_names(ctxt, cqlsh):
     return set(colnames[1:]) - set(existcols)
 
 
-COPY_COMMON_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL',
-                       'MAXATTEMPTS', 'REPORTFREQUENCY']
-COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 'MINBATCHSIZE']
-COPY_TO_OPTIONS = ['ENCODING', 'TIMEFORMAT', 'PAGESIZE', 'PAGETIMEOUT', 
'MAXREQUESTS']
+COPY_COMMON_OPTIONS = ['DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER', 'NULL', 
'DATETIMEFORMAT',
+                       'MAXATTEMPTS', 'REPORTFREQUENCY', 'DECIMALSEP', 
'THOUSANDSSEP', 'BOOLSTYLE',
+                       'NUMPROCESSES', 'CONFIGFILE', 'RATEFILE']
+COPY_FROM_OPTIONS = ['CHUNKSIZE', 'INGESTRATE', 'MAXBATCHSIZE', 
'MINBATCHSIZE', 'MAXROWS',
+                     'SKIPROWS', 'SKIPCOLS', 'MAXPARSEERRORS', 
'MAXINSERTERRORS', 'ERRFILE']
+COPY_TO_OPTIONS = ['ENCODING', 'PAGESIZE', 'PAGETIMEOUT', 'BEGINTOKEN', 
'ENDTOKEN', 'MAXOUTPUTSIZE', 'MAXREQUESTS']
 
 
 @cqlsh_syntax_completer('copyOption', 'optnames')
@@ -575,23 +577,6 @@ warnings.showwarning = show_warning_without_quoting_line
 warnings.filterwarnings('always', 
category=cql3handling.UnexpectedTableStructure)
 
 
-def describe_interval(seconds):
-    desc = []
-    for length, unit in ((86400, 'day'), (3600, 'hour'), (60, 'minute')):
-        num = int(seconds) / length
-        if num > 0:
-            desc.append('%d %s' % (num, unit))
-            if num > 1:
-                desc[-1] += 's'
-        seconds %= length
-    words = '%.03f seconds' % seconds
-    if len(desc) > 1:
-        words = ', '.join(desc) + ', and ' + words
-    elif len(desc) == 1:
-        words = desc[0] + ' and ' + words
-    return words
-
-
 def insert_driver_hooks():
     extend_cql_deserialization()
     auto_format_udts()
@@ -658,8 +643,7 @@ class Shell(cmd.Cmd):
     last_hist = None
     shunted_query_out = None
     use_paging = True
-    csv_dialect_defaults = dict(delimiter=',', doublequote=False,
-                                escapechar='\\', quotechar='"')
+
     default_page_size = 100
 
     def __init__(self, hostname, port, color=False,
@@ -1711,32 +1695,67 @@ class Shell(cmd.Cmd):
           COPY x TO: Exports data from a Cassandra table in CSV format.
 
         COPY <table_name> [ ( column [, ...] ) ]
-             FROM ( '<filename>' | STDIN )
+             FROM ( '<file_pattern_1, file_pattern_2, ... file_pattern_n>' | 
STDIN )
              [ WITH <option>='value' [AND ...] ];
 
+        File patterns are either file names or valid python glob expressions, 
e.g. *.csv or folder/*.csv.
+
         COPY <table_name> [ ( column [, ...] ) ]
              TO ( '<filename>' | STDOUT )
              [ WITH <option>='value' [AND ...] ];
 
-        Available options and defaults:
+        Available common COPY options and defaults:
 
           DELIMITER=','           - character that appears between records
           QUOTE='"'               - quoting character to be used to quote 
fields
           ESCAPE='\'              - character to appear before the QUOTE char 
when quoted
           HEADER=false            - whether to ignore the first line
           NULL=''                 - string that represents a null value
-          ENCODING='utf8'         - encoding for CSV output (COPY TO)
-          TIMEFORMAT=             - timestamp strftime format (COPY TO)
+          DATETIMEFORMAT=         - timestamp strftime format
             '%Y-%m-%d %H:%M:%S%z'   defaults to time_format value in cqlshrc
-          MAXREQUESTS=6           - the maximum number of requests each worker 
process can work on in parallel (COPY TO)
-          PAGESIZE=1000           - the page size for fetching results (COPY 
TO)
-          PAGETIMEOUT=10          - the page timeout for fetching results 
(COPY TO)
-          MAXATTEMPTS=5           - the maximum number of attempts for errors
-          CHUNKSIZE=1000          - the size of chunks passed to worker 
processes (COPY FROM)
-          INGESTRATE=100000       - an approximate ingest rate in rows per 
second (COPY FROM)
-          MAXBATCHSIZE=20         - the maximum size of an import batch (COPY 
FROM)
-          MINBATCHSIZE=2          - the minimum size of an import batch (COPY 
FROM)
+          MAXATTEMPTS=5           - the maximum number of attempts per batch 
or range
           REPORTFREQUENCY=0.25    - the frequency with which we display status 
updates in seconds
+          DECIMALSEP='.'          - the separator for decimal values
+          THOUSANDSSEP=''         - the separator for thousands digit groups
+          BOOLSTYLE='True,False'  - the representation for booleans, case 
insensitive, specify true followed by false,
+                                    for example yes,no or 1,0
+          NUMPROCESSES=n          - the number of worker processes, by default 
the number of cores minus one
+                                    capped at 16
+          CONFIGFILE=''           - a configuration file with the same format 
as .cqlshrc (see the Python ConfigParser
+                                    documentation) where you can specify WITH 
options under the following optional
+                                    sections: [copy], [copy-to], [copy-from], 
[copy:ks.table], [copy-to:ks.table],
+                                    [copy-from:ks.table], where <ks> is your 
keyspace name and <table> is your table
+                                    name. Options are read from these 
sections, in the order specified
+                                    above, and command line options always 
override options in configuration files.
+                                    Depending on the COPY direction, only the 
relevant copy-from or copy-to sections
+                                    are used. If no configfile is specified 
then .cqlshrc is searched instead.
+          RATEFILE=''             - an optional file where to print the output 
statistics
+
+        Available COPY FROM options and defaults:
+
+          CHUNKSIZE=1000          - the size of chunks passed to worker 
processes
+          INGESTRATE=100000       - an approximate ingest rate in rows per 
second
+          MINBATCHSIZE=2          - the minimum size of an import batch
+          MAXBATCHSIZE=20         - the maximum size of an import batch
+          MAXROWS=-1              - the maximum number of rows, -1 means no 
maximum
+          SKIPROWS=0              - the number of rows to skip
+          SKIPCOLS=''             - a comma separated list of column names to 
skip
+          MAXPARSEERRORS=-1       - the maximum global number of parsing 
errors, -1 means no maximum
+          MAXINSERTERRORS=-1      - the maximum global number of insert 
errors, -1 means no maximum
+          ERRFILE=''              - a file where to store all rows that could 
not be imported, by default this is
+                                    import_ks_table.err where <ks> is your 
keyspace and <table> is your table name.
+
+        Available COPY TO options and defaults:
+
+          ENCODING='utf8'          - encoding for CSV output
+          PAGESIZE='1000'          - the page size for fetching results
+          PAGETIMEOUT=10           - the page timeout in seconds for fetching 
results
+          BEGINTOKEN=''            - the minimum token string to consider when 
exporting data
+          ENDTOKEN=''              - the maximum token string to consider when 
exporting data
+          MAXREQUESTS=6            - the maximum number of requests each 
worker process can work on in parallel
+          MAXOUTPUTSIZE='-1'       - the maximum size of the output file 
measured in number of lines,
+                                     beyond this maximum the output file will 
be split into segments,
+                                     -1 means unlimited.
 
         When entering CSV data on STDIN, you can use the sequence "\."
         on a line by itself to end the data input.
@@ -1747,55 +1766,31 @@ class Shell(cmd.Cmd):
             ks = self.current_keyspace
             if ks is None:
                 raise NoKeyspaceError("Not in any keyspace.")
-        cf = self.cql_unprotect_name(parsed.get_binding('cfname'))
+        table = self.cql_unprotect_name(parsed.get_binding('cfname'))
         columns = parsed.get_binding('colnames', None)
         if columns is not None:
             columns = map(self.cql_unprotect_name, columns)
         else:
             # default to all known columns
-            columns = self.get_column_names(ks, cf)
+            columns = self.get_column_names(ks, table)
+
         fname = parsed.get_binding('fname', None)
         if fname is not None:
-            fname = os.path.expanduser(self.cql_unprotect_value(fname))
+            fname = self.cql_unprotect_value(fname)
+
         copyoptnames = map(str.lower, parsed.get_binding('optnames', ()))
         copyoptvals = map(self.cql_unprotect_value, 
parsed.get_binding('optvals', ()))
-        cleancopyoptvals = [optval.decode('string-escape') for optval in 
copyoptvals]
-        opts = dict(zip(copyoptnames, cleancopyoptvals))
-
-        print "\nStarting copy of %s.%s with columns %s." % (ks, cf, columns)
-
-        timestart = time.time()
+        opts = dict(zip(copyoptnames, copyoptvals))
 
         direction = parsed.get_binding('dir').upper()
         if direction == 'FROM':
-            rows = self.perform_csv_import(ks, cf, columns, fname, opts)
-            verb = 'imported'
+            task = ImportTask(self, ks, table, columns, fname, opts, 
DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
         elif direction == 'TO':
-            rows = self.perform_csv_export(ks, cf, columns, fname, opts)
-            verb = 'exported'
+            task = ExportTask(self, ks, table, columns, fname, opts, 
DEFAULT_PROTOCOL_VERSION, CONFIG_FILE)
         else:
             raise SyntaxError("Unknown direction %s" % direction)
 
-        timeend = time.time()
-        print "\n%d rows %s in %s." % (rows, verb, describe_interval(timeend - 
timestart))
-
-    def perform_csv_import(self, ks, cf, columns, fname, opts):
-        csv_options, dialect_options, unrecognized_options = 
copyutil.parse_options(self, opts)
-        if unrecognized_options:
-            self.printerr('Unrecognized COPY FROM options: %s' % ', 
'.join(unrecognized_options.keys()))
-            return 0
-
-        return copyutil.ImportTask(self, ks, cf, columns, fname, csv_options, 
dialect_options,
-                                   DEFAULT_PROTOCOL_VERSION, CONFIG_FILE).run()
-
-    def perform_csv_export(self, ks, cf, columns, fname, opts):
-        csv_options, dialect_options, unrecognized_options = 
copyutil.parse_options(self, opts)
-        if unrecognized_options:
-            self.printerr('Unrecognized COPY TO options: %s' % ', 
'.join(unrecognized_options.keys()))
-            return 0
-
-        return copyutil.ExportTask(self, ks, cf, columns, fname, csv_options, 
dialect_options,
-                                   DEFAULT_PROTOCOL_VERSION, CONFIG_FILE).run()
+        task.run()
 
     def do_show(self, parsed):
         """

http://git-wip-us.apache.org/repos/asf/cassandra/blob/202cf9b0/conf/cqlshrc.sample
----------------------------------------------------------------------
diff --git a/conf/cqlshrc.sample b/conf/cqlshrc.sample
index 302d25f..4c66861 100644
--- a/conf/cqlshrc.sample
+++ b/conf/cqlshrc.sample
@@ -43,7 +43,7 @@ completekey = tab
 ;browser =
 
 [cql]
-version = 3.1.5
+version = 3.2.1
 
 [connection]
 hostname = 127.0.0.1
@@ -68,3 +68,18 @@ max_trace_wait = 10.0
 
 
 ; vim: set ft=dosini :
+
+;; optional options for COPY TO and COPY FROM
+;[copy]
+;maxattempts=10
+;numprocesses=4
+
+;; optional options for COPY FROM
+;[copy-from]
+;chunksize=5000
+;ingestrate=50000
+
+;; optional options for COPY TO
+;[copy-to]
+;pagesize=2000
+;pagetimeout=20

Reply via email to