IMPALA-4359: qgen: add UPSERT support UPSERTs are very similar to INSERTs, so the UPSERT support is simply folded into that of INSERT. We do this by adding another "conflict action", CONFLICT_ACTION_UPDATE. The object responsible for holding the conflict_action attribute is now the InsertClause. This is needed here because the SqlWriter now needs to know the conflict_action both when writing the InsertClause (Impala) and at the tail end of the InsertStatement (PostgreSQL). We also add a few properties to the InsertStatement interface so that the PostgresqlSqlWriter can form the correct "DO UPDATE" conflict action, in which primary key columns and updatable columns must be known. More information on that here:
https://www.postgresql.org/docs/9.5/static/sql-insert.html By default, we will tend to generate 3 UPSERTs for every 1 INSERT. In addition to adding unit tests to make sure UPSERTs are properly written, I used discrepancy_searcher.py --profile dmlonly, both with and without --explain-only, to run tests. I made sure we were generating syntactically valid UPSERT statements, and that the INSERT/UPSERT ratio was roughly 1/3 after 100 statements. Change-Id: I6382f6ab22ba29c117e39a5d90592d3637df4b25 Reviewed-on: http://gerrit.cloudera.org:8080/5795 Reviewed-by: Taras Bobrovytsky <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0154ace6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0154ace6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0154ace6 Branch: refs/heads/master Commit: 0154ace61feff65917388dd08591cb9d0d4369ed Parents: 39987d9 Author: Michael Brown <[email protected]> Authored: Mon Jan 23 10:55:47 2017 -0800 Committer: Impala Public Jenkins <[email protected]> Committed: Sat Feb 4 06:13:34 2017 +0000 ---------------------------------------------------------------------- tests/comparison/discrepancy_searcher.py | 18 +-- tests/comparison/model_translator.py | 29 +++- tests/comparison/query.py | 104 +++++++++---- tests/comparison/query_profile.py | 25 ++- tests/comparison/statement_generator.py | 35 +++-- tests/comparison/tests/query_object_testdata.py | 155 ++++++++++++++++++- 6 files changed, 296 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/discrepancy_searcher.py ---------------------------------------------------------------------- diff --git a/tests/comparison/discrepancy_searcher.py b/tests/comparison/discrepancy_searcher.py index 
4b0bce9..6534e4d 100755 --- a/tests/comparison/discrepancy_searcher.py +++ b/tests/comparison/discrepancy_searcher.py @@ -660,13 +660,13 @@ class QueryResultDiffSearcher(object): copy_select_query.from_clause = FromClause(src_table) if new_table.primary_keys: - conflict_action = InsertStatement.CONFLICT_ACTION_IGNORE + conflict_action = InsertClause.CONFLICT_ACTION_IGNORE else: - conflict_action = InsertStatement.CONFLICT_ACTION_DEFAULT + conflict_action = InsertClause.CONFLICT_ACTION_DEFAULT table_copy_statement = InsertStatement( - insert_clause=InsertClause(new_table), select_query=copy_select_query, - conflict_action=conflict_action, execution=StatementExecutionMode.DML_SETUP) + insert_clause=InsertClause(new_table, conflict_action=conflict_action), + select_query=copy_select_query, execution=StatementExecutionMode.DML_SETUP) result = self.query_result_comparator.compare_query_results(table_copy_statement) if result.error: @@ -703,15 +703,15 @@ class QueryResultDiffSearcher(object): dml_table = None if issubclass(statement_type, (InsertStatement,)): dml_choice_src_table = self.query_profile.choose_table(self.common_tables) - # Copy the table we want to INSERT INTO. Do this for the following reasons: + # Copy the table we want to INSERT/UPSERT INTO. Do this for the following reasons: # # 1. If we don't copy, the tables will get larger and larger # 2. If we want to avoid tables getting larger and larger, we have to come up # with some threshold about when to cut and start over. - # 3. If we keep INSERTing into tables and finally find a crash, we have to - # replay all previous INSERTs again. Those INSERTs may not produce the same rows - # as before. To maximize the chance of bug reproduction, run every INSERT on a - # pristine table. + # 3. If we keep INSERT/UPSERTing into tables and finally find a crash, we have to + # replay all previous INSERT/UPSERTs again. Those INSERTs may not produce the + # same rows as before. 
To maximize the chance of bug reproduction, run every + # INSERT/UPSERT on a pristine table. dml_table = self._concurrently_copy_table(dml_choice_src_table) statement = statement_generator.generate_statement( self.common_tables, dml_table=dml_table) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/model_translator.py ---------------------------------------------------------------------- diff --git a/tests/comparison/model_translator.py b/tests/comparison/model_translator.py index ef87bed..7064d5c 100644 --- a/tests/comparison/model_translator.py +++ b/tests/comparison/model_translator.py @@ -31,7 +31,7 @@ from tests.comparison.db_types import ( Timestamp, TinyInt, VarChar) -from tests.comparison.query import InsertStatement, Query +from tests.comparison.query import InsertClause, Query from tests.comparison.query_flattener import QueryFlattener LOG = getLogger(__name__) @@ -528,6 +528,20 @@ class ImpalaSqlWriter(SqlWriter): result = 'TRIM(%s)' % result return result + def _write_insert_clause(self, insert_clause): + sql = super(ImpalaSqlWriter, self)._write_insert_clause(insert_clause) + if insert_clause.conflict_action == InsertClause.CONFLICT_ACTION_UPDATE: + # The value of sql at this point would be something like: + # + # INSERT INTO <table name> [(column list)] + # + # If it happens that the table name or column list contains the text INSERT in an + # identifier, we want to ensure that the replace() call below does not alter their + # names but instead only modifiers the INSERT keyword to UPSERT. 
+ return sql.replace('INSERT', 'UPSERT', 1) + else: + return sql + class OracleSqlWriter(SqlWriter): @@ -643,10 +657,19 @@ class PostgresqlSqlWriter(SqlWriter): def _write_insert_statement(self, insert_statement): sql = SqlWriter._write_insert_statement(self, insert_statement) - if insert_statement.conflict_action == InsertStatement.CONFLICT_ACTION_DEFAULT: + if insert_statement.conflict_action == InsertClause.CONFLICT_ACTION_DEFAULT: pass - elif insert_statement.conflict_action == InsertStatement.CONFLICT_ACTION_IGNORE: + elif insert_statement.conflict_action == InsertClause.CONFLICT_ACTION_IGNORE: sql += '\nON CONFLICT DO NOTHING' + elif insert_statement.conflict_action == InsertClause.CONFLICT_ACTION_UPDATE: + if insert_statement.updatable_column_names: + primary_keys = insert_statement.primary_key_string + columns = ',\n'.join('{name} = EXCLUDED.{name}'.format(name=name) for name in + insert_statement.updatable_column_names) + sql += '\nON CONFLICT {primary_keys}\nDO UPDATE SET\n{columns}'.format( + primary_keys=primary_keys, columns=columns) + else: + sql += '\nON CONFLICT DO NOTHING' else: raise Exception('InsertStatement has unsupported conflict_action: {0}'.format( insert_statement.conflict_action)) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/query.py ---------------------------------------------------------------------- diff --git a/tests/comparison/query.py b/tests/comparison/query.py index 022dd9c..f7bd593 100644 --- a/tests/comparison/query.py +++ b/tests/comparison/query.py @@ -702,19 +702,69 @@ class LimitClause(object): class InsertClause(object): - def __init__(self, table, column_list=None): + # This enum represents possibilities for different types of INSERTs. A user of this + # object, like StatementGenerator, is responsible for setting the conflict_action + # value appropriately. These values are valid for the conflict_action parameter. 
+ # Because an InsertStatement is a single piece of data shared across multiple SQL + # dialects, this setting can alter the written SQL in multiple dialects. + # + # CONFLICT_ACTION_DEFAULT + # + # For Impala, this is a statement like INSERT INTO hdfs_table SELECT * FROM foo + # For PostgreSQL, this is a statement like INSERT INTO hdfs_table SELECT * FROM foo + # + # Example use cases: inserting into tables that do not have primary keys, or + # inserting into PostgreSQL tables where you want to error if there are attempts to + # insert duplicate primary keys + # + # CONFLICT_ACTION_IGNORE + # + # For Impala, this is a statement like INSERT INTO kudu_table SELECT * FROM foo + # For PostgreSQL, this is a statement like INSERT INTO kudu_table SELECT * FROM foo + # ON CONFLICT DO NOTHING + # + # Example use case: inserting into Kudu tables, where attempts to insert duplicate + # primary key rows are ignored by Impala, so they must also be ignored by PostgreSQL. + # Note that the *syntax* for INSERT doesn't change with Impala, but because it's a + # Kudu table, the behavior differs. + # + # CONFLICT_ACTION_UPDATE + # + # For Impala, this is a statement like UPSERT INTO kudu_table SELECT * FROM foo + # For PostgreSQL, this is a statement like INSERT INTO kudu_table SELECT * FROM foo + # ON CONFLICT DO UPDATE SET + # (col1 = EXCLUDED.col1, ...) + # + # Example use case: upserting into Kudu tables, where attempts to insert duplicate + # primary key rows will either insert a single row, or update a single row already + # there, without error. In PostgreSQL, UPSERT is written via this "ON CONFLICT DO + # UPDATE" clause. 
+ # + # More on PostgreSQL INSERT/UPSERT syntax here: + # https://www.postgresql.org/docs/9.5/static/sql-insert.html + + (CONFLICT_ACTION_DEFAULT, + CONFLICT_ACTION_IGNORE, + CONFLICT_ACTION_UPDATE) = range(3) + + def __init__(self, table, column_list=None, conflict_action=CONFLICT_ACTION_DEFAULT): """ - Represent an INSERT clause, which is the first half of an INSERT statement. The - table is a Table object. + Represent an INSERT/UPSERT clause, which is the first half of an INSERT/UPSERT + statement. Note that UPSERTs are very similar to INSERTs, so this data structure can + easily deal with both. - column_list is an optional list, tuple, or other sequence of - tests.comparison.common.Column objects. + The table is a Table object. - In an INSERT statement, it's a sequence of column names. See + column_list is an optional list, tuple, or other sequence of + tests.comparison.common.Column objects. In an Impala INSERT/UPSERT SQL statement, + it's a sequence of column names. See http://www.cloudera.com/documentation/enterprise/latest/topics/impala_insert.html + + conflict_action takes in one of the CONFLICT_ACTION_* class attributes. See above. """ self.table = table self.column_list = column_list + self.conflict_action = conflict_action class ValuesRow(object): @@ -728,32 +778,22 @@ class ValuesRow(object): class ValuesClause(object): def __init__(self, values_rows): """ - Represent the VALUES clause of an INSERT statement. The values_rows is a sequence of - ValuesRow objects. + Represent the VALUES clause of an INSERT/UPSERT statement. The values_rows is a + sequence of ValuesRow objects. """ self.values_rows = values_rows class InsertStatement(AbstractStatement): - (CONFLICT_ACTION_DEFAULT, - CONFLICT_ACTION_IGNORE) = range(2) - def __init__(self, with_clause=None, insert_clause=None, select_query=None, - values_clause=None, conflict_action=CONFLICT_ACTION_DEFAULT, - execution=None): + values_clause=None, execution=None): """ - Represent an INSERT statement. 
The INSERT may have an optional WithClause, and then - either a SELECT query (Query) object from whose rows we INSERT, or a VALUES clause, - but not both. + Represent an INSERT/UPSERT statement. Note that UPSERTs are very similar to INSERTs, + so this data structure can easily deal with both. - conflict_action takes in one of the CONFLICT_ACTION_* class attributes. On INSERT if - the conflict_action is CONFLICT_ACTION_DEFAULT, we write standard INSERT queries. - - If CONFLICT_ACTION_IGNORE is chosen instead, PostgreSQL INSERTs will use "ON - CONFLICT DO NOTHING". The syntax doesn't change for Impala, but the implied - semantics are needed: if we are INSERTing a Kudu table, conflict_action must be - CONFLICT_ACTION_IGNORE. + The INSERT/UPSERT may have an optional WithClause, and then either a SELECT query + (Query) object from whose rows we INSERT, or a VALUES clause, but not both. The execution attribute is used by the discrepancy_searcher to track whether this InsertStatement is some sort of setup operation or a true random statement test. 
@@ -766,7 +806,6 @@ class InsertStatement(AbstractStatement): self.values_clause = values_clause self.with_clause = with_clause self.insert_clause = insert_clause - self.conflict_action = conflict_action @property def select_query(self): @@ -777,7 +816,7 @@ class InsertStatement(AbstractStatement): if self.values_clause is None or select_query is None: self._select_query = select_query else: - raise Exception('An INSERT statement may not have both the select_query and ' + raise Exception('An INSERT/UPSERT statement may not have both the select_query and ' 'values_clause set: {select}; {values}'.format( select=select_query, values=self.values_clause)) @@ -790,7 +829,7 @@ class InsertStatement(AbstractStatement): if self.select_query is None or values_clause is None: self._values_clause = values_clause else: - raise Exception('An INSERT statement may not have both the select_query and ' + raise Exception('An INSERT/UPSERT statement may not have both the select_query and ' 'values_clause set: {select}; {values}'.format( select=self.select_query, values=values_clause)) @@ -815,3 +854,16 @@ class InsertStatement(AbstractStatement): @property def dml_table(self): return self.insert_clause.table + + @property + def conflict_action(self): + return self.insert_clause.conflict_action + + @property + def primary_key_string(self): + return '({primary_key_list})'.format( + primary_key_list=', '.join(self.insert_clause.table.primary_key_names)) + + @property + def updatable_column_names(self): + return self.insert_clause.table.updatable_column_names http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/query_profile.py ---------------------------------------------------------------------- diff --git a/tests/comparison/query_profile.py b/tests/comparison/query_profile.py index 06b8815..81a98a1 100644 --- a/tests/comparison/query_profile.py +++ b/tests/comparison/query_profile.py @@ -27,6 +27,7 @@ from tests.comparison.db_types import ( TYPES, 
Timestamp) from tests.comparison.query import ( + InsertClause, InsertStatement, Query, StatementExecutionMode, @@ -211,7 +212,10 @@ class DefaultProfile(object): 'none': 1}, 'VALUES_ITEM_EXPR': { 'constant': 1, - 'function': 2}} + 'function': 2}, + 'INSERT_UPSERT': { + InsertClause.CONFLICT_ACTION_IGNORE: 1, + InsertClause.CONFLICT_ACTION_UPDATE: 3}} # On/off switches self._flags = { @@ -631,16 +635,16 @@ class DefaultProfile(object): def choose_insert_source_clause(self): """ - Returns whether we generate an INSERT SELECT or an INSERT VALUES + Returns whether we generate an INSERT/UPSERT SELECT or an INSERT/UPSERT VALUES """ return self._choose_from_weights('INSERT_SOURCE_CLAUSE') def choose_insert_column_list(self, table): """ - Decide whether or not an INSERT will be in the form of: - INSERT INTO table SELECT|VALUES ... + Decide whether or not an INSERT/UPSERT will be in the form of: + INSERT/UPSERT INTO table SELECT|VALUES ... or - INSERT INTO table (col1, col2, ...) SELECT|VALUES ... + INSERT/UPSERT INTO table (col1, col2, ...) SELECT|VALUES ... If the second form, the column list is shuffled. The column list will always contain the primary key columns and between 0 and all additional columns. 
""" @@ -649,7 +653,8 @@ class DefaultProfile(object): min_additional_insert_cols = 0 if columns_to_insert else 1 remaining_columns = [col for col in table.cols if not col.is_primary_key] shuffle(remaining_columns) - additional_column_count = randint(min_additional_insert_cols, len(remaining_columns)) + additional_column_count = randint(min_additional_insert_cols, + len(remaining_columns)) columns_to_insert.extend(remaining_columns[:additional_column_count]) shuffle(columns_to_insert) return columns_to_insert @@ -658,7 +663,7 @@ class DefaultProfile(object): def choose_insert_values_row_count(self): """ - Choose the number of rows to insert in an INSERT VALUES + Choose the number of rows to insert in an INSERT/UPSERT VALUES """ return self._choose_from_bounds('INSERT_VALUES_ROWS') @@ -669,6 +674,12 @@ class DefaultProfile(object): """ return self._choose_from_weights('VALUES_ITEM_EXPR') + def choose_insert_vs_upsert(self): + """ + Choose whether a particular insertion-type statement will be INSERT or UPSERT. + """ + return self._choose_from_weights('INSERT_UPSERT') + class ImpalaNestedTypesProfile(DefaultProfile): http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/statement_generator.py ---------------------------------------------------------------------- diff --git a/tests/comparison/statement_generator.py b/tests/comparison/statement_generator.py index 55496ac..3784257 100644 --- a/tests/comparison/statement_generator.py +++ b/tests/comparison/statement_generator.py @@ -33,20 +33,21 @@ class InsertStatementGenerator(object): def __init__(self, profile): # QueryProfile-like object self.profile = profile - # used to generate SELECT queries for INSERT ... SELECT statements; + # used to generate SELECT queries for INSERT/UPSERT ... 
SELECT statements; # to ensure state is completely reset, this is created anew with each call to # generate_statement() self.select_stmt_generator = None def generate_statement(self, tables, dml_table): """ - Return a randomly generated INSERT statement. + Return a randomly generated INSERT or UPSERT statement. Note that UPSERTs are very + similar to INSERTs, which is why this generator handles both. tables should be a list of Table objects. A typical source of such a list comes from db_connection.DbCursor.describe_common_tables(). This list describes the possible - "sources" of the INSERT's WITH and FROM/WHERE clauses. + "sources" of the INSERT/UPSERT's WITH and FROM/WHERE clauses. - dml_table is a required Table object. The INSERT will be into this table. + dml_table is a required Table object. The INSERT/UPSERT will be into this table. """ if not (isinstance(tables, list) and len(tables) > 0 and all((isinstance(t, Table) for t in tables))): @@ -57,23 +58,23 @@ class InsertStatementGenerator(object): self.select_stmt_generator = QueryGenerator(self.profile) - if dml_table.primary_keys: - insert_statement = InsertStatement( - conflict_action=InsertStatement.CONFLICT_ACTION_IGNORE) - else: - insert_statement = InsertStatement( - conflict_action=InsertStatement.CONFLICT_ACTION_DEFAULT) - - insert_statement.execution = StatementExecutionMode.DML_TEST + insert_statement = InsertStatement(execution=StatementExecutionMode.DML_TEST) # Choose whether this is a - # INSERT INTO table SELECT/VALUES + # INSERT/UPSERT INTO table SELECT/VALUES # or - # INSERT INTO table (col1, col2, ...) SELECT/VALUES + # INSERT/UPSERT INTO table (col1, col2, ...) SELECT/VALUES # If the method returns None, it's the former. insert_column_list = self.profile.choose_insert_column_list(dml_table) + + if dml_table.primary_keys: + # Having primary keys implies the table is a Kudu table, which makes it subject to + # both INSERTs (with automatic ignoring of primary key duplicates) and UPSERTs. 
+ conflict_action = self.profile.choose_insert_vs_upsert() + else: + conflict_action = InsertClause.CONFLICT_ACTION_DEFAULT insert_statement.insert_clause = InsertClause( - dml_table, column_list=insert_column_list) + dml_table, column_list=insert_column_list, conflict_action=conflict_action) # We still need to internally track the columns we're inserting. Keep in mind None # means "all" without an explicit column list. Since we've already created the # InsertClause object though, we can fill this in for ourselves. @@ -81,7 +82,7 @@ class InsertStatementGenerator(object): insert_column_list = dml_table.cols insert_item_data_types = [col.type for col in insert_column_list] - # Decide whether this is INSERT VALUES or INSERT SELECT + # Decide whether this is INSERT/UPSERT VALUES or INSERT/UPSERT SELECT insert_source_clause = self.profile.choose_insert_source_clause() if issubclass(insert_source_clause, Query): @@ -99,7 +100,7 @@ class InsertStatementGenerator(object): elif issubclass(insert_source_clause, ValuesClause): insert_statement.values_clause = self._generate_values_clause(insert_column_list) else: - raise Exception('unsupported INSERT source clause: {0}'.format( + raise Exception('unsupported INSERT/UPSERT source clause: {0}'.format( insert_source_clause)) return insert_statement http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0154ace6/tests/comparison/tests/query_object_testdata.py ---------------------------------------------------------------------- diff --git a/tests/comparison/tests/query_object_testdata.py b/tests/comparison/tests/query_object_testdata.py index 4861781..154c8b1 100644 --- a/tests/comparison/tests/query_object_testdata.py +++ b/tests/comparison/tests/query_object_testdata.py @@ -87,6 +87,24 @@ KUDU_TABLE = FakeTable( ] ) +FOUR_COL_KUDU_TABLE = FakeTable( + 'four_col_kudu_table', + [ + FakeColumn('int_col1', Int, is_primary_key=True), + FakeColumn('char_col1', Char, is_primary_key=True), + FakeColumn('int_col2', Int), + 
FakeColumn('char_col2', Char), + ] +) + + +ONE_COL_KUDU_TABLE = FakeTable( + 'one_col_kudu_table', + [ + FakeColumn('int_col', Int, is_primary_key=True), + ] +) + # This can't be used inline because we need its table expressions later. SIMPLE_WITH_CLAUSE = WithClause( TableExprList([ @@ -388,12 +406,13 @@ INSERT_QUERY_TEST_CASES = [ InsertStatementTest( testid='insert into table select cols ignore conflicts', query=InsertStatement( - insert_clause=InsertClause(KUDU_TABLE), + insert_clause=InsertClause( + KUDU_TABLE, + conflict_action=InsertClause.CONFLICT_ACTION_IGNORE), select_query=FakeQuery( select_clause=FakeSelectClause(*SIMPLE_TABLE.cols), from_clause=FromClause(SIMPLE_TABLE) ), - conflict_action=InsertStatement.CONFLICT_ACTION_IGNORE ), impala_query_string=( 'INSERT INTO kudu_table\n' @@ -414,12 +433,14 @@ INSERT_QUERY_TEST_CASES = [ InsertStatementTest( testid='insert 2 value rows ignore conflicts', query=InsertStatement( - insert_clause=InsertClause(KUDU_TABLE), + insert_clause=InsertClause( + KUDU_TABLE, + conflict_action=InsertClause.CONFLICT_ACTION_IGNORE, + ), values_clause=ValuesClause(( ValuesRow((Int(1), Char('a'))), ValuesRow((Int(2), Char('b'))), )), - conflict_action=InsertStatement.CONFLICT_ACTION_IGNORE ), impala_query_string=( 'INSERT INTO kudu_table\n' @@ -439,13 +460,15 @@ INSERT_QUERY_TEST_CASES = [ testid='insert values seleted from with clause ignore conflicts', query=InsertStatement( with_clause=SIMPLE_WITH_CLAUSE, - insert_clause=InsertClause(KUDU_TABLE, - column_list=(KUDU_TABLE.cols[0],)), + insert_clause=InsertClause( + KUDU_TABLE, + column_list=(KUDU_TABLE.cols[0],), + conflict_action=InsertClause.CONFLICT_ACTION_IGNORE, + ), select_query=FakeQuery( select_clause=FakeSelectClause(*SIMPLE_WITH_CLAUSE.table_exprs[0].cols), from_clause=FromClause(SIMPLE_WITH_CLAUSE.table_exprs[0]) ), - conflict_action=InsertStatement.CONFLICT_ACTION_IGNORE ), impala_query_string=( 'WITH with_view AS (SELECT\n' @@ -466,5 +489,121 @@ 
INSERT_QUERY_TEST_CASES = [ 'FROM with_view\n' 'ON CONFLICT DO NOTHING' ), - ) + ), + InsertStatementTest( + testid='upsert into table select cols', + query=InsertStatement( + insert_clause=InsertClause( + KUDU_TABLE, + conflict_action=InsertClause.CONFLICT_ACTION_UPDATE), + select_query=FakeQuery( + select_clause=FakeSelectClause(*SIMPLE_TABLE.cols), + from_clause=FromClause(SIMPLE_TABLE) + ), + ), + impala_query_string=( + 'UPSERT INTO kudu_table\n' + 'SELECT\n' + 'fake_table.int_col,\n' + 'TRIM(fake_table.char_col)\n' + 'FROM fake_table' + ), + postgres_query_string=( + 'INSERT INTO kudu_table\n' + 'SELECT\n' + 'fake_table.int_col,\n' + 'fake_table.char_col\n' + 'FROM fake_table\n' + 'ON CONFLICT (int_col)\n' + 'DO UPDATE SET\n' + 'char_col = EXCLUDED.char_col' + ), + ), + InsertStatementTest( + testid='upsert 2 value rows', + query=InsertStatement( + insert_clause=InsertClause( + KUDU_TABLE, + conflict_action=InsertClause.CONFLICT_ACTION_UPDATE, + ), + values_clause=ValuesClause(( + ValuesRow((Int(1), Char('a'))), + ValuesRow((Int(2), Char('b'))), + )), + ), + impala_query_string=( + 'UPSERT INTO kudu_table\n' + 'VALUES\n' + "(1, 'a'),\n" + "(2, 'b')" + ), + postgres_query_string=( + 'INSERT INTO kudu_table\n' + 'VALUES\n' + "(1, 'a' || ''),\n" + "(2, 'b' || '')\n" + 'ON CONFLICT (int_col)\n' + 'DO UPDATE SET\n' + 'char_col = EXCLUDED.char_col' + ), + ), + InsertStatementTest( + testid='upsert select into table with multiple pk / updatable columns', + query=InsertStatement( + insert_clause=InsertClause( + FOUR_COL_KUDU_TABLE, + conflict_action=InsertClause.CONFLICT_ACTION_UPDATE), + select_query=FakeQuery( + select_clause=FakeSelectClause(*FOUR_COL_KUDU_TABLE.cols), + from_clause=FromClause(FOUR_COL_KUDU_TABLE) + ), + ), + impala_query_string=( + 'UPSERT INTO four_col_kudu_table\n' + 'SELECT\n' + 'four_col_kudu_table.int_col1,\n' + 'TRIM(four_col_kudu_table.char_col1),\n' + 'four_col_kudu_table.int_col2,\n' + 'TRIM(four_col_kudu_table.char_col2)\n' + 'FROM 
four_col_kudu_table' + ), + postgres_query_string=( + 'INSERT INTO four_col_kudu_table\n' + 'SELECT\n' + 'four_col_kudu_table.int_col1,\n' + 'four_col_kudu_table.char_col1,\n' + 'four_col_kudu_table.int_col2,\n' + 'four_col_kudu_table.char_col2\n' + 'FROM four_col_kudu_table\n' + 'ON CONFLICT (int_col1, char_col1)\n' + 'DO UPDATE SET\n' + 'int_col2 = EXCLUDED.int_col2,\n' + 'char_col2 = EXCLUDED.char_col2' + ), + ), + InsertStatementTest( + testid='upsert select into table with no updatable columns', + query=InsertStatement( + insert_clause=InsertClause( + ONE_COL_KUDU_TABLE, + conflict_action=InsertClause.CONFLICT_ACTION_UPDATE), + select_query=FakeQuery( + select_clause=FakeSelectClause(SIMPLE_TABLE.cols[0]), + from_clause=FromClause(SIMPLE_TABLE) + ), + ), + impala_query_string=( + 'UPSERT INTO one_col_kudu_table\n' + 'SELECT\n' + 'fake_table.int_col\n' + 'FROM fake_table' + ), + postgres_query_string=( + 'INSERT INTO one_col_kudu_table\n' + 'SELECT\n' + 'fake_table.int_col\n' + 'FROM fake_table\n' + 'ON CONFLICT DO NOTHING' + ), + ), ]
