Repository: cassandra Updated Branches: refs/heads/trunk e39454873 -> cad3a2d51
Revert "Update cqlsh driver for new driver execution API" This reverts commit acf67d559c5c25f1ec1a6070c2d91777dd3420b4. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0fbf7159 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0fbf7159 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0fbf7159 Branch: refs/heads/trunk Commit: 0fbf715916b48a8e8abad5911e2697791b49f824 Parents: acf67d5 Author: Sylvain Lebresne <[email protected]> Authored: Fri Oct 30 14:51:26 2015 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Fri Oct 30 14:51:26 2015 +0100 ---------------------------------------------------------------------- bin/cqlsh.py | 69 ++++++++++--------- lib/cassandra-driver-internal-only-2.7.2.zip | Bin 0 -> 229600 bytes ...iver-internal-only-3.0.0a2.post0-95c6008.zip | Bin 233564 -> 0 bytes 3 files changed, 38 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fbf7159/bin/cqlsh.py ---------------------------------------------------------------------- diff --git a/bin/cqlsh.py b/bin/cqlsh.py index 17bddd3..09da020 100644 --- a/bin/cqlsh.py +++ b/bin/cqlsh.py @@ -111,13 +111,13 @@ except ImportError, e: 'Error: %s\n' % (sys.executable, sys.path, e)) from cassandra.auth import PlainTextAuthProvider -from cassandra.cluster import Cluster +from cassandra.cluster import Cluster, PagedResult from cassandra.metadata import (ColumnMetadata, KeyspaceMetadata, TableMetadata, protect_name, protect_names, protect_value) from cassandra.policies import WhiteListRoundRobinPolicy from cassandra.protocol import QueryMessage, ResultMessage -from cassandra.query import SimpleStatement, ordered_dict_factory, TraceUnavailable +from cassandra.query import SimpleStatement, ordered_dict_factory # cqlsh should run correctly when run out of a Cassandra source tree, # out of an unpacked Cassandra tarball, and after a proper package install. @@ -675,7 +675,6 @@ class Shell(cmd.Cmd): self.session.default_timeout = client_timeout self.session.row_factory = ordered_dict_factory - self.session.default_consistency_level = cassandra.ConsistencyLevel.ONE self.get_connection_versions() self.current_keyspace = keyspace @@ -1122,8 +1121,8 @@ class Shell(cmd.Cmd): def do_use(self, parsed): ksname = parsed.get_binding('ksname') - success, _ = self.perform_simple_statement(SimpleStatement(parsed.extract_orig())) - if success: + result, future = self.perform_simple_statement(SimpleStatement(parsed.extract_orig())) + if result: if ksname[0] == '"' and ksname[-1] == '"': self.current_keyspace = self.cql_unprotect_name(ksname) else: @@ -1140,7 +1139,7 @@ class Shell(cmd.Cmd): def perform_statement(self, statement): stmt = SimpleStatement(statement, consistency_level=self.consistency_level, serial_consistency_level=self.serial_consistency_level, fetch_size=self.page_size if self.use_paging else None) - success, future = self.perform_simple_statement(stmt) + result, future = self.perform_simple_statement(stmt) if future: if future.warnings: @@ -1148,17 +1147,19 @@ class Shell(cmd.Cmd): if self.tracing_enabled: try: - for trace in future.get_all_query_traces(self.max_trace_wait): + trace = future.get_query_trace(self.max_trace_wait) + if trace: print_trace(self, trace) - except TraceUnavailable: - msg = "Statement trace did not complete within %d seconds; trace data may be incomplete." % (self.session.max_trace_wait,) - self.writeresult(msg, color=RED) - for trace_id in future.get_query_trace_ids(): - self.show_session(trace_id) + elif stmt.trace_id: + self.writeresult("This statement trace may be incomplete", color=RED) + self.show_session(stmt.trace_id) + else: + msg = "Statement trace did not complete within %d seconds" % (self.session.max_trace_wait) + self.writeresult(msg, color=RED) except Exception, err: self.printerr("Unable to fetch query trace: %s" % (str(err),)) - return success + return result def parse_for_table_meta(self, query_string): try: @@ -1176,7 +1177,7 @@ class Shell(cmd.Cmd): while True: try: future = self.session.execute_async(statement, trace=self.tracing_enabled) - result = future.result() + rows = future.result(self.session.default_timeout) break except cassandra.OperationTimedOut, err: self.refresh_schema_metadata_best_effort() @@ -1191,36 +1192,42 @@ class Shell(cmd.Cmd): return False, None if statement.query_string[:6].lower() == 'select': - self.print_result(result, self.parse_for_table_meta(statement.query_string)) + self.print_result(rows, self.parse_for_table_meta(statement.query_string)) elif statement.query_string.lower().startswith("list users") or statement.query_string.lower().startswith("list roles"): - self.print_result(result, self.get_table_meta('system_auth', 'roles')) + self.print_result(rows, self.get_table_meta('system_auth', 'roles')) elif statement.query_string.lower().startswith("list"): - self.print_result(result, self.get_table_meta('system_auth', 'role_permissions')) - elif result: + self.print_result(rows, self.get_table_meta('system_auth', 'role_permissions')) + elif rows: # CAS INSERT/UPDATE self.writeresult("") - self.print_static_result(list(result), self.parse_for_table_meta(statement.query_string)) + self.print_static_result(rows, self.parse_for_table_meta(statement.query_string)) self.flush_output() return True, future - def print_result(self, result, table_meta): + def print_result(self, rows, table_meta): self.decoding_errors = [] self.writeresult("") - if result.has_more_pages and self.tty: + if isinstance(rows, PagedResult) and self.tty: num_rows = 0 while True: - page = result.current_rows - if page: - num_rows += len(page) - self.print_static_result(page, table_meta) - if result.has_more_pages: - raw_input("---MORE---") - result.fetch_next_page() - else: + page = list(rows.current_response) + if not page: + break + num_rows += len(page) + self.print_static_result(page, table_meta) + if not rows.response_future.has_more_pages: break + raw_input("---MORE---") + + rows.response_future.start_fetching_next_page() + result = rows.response_future.result() + if rows.response_future.has_more_pages: + rows.current_response = result.current_response + else: + rows.current_response = iter(result) else: - rows = list(result) + rows = list(rows or []) num_rows = len(rows) self.print_static_result(rows, table_meta) self.writeresult("(%d rows)" % num_rows) @@ -2328,7 +2335,7 @@ class ImportProcess(multiprocessing.Process): cqltypes = [table_meta.columns[name].typestring for name in self.columns] pk_indexes = [self.columns.index(col.name) for col in table_meta.primary_key] query = 'INSERT INTO %s.%s (%s) VALUES (%%s)' % ( - protect_name(table_meta.keyspace_name), + protect_name(table_meta.keyspace.name), protect_name(table_meta.name), ', '.join(protect_names(self.columns))) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fbf7159/lib/cassandra-driver-internal-only-2.7.2.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-2.7.2.zip b/lib/cassandra-driver-internal-only-2.7.2.zip new file mode 100644 index 0000000..f2e75f1 Binary files /dev/null and b/lib/cassandra-driver-internal-only-2.7.2.zip differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0fbf7159/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip ---------------------------------------------------------------------- diff --git a/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip b/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip deleted file mode 100644 index da7fa0d..0000000 Binary files a/lib/cassandra-driver-internal-only-3.0.0a2.post0-95c6008.zip and /dev/null differ
