cqlsh: add COPY command to load data from CSV flat files. Patch by paul cannon, reviewed by brandonwilliams for CASSANDRA-4012.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0ba2631e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0ba2631e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0ba2631e Branch: refs/heads/trunk Commit: 0ba2631ee228bdefaba61a53d723a65107ca044d Parents: 0cc168a Author: Brandon Williams <brandonwilli...@apache.org> Authored: Mon Jun 18 13:24:32 2012 -0500 Committer: Brandon Williams <brandonwilli...@apache.org> Committed: Mon Jun 18 13:24:32 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh | 227 +++++++++++++++++++++++++++++++++-- pylib/cqlshlib/cql3handling.py | 4 +- pylib/cqlshlib/cqlhandling.py | 15 ++- 4 files changed, 233 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 693b03b..ec03ca6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ * (cql3) Reject (not yet supported) creation of 2ndardy indexes on tables with composite primary keys (CASSANDRA-4328) * Set JVM stack size to 160k for java 7 (CASSANDRA-4275) + * cqlsh: add COPY command to load data from CSV flat files (CASSANDRA-4012) Merged from 1.0: * Set gc_grace on index CF to 0 (CASSANDRA-4314) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/bin/cqlsh ---------------------------------------------------------------------- diff --git a/bin/cqlsh b/bin/cqlsh index fecd472..842a313 100755 --- a/bin/cqlsh +++ b/bin/cqlsh @@ -36,7 +36,7 @@ version = "2.2.0" from StringIO import StringIO from itertools import groupby -from contextlib import contextmanager +from contextlib import contextmanager, closing from glob import glob from functools import partial from collections import 
defaultdict @@ -52,6 +52,7 @@ import locale import re import platform import warnings +import csv # cqlsh should run correctly when run out of a Cassandra source tree, # out of an unpacked Cassandra tarball, and after a proper package install. @@ -189,6 +190,7 @@ cqlsh_extra_syntax_rules = r''' | <assumeCommand> | <sourceCommand> | <captureCommand> + | <copyCommand> | <debugCommand> | <helpCommand> | <exitCommand> @@ -220,6 +222,15 @@ cqlsh_extra_syntax_rules = r''' <captureCommand> ::= "CAPTURE" ( fname=( <stringLiteral> | "OFF" ) )? ; +<copyCommand> ::= "COPY" cf=<columnFamilyName> + ( "(" [colnames]=<colname> ( "," [colnames]=<colname> )* ")" )? + "FROM" ( fname=<stringLiteral> | "STDIN" ) + ( "WITH" <copyOption> ( "AND" <copyOption> )* )? + ; + +<copyOption> ::= [optnames]=<cfOptionName> "=" [optvals]=<cfOptionVal> + ; + # avoiding just "DEBUG" so that this rule doesn't get treated as a terminal <debugCommand> ::= "DEBUG" "THINGS"? ; @@ -272,6 +283,41 @@ cqlsh_syntax_completer('sourceCommand', 'fname') \ cqlsh_syntax_completer('captureCommand', 'fname') \ (complete_source_quoted_filename) +@cqlsh_syntax_completer('copyCommand', 'fname') +def copy_fname_completer(ctxt, cqlsh): + lasttype = ctxt.get_binding('*LASTTYPE*') + if lasttype == 'unclosedString': + return complete_source_quoted_filename(ctxt, cqlsh) + partial = ctxt.get_binding('partial') + if partial == '': + return ["'"] + return () + +@cqlsh_syntax_completer('copyCommand', 'colnames') +def complete_copy_column_names(ctxt, cqlsh): + existcols = map(cqlsh.cql_unprotect_name, ctxt.get_binding('colnames', ())) + ks = cqlsh.cql_unprotect_name(ctxt.get_binding('ksname', None)) + cf = cqlsh.cql_unprotect_name(ctxt.get_binding('cfname')) + colnames = cqlsh.get_column_names(ks, cf) + if len(existcols) == 0: + return [colnames[0]] + return set(colnames[1:]) - set(existcols) + +COPY_OPTIONS = ('DELIMITER', 'QUOTE', 'ESCAPE', 'HEADER') + +@cqlsh_syntax_completer('copyOption', 'optnames') +def 
complete_copy_options(ctxt, cqlsh): + optnames = map(str.upper, ctxt.get_binding('optnames', ())) + return set(COPY_OPTIONS) - set(optnames) + +@cqlsh_syntax_completer('copyOption', 'optvals') +def complete_copy_opt_values(ctxt, cqlsh): + optnames = ctxt.get_binding('optnames', ()) + lastopt = optnames[-1].lower() + if lastopt == 'header': + return ['true', 'false'] + return [cqlhandling.Hint('<single_character_string>')] + class NoKeyspaceError(Exception): pass @@ -469,6 +515,22 @@ def show_warning_without_quoting_line(message, category, filename, lineno, file= warnings.showwarning = show_warning_without_quoting_line warnings.filterwarnings('always', category=cql3handling.UnexpectedTableStructure) +def describe_interval(seconds): + desc = [] + for length, unit in ((86400, 'day'), (3600, 'hour'), (60, 'minute')): + num = int(seconds) / length + if num > 0: + desc.append('%d %s' % (num, unit)) + if num > 1: + desc[-1] += 's' + seconds %= length + words = '%.03f seconds' % seconds + if len(desc) > 1: + words = ', '.join(desc) + ', and ' + words + elif len(desc) == 1: + words = desc[0] + ' and ' + words + return words + class Shell(cmd.Cmd): default_prompt = "cqlsh> " continue_prompt = " ... " @@ -481,6 +543,8 @@ class Shell(cmd.Cmd): debug = False stop = False shunted_query_out = None + csv_dialect_defaults = dict(delimiter=',', doublequote=False, + escapechar='\\', quotechar='"') def __init__(self, hostname, port, color=False, username=None, password=None, encoding=None, stdin=None, tty=True, @@ -522,7 +586,6 @@ class Shell(cmd.Cmd): stdin = sys.stdin self.tty = tty if tty: - self.prompt = None self.reset_prompt() self.report_connection() print 'Use HELP for help.' 
@@ -661,6 +724,25 @@ class Shell(cmd.Cmd): filterable.add(cm.name) return filterable + def get_column_names(self, ksname, cfname): + if ksname is None: + ksname = self.current_keyspace + if self.cqlver_atleast(3): + return self.get_column_names_from_layout(ksname, cfname) + else: + return self.get_column_names_from_cfdef(ksname, cfname) + + def get_column_names_from_layout(self, ksname, cfname): + layout = self.get_columnfamily_layout(ksname, cfname) + return [col.name for col in layout.columns] + + def get_column_names_from_cfdef(self, ksname, cfname): + cfdef = self.get_columnfamily(cfname, ksname=ksname) + key_alias = cfdef.key_alias + if key_alias is None: + key_alias = 'KEY' + return [key_alias] + [cm.name for cm in cfdef.column_metadata] + # ===== thrift-dependent parts ===== def get_cluster_name(self): @@ -758,16 +840,25 @@ class Shell(cmd.Cmd): def get_input_line(self, prompt=''): if self.tty: - line = raw_input(self.prompt) + '\n' + line = raw_input(prompt) + '\n' else: - sys.stdout.write(self.prompt) - sys.stdout.flush() line = self.stdin.readline() if not len(line): raise EOFError self.lineno += 1 return line + def use_stdin_reader(self, until='', prompt=''): + until += '\n' + while True: + try: + newline = self.get_input_line(prompt=prompt) + except EOFError: + return + if newline == until: + return + yield newline + def cmdloop(self): """ Adapted from cmd.Cmd's version, because there is literally no way with @@ -1065,8 +1156,7 @@ class Shell(cmd.Cmd): debug=debug_completion, startsymbol='cqlshCommand') def set_prompt(self, prompt): - if self.prompt != '': - self.prompt = prompt + self.prompt = prompt def cql_protect_name(self, name): return cqlruleset.maybe_escape_name(name) @@ -1339,6 +1429,125 @@ class Shell(cmd.Cmd): do_desc = do_describe + def do_copy(self, parsed): + r""" + COPY [cqlsh only] + + Imports CSV data into a Cassandra table. + + COPY <table_name> [ ( column [, ...] ) ] + FROM ( '<filename>' | STDIN ) + [ WITH <option>='value' [AND ...] 
]; + + Available options and defaults: + + DELIMITER=',' - character that appears between records + QUOTE='"' - quoting character to be used to quote fields + ESCAPE='\' - character to appear before the QUOTE char when quoted + HEADER=false - whether to ignore the first line + + When entering CSV data on STDIN, you can use the sequence "\." + on a line by itself to end the data input. + """ + + ks = self.cql_unprotect_name(parsed.get_binding('ksname', None)) + if ks is None: + ks = self.current_keyspace + cf = self.cql_unprotect_name(parsed.get_binding('cfname')) + columns = parsed.get_binding('colnames', None) + if columns is None: + # default to all known columns + columns = self.get_column_names(ks, cf) + else: + columns = map(self.cql_unprotect_name, columns) + fname = parsed.get_binding('fname', None) + if fname is not None: + fname = os.path.expanduser(self.cql_unprotect_value(fname)) + copyoptnames = map(str.lower, parsed.get_binding('optnames', ())) + copyoptvals = map(self.cql_unprotect_value, parsed.get_binding('optvals', ())) + opts = dict(zip(copyoptnames, copyoptvals)) + + # when/if COPY TO is supported, this would be a good place to branch + # on direction. + + timestart = time.time() + rows = self.perform_csv_import(ks, cf, columns, fname, opts) + timeend = time.time() + + print "%d rows imported in %s." 
% (rows, describe_interval(timeend - timestart)) + + def perform_csv_import(self, ks, cf, columns, fname, opts): + dialect_options = self.csv_dialect_defaults.copy() + if 'quote' in opts: + dialect_options['quotechar'] = opts.pop('quote') + if 'escape' in opts: + dialect_options['escapechar'] = opts.pop('escape') + if 'delimiter' in opts: + dialect_options['delimiter'] = opts.pop('delimiter') + header = bool(opts.pop('header', '').lower() == 'true') + if dialect_options['quotechar'] == dialect_options['escapechar']: + dialect_options['doublequote'] = True + del dialect_options['escapechar'] + + if opts: + self.printerr('Unrecognized COPY FROM options: %s' + % ', '.join(opts.keys())) + return 0 + + if fname is None: + do_close = False + print "[Use \. on a line by itself to end input]" + linesource = self.use_stdin_reader(prompt='[copy] ', until=r'\.') + else: + do_close = True + try: + linesource = open(fname, 'r') + except IOError, e: + self.printerr("Can't open %r for reading: %s" % (fname, e)) + return 0 + if header: + linesource.next() + + prepq = self.prep_import_insert(ks, cf, columns) + try: + reader = csv.reader(linesource, **dialect_options) + for rownum, row in enumerate(reader): + if len(row) != len(columns): + self.printerr("Record #%d (line %d) has the wrong number of fields " + "(%d instead of %d)." + % (rownum, reader.line_num, len(row), len(columns))) + return rownum + if not self.do_import_insert(prepq, row): + self.printerr("Aborting import at record #%d (line %d). " + "Previously-inserted values still present." + % (rownum, reader.line_num)) + return rownum + finally: + if do_close: + linesource.close() + elif self.tty: + print + return rownum + 1 + + def prep_import_insert(self, ks, cf, columns): + # would be nice to be able to use a prepared query here, but in order + # to use that interface, we'd need to have all the input as native + # values already, reading them from text just like the various + # Cassandra cql types do. 
Better just to submit them all as intact + # CQL string literals and let Cassandra do its thing. + return 'INSERT INTO %s.%s (%s) VALUES (%%s)' % ( + self.cql_protect_name(ks), + self.cql_protect_name(cf), + ', '.join(map(self.cql_protect_name, columns)) + ) + + def do_import_insert(self, prepq, rowvalues): + valstring = ', '.join(map(self.cql_protect_value, rowvalues)) + cql = prepq % valstring + if self.debug: + print "Import using CQL: %s" % cql + return self.perform_statement(cql) + def do_show(self, parsed): """ SHOW [cqlsh only] @@ -1457,7 +1666,7 @@ class Shell(cmd.Cmd): """ fname = parsed.get_binding('fname') - fname = os.path.expanduser(cqlsh.cql_unprotect_value(fname)) + fname = os.path.expanduser(self.cql_unprotect_value(fname)) try: f = open(fname, 'r') except IOError, e: @@ -1521,7 +1730,7 @@ class Shell(cmd.Cmd): ' to disable.' % (self.query_out.name,)) return - fname = os.path.expanduser(cqlsh.cql_unprotect_value(fname)) + fname = os.path.expanduser(self.cql_unprotect_value(fname)) try: f = open(fname, 'a') except IOError, e: http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index 8785a65..2bc7ef9 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -267,8 +267,8 @@ def unreserved_keyword_completer(ctxt, cass): return () def get_cf_layout(ctxt, cass): - ks = ctxt.get_binding('ksname', None) - cf = ctxt.get_binding('cfname') + ks = dequote_name(ctxt.get_binding('ksname', None)) + cf = dequote_name(ctxt.get_binding('cfname')) return cass.get_columnfamily_layout(ks, cf) syntax_rules += r''' http://git-wip-us.apache.org/repos/asf/cassandra/blob/0ba2631e/pylib/cqlshlib/cqlhandling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cqlhandling.py b/pylib/cqlshlib/cqlhandling.py index 
3866f3c..aa29c60 100644 --- a/pylib/cqlshlib/cqlhandling.py +++ b/pylib/cqlshlib/cqlhandling.py @@ -215,22 +215,25 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet): # inside a string literal prefix = None dequoter = util.identity + lasttype = None if tokens: - if tokens[-1][0] == 'unclosedString': + lasttype = tokens[-1][0] + if lasttype == 'unclosedString': prefix = self.token_dequote(tokens[-1]) tokens = tokens[:-1] partial = prefix + partial dequoter = self.dequote_value requoter = self.escape_value - elif tokens[-1][0] == 'unclosedName': + elif lasttype == 'unclosedName': prefix = self.token_dequote(tokens[-1]) tokens = tokens[:-1] partial = prefix + partial dequoter = self.dequote_name requoter = self.escape_name - elif tokens[-1][0] == 'unclosedComment': + elif lasttype == 'unclosedComment': return [] bindings['partial'] = partial + bindings['*LASTTYPE*'] = lasttype bindings['*SRC*'] = text # find completions for the position @@ -302,6 +305,7 @@ class CqlParsingRuleSet(pylexotron.ParsingRuleSet): init_bindings = {'cassandra_conn': cassandra_conn} if debug: init_bindings['*DEBUG*'] = True + print "cql_complete(%r, partial=%r)" % (text, partial) completions, hints = self.cql_complete_single(text, partial, init_bindings, startsymbol=startsymbol) @@ -495,6 +499,7 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ; ; <colname> ::= <term> | <identifier> + | nocomplete=<K_KEY> ; <statementBody> ::= <useStatement> @@ -528,6 +533,10 @@ JUNK ::= /([ \t\r\f\v]+|(--|[/][/])[^\n\r]*([\n\r]|$)|[/][*].*?[*][/])/ ; <columnFamilyName> ::= ( ksname=<name> "." )? cfname=<name> ; ''' +@completer_for('colname', 'nocomplete') +def nocomplete(ctxt, cass): + return () + @completer_for('consistencylevel', 'cl') def cl_completer(ctxt, cass): return CqlRuleSet.consistency_levels