This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix-queryserver.git

The following commit(s) were added to refs/heads/master by this push:
     new 4f4b25c  PHOENIX-5999 Hack together a better implementation for executemany using ExecuteBatch
4f4b25c is described below

commit 4f4b25c34d74544c7a7f3f43538586d7a81a221c
Author: Josh Elser <els...@apache.org>
AuthorDate: Thu Jul 9 21:03:10 2020 -0400

    PHOENIX-5999 Hack together a better implementation for executemany using ExecuteBatch

    Closes #43
---
 python-phoenixdb/phoenixdb/avatica/client.py     | 33 ++++++++++++++++++++++++
 python-phoenixdb/phoenixdb/cursor.py             | 20 +++++++-------
 python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py |  2 +-
 3 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/python-phoenixdb/phoenixdb/avatica/client.py b/python-phoenixdb/phoenixdb/avatica/client.py
index 9b28e65..a1f502c 100644
--- a/python-phoenixdb/phoenixdb/avatica/client.py
+++ b/python-phoenixdb/phoenixdb/avatica/client.py
@@ -508,6 +508,39 @@ class AvaticaClient(object):
         response.ParseFromString(response_data)
         return response.results
 
+    def execute_batch(self, connection_id, statement_id, rows):
+        """Returns an array of update counts corresponding to each row written.
+
+        :param connection_id:
+            ID of the current connection.
+
+        :param statement_id:
+            ID of the statement to fetch rows from.
+
+        :param rows:
+            A list of lists corresponding to the columns to bind to the statement
+            for many rows.
+
+        :returns:
+            Update counts for the writes.
+        """
+        request = requests_pb2.ExecuteBatchRequest()
+        request.statement_id = statement_id
+        request.connection_id = connection_id
+        if rows is not None:
+            for row in rows:
+                batch = requests_pb2.UpdateBatch()
+                for col in row:
+                    batch.parameter_values.append(col)
+                request.updates.append(batch)
+
+        response_data = self._apply(request)
+        response = responses_pb2.ExecuteBatchResponse()
+        response.ParseFromString(response_data)
+        if response.missing_statement:
+            raise errors.DatabaseError('ExecuteBatch reported missing statement', -1)
+        return response.update_counts
+
     def fetch(self, connection_id, statement_id, offset=0, frame_max_size=None):
         """Returns a frame of rows.
 
diff --git a/python-phoenixdb/phoenixdb/cursor.py b/python-phoenixdb/phoenixdb/cursor.py
index 5521ebc..ad09106 100644
--- a/python-phoenixdb/phoenixdb/cursor.py
+++ b/python-phoenixdb/phoenixdb/cursor.py
@@ -93,7 +93,7 @@ class Cursor(object):
         be automatically called at the end of the ``with`` block.
         """
         if self._closed:
-            raise ProgrammingError('the cursor is already closed')
+            raise ProgrammingError('The cursor is already closed.')
         if self._id is not None:
             self._connection._client.close_statement(self._connection._id, self._id)
             self._id = None
@@ -153,7 +153,7 @@ class Cursor(object):
             if frame.rows:
                 self._pos = 0
             elif not frame.done:
-                raise InternalError('got an empty frame, but the statement is not done yet')
+                raise InternalError('Got an empty frame, but the statement is not done yet.')
 
     def _fetch_next_frame(self):
         offset = self._frame.offset + len(self._frame.rows)
@@ -199,7 +199,7 @@ class Cursor(object):
                         typed_value.type = common_pb2.ARRAY
                         typed_value.component_type = rep
                     else:
-                        raise ProgrammingError('scalar value specified for array parameter')
+                        raise ProgrammingError('Scalar value specified for array parameter.')
                 else:
                     if mutate_to is not None:
                         value = mutate_to(value)
@@ -211,7 +211,7 @@ class Cursor(object):
 
     def execute(self, operation, parameters=None):
         if self._closed:
-            raise ProgrammingError('the cursor is already closed')
+            raise ProgrammingError('The cursor is already closed.')
         self._updatecount = -1
         self._set_frame(None)
         if parameters is None:
@@ -235,18 +235,16 @@ class Cursor(object):
 
     def executemany(self, operation, seq_of_parameters):
         if self._closed:
-            raise ProgrammingError('the cursor is already closed')
+            raise ProgrammingError('The cursor is already closed.')
         self._updatecount = -1
         self._set_frame(None)
         statement = self._connection._client.prepare(
             self._connection._id, operation, max_rows_total=0)
         self._set_id(statement.id)
         self._set_signature(statement.signature)
-        for parameters in seq_of_parameters:
-            self._connection._client.execute(
-                self._connection._id, self._id,
-                statement.signature, self._transform_parameters(parameters),
-                first_frame_max_size=0)
+        return self._connection._client.execute_batch(
+            self._connection._id, self._id,
+            [self._transform_parameters(p) for p in seq_of_parameters])
 
     def _transform_row(self, row):
         """Transforms a Row into Python values.
@@ -291,7 +289,7 @@ class Cursor(object):
 
     def fetchone(self):
         if self._frame is None:
-            raise ProgrammingError('no select statement was executed')
+            raise ProgrammingError('No select statement was executed.')
         if self._pos is None:
             return None
         rows = self._frame.rows
diff --git a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
index 61cb4b8..b402322 100644
--- a/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
+++ b/python-phoenixdb/phoenixdb/sqlalchemy_phoenix.py
@@ -35,7 +35,7 @@ class PhoenixDDLCompiler(DDLCompiler):
 
     def visit_primary_key_constraint(self, constraint):
         if constraint.name is None:
-            raise CompileError("can't create primary key without a name")
+            raise CompileError("Can't create primary key without a name.")
         return DDLCompiler.visit_primary_key_constraint(self, constraint)
 
 