This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new fb8c6305e8 PHOENIX-7651 Support RETURNING * with UPSERT and DELETE (#2226) fb8c6305e8 is described below commit fb8c6305e8f8ed42306e2910715616b15d4ad544 Author: Hari Krishna Dara <harid...@gmail.com> AuthorDate: Wed Aug 20 00:56:13 2025 +0530 PHOENIX-7651 Support RETURNING * with UPSERT and DELETE (#2226) --- .gitignore | 5 ++ phoenix-core-client/src/main/antlr3/PhoenixSQL.g | 18 +++-- .../org/apache/phoenix/jdbc/PhoenixStatement.java | 30 ++++---- .../org/apache/phoenix/parse/DeleteStatement.java | 10 ++- .../org/apache/phoenix/parse/ParseNodeFactory.java | 9 +-- .../phoenix/parse/RowReturningDMLStatement.java | 22 ++++++ .../org/apache/phoenix/parse/UpsertStatement.java | 28 +++----- .../java/org/apache/phoenix/util/TupleUtil.java | 22 ++++-- .../java/org/apache/phoenix/end2end/Bson4IT.java | 14 ++-- .../apache/phoenix/end2end/OnDuplicateKey2IT.java | 82 ++++++++++++---------- .../org/apache/phoenix/end2end/UpsertValuesIT.java | 50 +++++++++++-- .../org/apache/phoenix/parse/QueryParserTest.java | 70 ++++++++++++++++++ 12 files changed, 259 insertions(+), 101 deletions(-) diff --git a/.gitignore b/.gitignore index 0851a687cb..29049f43fd 100644 --- a/.gitignore +++ b/.gitignore @@ -39,3 +39,8 @@ phoenix-hbase-compat-1.5.0/ # Code generators .codegenie /.vscode/ + +# Some generated files +ID +tags +filenametags diff --git a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g index 5d3e0c926d..a2e913093e 100644 --- a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g @@ -160,6 +160,7 @@ tokens UNCOVERED = 'uncovered'; REGIONS = 'regions'; NOVERIFY = 'noverify'; + RETURNING = 'returning'; } @@ -866,9 +867,13 @@ finally{ contextStack.pop(); } upsert_node returns [UpsertStatement ret] : UPSERT (hint=hintClause)? INTO t=from_table_name (LPAREN p=upsert_column_refs RPAREN)? - ((VALUES LPAREN v=one_or_more_expressions RPAREN ( ON DUPLICATE KEY ( ig=IGNORE | - ( upd=UPDATE pairs=update_column_pairs ) | ( updo=UPDATE_ONLY upopairs=update_column_pairs ) ) )? ) - | s=select_node) + ((VALUES LPAREN v=one_or_more_expressions RPAREN ( + ON DUPLICATE KEY ( + ig=IGNORE + | ( upd=UPDATE pairs=update_column_pairs ) + | ( updo=UPDATE_ONLY upopairs=update_column_pairs ) + ) )? ) + | s=select_node) rc=( RETURNING ASTERISK )? {ret = factory.upsert( factory.namedTable(null,t,p == null ? null : p.getFirst()), hint, p == null ? null : p.getSecond(), @@ -879,7 +884,8 @@ upsert_node returns [UpsertStatement ret] ig != null ? UpsertStatement.OnDuplicateKeyType.IGNORE : upd != null ? UpsertStatement.OnDuplicateKeyType.UPDATE : updo != null ? UpsertStatement.OnDuplicateKeyType.UPDATE_ONLY - : UpsertStatement.OnDuplicateKeyType.NONE); } + : UpsertStatement.OnDuplicateKeyType.NONE, + rc != null ? true : false); } ; update_column_pairs returns [ List<Pair<ColumnName,ParseNode>> ret] @@ -924,7 +930,9 @@ delete_node returns [DeleteStatement ret] (WHERE v=expression)? (ORDER BY order=order_by)? (LIMIT l=limit)? - {ret = factory.delete(factory.namedTable(null,t), hint, v, order, l, getBindCount(), new HashMap<String, UDFParseNode>(udfParseNodes)); } + rc=(RETURNING ASTERISK)? + {ret = factory.delete(factory.namedTable(null,t), hint, v, order, l, getBindCount(), new + HashMap<String, UDFParseNode>(udfParseNodes), rc != null ? true : false); } ; limit returns [LimitNode ret] diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index c0cb8a7339..8ef621d535 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -179,6 +179,7 @@ import org.apache.phoenix.parse.PFunction; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.PrimaryKeyConstraint; +import org.apache.phoenix.parse.RowReturningDMLStatement; import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.ShowCreateTable; @@ -580,15 +581,15 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable return target; } - private boolean isResultSetExpected(final CompilableStatement stmt) { - return stmt instanceof ExecutableUpsertStatement - && ((ExecutableUpsertStatement) stmt).getOnDupKeyPairs() != null; + private boolean isReturningRowStatement(final CompilableStatement stmt) { + return stmt instanceof RowReturningDMLStatement + && ((RowReturningDMLStatement) stmt).isReturningRow(); } protected int executeMutation(final CompilableStatement stmt, final AuditQueryLogger queryLogger) throws SQLException { return executeMutation(stmt, true, queryLogger, - isResultSetExpected(stmt) ? ReturnResult.NEW_ROW_ON_SUCCESS : null).getFirst(); + isReturningRowStatement(stmt) ? ReturnResult.NEW_ROW_ON_SUCCESS : null).getFirst(); } Pair<Integer, ResultSet> executeMutation(final CompilableStatement stmt, @@ -695,7 +696,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable : (isDelete ? ((ExecutableDeleteStatement) stmt).getTable().getName() : null); ResultSet rs = result == null || result.isEmpty() ? null - : TupleUtil.getResultSet(new ResultTuple(result), tableNameVal, connection); + : TupleUtil.getResultSet(new ResultTuple(result), tableNameVal, connection, + !isReturningRowStatement(stmt)); setLastResultSet(rs); return new Pair<>(lastUpdateCount, rs); } @@ -1181,9 +1183,9 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable private ExecutableUpsertStatement(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>> onDupKeyPairs, - OnDuplicateKeyType onDupKeyType) { + OnDuplicateKeyType onDupKeyType, boolean returningRow) { super(table, hintNode, columns, values, select, bindCount, udfParseNodes, onDupKeyPairs, - onDupKeyType); + onDupKeyType, returningRow); } @SuppressWarnings("unchecked") @@ -1204,8 +1206,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable implements CompilableStatement { private ExecutableDeleteStatement(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount, - Map<String, UDFParseNode> udfParseNodes) { - super(table, hint, whereNode, orderBy, limit, bindCount, udfParseNodes); + Map<String, UDFParseNode> udfParseNodes, boolean returningRow) { + super(table, hint, whereNode, orderBy, limit, bindCount, udfParseNodes, returningRow); } @SuppressWarnings("unchecked") @@ -2126,9 +2128,9 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable public ExecutableUpsertStatement upsert(NamedTableNode table, HintNode hintNode, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>> onDupKeyPairs, - UpsertStatement.OnDuplicateKeyType onDupKeyType) { + UpsertStatement.OnDuplicateKeyType onDupKeyType, boolean returningRow) { return new ExecutableUpsertStatement(table, hintNode, columns, values, select, bindCount, - udfParseNodes, onDupKeyPairs, onDupKeyType); + udfParseNodes, onDupKeyPairs, onDupKeyType, returningRow); } @Override @@ -2155,9 +2157,9 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable @Override public ExecutableDeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount, - Map<String, UDFParseNode> udfParseNodes) { + Map<String, UDFParseNode> udfParseNodes, boolean returningRow) { return new ExecutableDeleteStatement(table, hint, whereNode, orderBy, limit, bindCount, - udfParseNodes); + udfParseNodes, returningRow); } @Override @@ -2667,7 +2669,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable } executeMutation(stmt, createAuditQueryLogger(stmt, sql)); flushIfNecessary(); - return isResultSetExpected(stmt); + return isReturningRowStatement(stmt); } executeQuery(stmt, createQueryLogger(stmt, sql)); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DeleteStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DeleteStatement.java index b4e82e0209..be33d7938a 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DeleteStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/DeleteStatement.java @@ -22,20 +22,23 @@ import java.util.List; import java.util.Map; import org.apache.phoenix.jdbc.PhoenixStatement.Operation; -public class DeleteStatement extends DMLStatement implements FilterableStatement { +public class DeleteStatement extends DMLStatement + implements FilterableStatement, RowReturningDMLStatement { private final ParseNode whereNode; private final List<OrderByNode> orderBy; private final LimitNode limit; private final HintNode hint; + private final boolean returningRow; public DeleteStatement(NamedTableNode table, HintNode hint, ParseNode whereNode, List<OrderByNode> orderBy, LimitNode limit, int bindCount, - Map<String, UDFParseNode> udfParseNodes) { + Map<String, UDFParseNode> udfParseNodes, boolean returningRow) { super(table, bindCount, udfParseNodes); this.whereNode = whereNode; this.orderBy = orderBy == null ? Collections.<OrderByNode> emptyList() : orderBy; this.limit = limit; this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint; + this.returningRow = returningRow; } @Override @@ -83,4 +86,7 @@ public class DeleteStatement extends DMLStatement implements FilterableStatement throw new UnsupportedOperationException("Table sampling is not allowd for Deletion"); } + public boolean isReturningRow() { + return returningRow; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index d28dcc7837..6a22664695 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -900,9 +900,9 @@ public class ParseNodeFactory { public UpsertStatement upsert(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>> onDupKeyPairs, - UpsertStatement.OnDuplicateKeyType onDupKeyType) { + UpsertStatement.OnDuplicateKeyType onDupKeyType, boolean returningRow) { return new UpsertStatement(table, hint, columns, values, select, bindCount, udfParseNodes, - onDupKeyPairs, onDupKeyType); + onDupKeyPairs, onDupKeyType, returningRow); } public CursorName cursorName(String name) { @@ -927,8 +927,9 @@ public class ParseNodeFactory { public DeleteStatement delete(NamedTableNode table, HintNode hint, ParseNode node, List<OrderByNode> orderBy, LimitNode limit, int bindCount, - Map<String, UDFParseNode> udfParseNodes) { - return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes); + Map<String, UDFParseNode> udfParseNodes, boolean returningRow) { + return new DeleteStatement(table, hint, node, orderBy, limit, bindCount, udfParseNodes, + returningRow); } public SelectStatement select(SelectStatement statement, ParseNode where) { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RowReturningDMLStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RowReturningDMLStatement.java new file mode 100644 index 0000000000..ba368e7bbb --- /dev/null +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/RowReturningDMLStatement.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.parse; + +public interface RowReturningDMLStatement { + boolean isReturningRow(); +} diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java index 279ddfa477..d89b48bdb5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/parse/UpsertStatement.java @@ -22,7 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.util.Pair; -public class UpsertStatement extends DMLStatement { +public class UpsertStatement extends DMLStatement implements RowReturningDMLStatement { public enum OnDuplicateKeyType { NONE, @@ -37,29 +37,12 @@ public class UpsertStatement extends DMLStatement { private final HintNode hint; private final List<Pair<ColumnName, ParseNode>> onDupKeyPairs; private final OnDuplicateKeyType onDupKeyType; - - public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, - List<ParseNode> values, SelectStatement select, int bindCount, - Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>> onDupKeyPairs) { - super(table, bindCount, udfParseNodes); - this.columns = columns == null ? Collections.<ColumnName> emptyList() : columns; - this.values = values; - this.select = select; - this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint; - this.onDupKeyPairs = onDupKeyPairs; - if (onDupKeyPairs == null) { - this.onDupKeyType = OnDuplicateKeyType.NONE; - } else if (onDupKeyPairs.isEmpty()) { - this.onDupKeyType = OnDuplicateKeyType.IGNORE; - } else { - this.onDupKeyType = OnDuplicateKeyType.UPDATE; - } - } + private final boolean returningRow; public UpsertStatement(NamedTableNode table, HintNode hint, List<ColumnName> columns, List<ParseNode> values, SelectStatement select, int bindCount, Map<String, UDFParseNode> udfParseNodes, List<Pair<ColumnName, ParseNode>> onDupKeyPairs, - OnDuplicateKeyType onDupKeyType) { + OnDuplicateKeyType onDupKeyType, boolean returningRow) { super(table, bindCount, udfParseNodes); this.columns = columns == null ? Collections.emptyList() : columns; this.values = values; @@ -67,6 +50,7 @@ public class UpsertStatement extends DMLStatement { this.hint = hint == null ? HintNode.EMPTY_HINT_NODE : hint; this.onDupKeyPairs = onDupKeyPairs; this.onDupKeyType = onDupKeyType; + this.returningRow = returningRow; } public List<ColumnName> getColumns() { @@ -92,4 +76,8 @@ public class UpsertStatement extends DMLStatement { public OnDuplicateKeyType getOnDupKeyType() { return onDupKeyType; } + + public boolean isReturningRow() { + return returningRow; + } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java index de70f23080..b8f6a769d1 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/TupleUtil.java @@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER; +import edu.umd.cs.findbugs.annotations.SuppressWarnings; import java.io.DataOutput; import java.io.IOException; import java.sql.Connection; @@ -32,6 +33,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; + import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -226,14 +228,20 @@ public class TupleUtil { /** * Convert the given Tuple containing list of Cells to ResultSet with similar effect as if SELECT * * FROM <table-name> is queried. - * @param toProject Tuple to be projected. - * @param tableName Table name. - * @param conn Phoenix Connection object. + * @param toProject Tuple to be projected. + * @param tableName Table name. + * @param conn Phoenix Connection object. + * @param withPrefetch When {@code true}, the returned ResultSet is prefetched, otherwise one + * needs to call next() on it. * @return ResultSet for the give single row. * @throws SQLException If any SQL operation fails. */ - public static ResultSet getResultSet(Tuple toProject, TableName tableName, Connection conn) - throws SQLException { + @SuppressWarnings(value = "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE", + justification = "Tge statement object needs to be kept open for the returned RS to be " + + "valid, however this is acceptable as not callingPhoenixStatement.close() " + + "causes no resource leak") + public static ResultSet getResultSet(Tuple toProject, TableName tableName, Connection conn, + boolean withPrefetch) throws SQLException { if (tableName == null) { return null; } @@ -268,7 +276,9 @@ public class TupleUtil { ResultSet newResultSet = new PhoenixPrefetchedResultSet(resultSet.getRowProjector(), resultSet.getContext(), Collections .singletonList(new ResultTuple(Result.create(Collections.singletonList(newCell))))); - newResultSet.next(); + if (withPrefetch) { + newResultSet.next(); + } return newResultSet; } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java index b1e534703e..f8884472ab 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java @@ -578,7 +578,8 @@ public class Bson4IT extends ParallelStatsDisabledIT { stmt = conn.prepareStatement( "UPSERT INTO " + tableName + " VALUES (?) ON DUPLICATE KEY UPDATE_ONLY COL = CASE WHEN" + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" - + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END," + " C1 = ?"); + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END," + + " C1 = ? RETURNING *"); stmt.setString(1, "pk0001"); stmt.setString(2, "0003"); @@ -611,7 +612,7 @@ public class Bson4IT extends ParallelStatsDisabledIT { stmt = conn.prepareStatement( "UPSERT INTO " + tableName + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN" + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" - + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END RETURNING *"); stmt.setString(1, "pk1010"); @@ -634,7 +635,7 @@ public class Bson4IT extends ParallelStatsDisabledIT { stmt = conn.prepareStatement( "UPSERT INTO " + tableName + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN" + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" - + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END RETURNING *"); stmt.setString(1, "pk1011"); @@ -659,7 +660,7 @@ public class Bson4IT extends ParallelStatsDisabledIT { stmt = conn.prepareStatement( "UPSERT INTO " + tableName + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN" + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" - + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END RETURNING *"); stmt.setString(1, "pk0001"); // Conditional Upsert not successful @@ -668,7 +669,7 @@ public class Bson4IT extends ParallelStatsDisabledIT { stmt = conn.prepareStatement( "UPSERT INTO " + tableName + " VALUES (?) ON DUPLICATE KEY UPDATE_ONLY COL = CASE WHEN" + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" - + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END RETURNING *"); // the row does not exist already stmt.setString(1, "pk000111"); @@ -680,7 +681,7 @@ public class Bson4IT extends ParallelStatsDisabledIT { stmt = conn.prepareStatement( "UPSERT INTO " + tableName + " VALUES (?) ON DUPLICATE KEY UPDATE COL = CASE WHEN" + " BSON_CONDITION_EXPRESSION(COL, '" + conditionDoc.toJson() + "')" - + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END"); + + " THEN BSON_UPDATE_EXPRESSION(COL, '" + updateExp + "') ELSE COL END RETURNING *"); // the row does not exist already stmt.setString(1, "pk000123456"); @@ -1085,6 +1086,7 @@ public class Bson4IT extends ParallelStatsDisabledIT { stmt.execute(); assertEquals(success ? 1 : 0, stmt.getUpdateCount()); ResultSet resultSet = stmt.getResultSet(); + assertTrue(resultSet.next()); assertEquals(jsonPath == null ? null : RawBsonDocument.parse(getJsonString(jsonPath)), resultSet.getObject(3)); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java index 6d0165f14f..b5a7b055f9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OnDuplicateKey2IT.java @@ -124,8 +124,8 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { createIndex(conn, tableName); conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES('a',10)"); - int actualReturnValue = conn.createStatement() - .executeUpdate("UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE"); + int actualReturnValue = conn.createStatement().executeUpdate( + "UPSERT INTO " + tableName + " VALUES('a',0) ON DUPLICATE KEY IGNORE " + "" + "RETURNING *"); assertEquals(0, actualReturnValue); conn.close(); @@ -158,8 +158,8 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { validateAtomicUpsertReturnRow(tableName, conn, bsonDocument1, bsonDocument2); - PreparedStatement ps = conn - .prepareStatement("DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ?"); + PreparedStatement ps = conn.prepareStatement( + "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? " + "RETURNING *"); ps.setString(1, "pk000"); ps.setDouble(2, -123.98); ps.setString(3, "pk003"); @@ -199,16 +199,16 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { verifyIndexRow(conn, tableName, false); - PreparedStatement ps = conn.prepareStatement( - "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ?"); + PreparedStatement ps = conn.prepareStatement("DELETE FROM " + tableName + + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ? " + "RETURNING *"); ps.setString(1, "pk000"); ps.setDouble(2, -123.98); ps.setString(3, "pk003"); ps.setInt(4, 235); validateReturnedRowAfterDelete(ps, "col2_001", true, false, bsonDocument2, 234); - ps = conn.prepareStatement( - "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ?"); + ps = conn.prepareStatement("DELETE FROM " + tableName + + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ? " + "RETURNING *"); ps.setString(1, "pk000"); ps.setDouble(2, -123.98); ps.setString(3, "pk003"); @@ -252,16 +252,16 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { validateAtomicUpsertReturnRow(tableName, conn, bsonDocument1, bsonDocument2); - PreparedStatement ps = conn.prepareStatement( - "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ?"); + PreparedStatement ps = conn.prepareStatement("DELETE FROM " + tableName + + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ? " + "RETURNING *"); ps.setString(1, "pk000"); ps.setDouble(2, -123.98); ps.setString(3, "pk003"); ps.setInt(4, 235); validateReturnedRowAfterDelete(ps, "col2_001", true, false, bsonDocument2, 234); - ps = conn.prepareStatement( - "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ?"); + ps = conn.prepareStatement("DELETE FROM " + tableName + + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ? " + "RETURNING *"); ps.setString(1, "pk000"); ps.setDouble(2, -123.98); ps.setString(3, "pk003"); @@ -304,16 +304,16 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { validateAtomicUpsertOnlyReturnRow(tableName, conn, bsonDocument1, bsonDocument2); - PreparedStatement ps = conn.prepareStatement( - "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ?"); + PreparedStatement ps = conn.prepareStatement("DELETE FROM " + tableName + + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ? " + "RETURNING *"); ps.setString(1, "pk000"); ps.setDouble(2, -123.98); ps.setString(3, "pk003"); ps.setInt(4, 235); validateReturnedRowAfterDelete(ps, "col2_001", true, false, bsonDocument2, 234); - ps = conn.prepareStatement( - "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ?"); + ps = conn.prepareStatement("DELETE FROM " + tableName + + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? AND COL4 = ? " + "RETURNING *"); ps.setString(1, "pk000"); ps.setDouble(2, -123.98); ps.setString(3, "pk003"); @@ -356,8 +356,8 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { validateAtomicUpsertReturnRow(tableName, conn, bsonDocument1, bsonDocument2); - PreparedStatement ps = conn - .prepareStatement("DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ?"); + PreparedStatement ps = conn.prepareStatement( + "DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? AND PK3 = ? " + "RETURNING *"); ps.setString(1, "pk000"); ps.setDouble(2, -123.98); ps.setString(3, "pk003"); @@ -395,13 +395,13 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { BsonDocument bsonDocument2) throws SQLException { addRows(tableName, conn); - PreparedStatement ps = - conn.prepareStatement("DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ?"); + PreparedStatement ps = conn + .prepareStatement("DELETE FROM " + tableName + " WHERE PK1 = ? AND PK2 = ? " + "RETURNING *"); ps.setString(1, "pk001"); ps.setDouble(2, 122.34); validateReturnedRowAfterDelete(ps, "col2_001", false, false, bsonDocument2, 234); - ps = conn.prepareStatement("DELETE FROM " + tableName); + ps = conn.prepareStatement("DELETE FROM " + tableName + " RETURNING *"); validateReturnedRowAfterDelete(ps, "col2_001", false, false, bsonDocument2, 234); ResultSet rs = conn.createStatement().executeQuery("SELECT * FROM " + tableName); @@ -409,8 +409,8 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { addRows(tableName, conn); - ps = conn.prepareStatement( - "DELETE FROM " + tableName + " WHERE PK1 IN (?) AND PK2 IN (?) AND PK3 IN (?, ?)"); + ps = conn.prepareStatement("DELETE FROM " + tableName + + " WHERE PK1 IN (?) AND PK2 IN (?) AND PK3 IN (?, ?) " + "RETURNING *"); ps.setString(1, "pk001"); ps.setDouble(2, 122.34); ps.setString(3, "pk004"); @@ -421,12 +421,13 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { private static void validateAtomicUpsertReturnRow(String tableName, Connection conn, BsonDocument bsonDocument1, BsonDocument bsonDocument2) throws SQLException { String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1, COL3, COL4)" - + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " + "IGNORE"; + + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " + "IGNORE " + + "RETURNING *"; validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, true, bsonDocument1, bsonDocument1, 123); upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) " - + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE KEY IGNORE"; + + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE KEY IGNORE " + "RETURNING *"; validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, false, null, bsonDocument1, 123); @@ -440,14 +441,15 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE " + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1" + " END, " + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 " + "END, " - + "COL3 = ?, " + "COL4 = 234"; + + "COL3 = ?, " + "COL4 = 234 RETURNING *"; validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", true, bsonDocument2, bsonDocument2, 234); upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE " + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1" + " END," - + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 " + "END"; + + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 " + "END " + + "RETURNING *"; validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", false, null, bsonDocument2, 234); } @@ -455,12 +457,13 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { private static void validateAtomicUpsertOnlyReturnRow(String tableName, Connection conn, BsonDocument bsonDocument1, BsonDocument bsonDocument2) throws SQLException { String upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1, COL3, COL4)" - + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " + "IGNORE"; + + " VALUES('pk000', -123.98, 'pk003', 1011.202, ?, 123) ON DUPLICATE KEY " + "IGNORE " + + "RETURNING *"; validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, true, bsonDocument1, bsonDocument1, 123); upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3, COUNTER1) " - + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE KEY IGNORE"; + + "VALUES('pk000', -123.98, 'pk003', 0) ON DUPLICATE KEY IGNORE " + "RETURNING *"; validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 1011.202, null, false, null, bsonDocument1, 123); @@ -474,14 +477,15 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE_ONLY " + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1" + " END, " + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 " + "END, " - + "COL3 = ?, " + "COL4 = 234"; + + "COL3 = ?, " + "COL4 = 234 RETURNING *"; validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", true, bsonDocument2, bsonDocument2, 234); upsertSql = "UPSERT INTO " + tableName + " (PK1, PK2, PK3) VALUES('pk000', -123.98, 'pk003') ON DUPLICATE KEY UPDATE_ONLY " + "COUNTER1 = CASE WHEN COUNTER1 < 2000 THEN COUNTER1 + 1999.99 ELSE COUNTER1" + " END," - + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 " + "END"; + + "COUNTER2 = CASE WHEN COUNTER2 = 'col2_000' THEN 'col2_001' ELSE COUNTER2 " + "END " + + "RETURNING *"; validateReturnedRowAfterUpsert(conn, upsertSql, tableName, 2233.99, "col2_001", false, null, bsonDocument2, 234); } @@ -505,13 +509,15 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { private static void validateReturnedRowAfterDelete(PreparedStatement ps, String col2, boolean isSinglePointLookup, boolean atomicDeleteSuccessful, BsonDocument expectedDoc, Integer col4) throws SQLException { - final Pair<Integer, ResultSet> resultPair = - ps.unwrap(PhoenixPreparedStatement.class).executeAtomicUpdateReturnRow(); - ResultSet resultSet = resultPair.getSecond(); + ps.executeUpdate(); + ResultSet resultSet = ps.getResultSet(); if (!isSinglePointLookup) { assertNull(resultSet); return; } + if (resultSet != null) { + assertTrue(resultSet.next()); + } if (!atomicDeleteSuccessful) { assertTrue(resultSet == null || resultSet.getObject(4) == null); return; @@ -553,8 +559,9 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { resultSet = stmt.execute(upsertSql) ? stmt.getResultSet() : null; updateCount = stmt.getUpdateCount(); } - boolean isOnDuplicateKey = upsertSql.toUpperCase().contains("ON DUPLICATE KEY"); - if (conn.getAutoCommit() && isOnDuplicateKey) { + boolean isReturningRow = upsertSql.toUpperCase().contains("RETURNING *"); + if (conn.getAutoCommit() && isReturningRow) { + assertTrue(resultSet.next()); assertEquals(success ? 1 : 0, updateCount); assertEquals("pk000", resultSet.getString(1)); assertEquals(-123.98, resultSet.getDouble(2), 0.0); @@ -591,8 +598,7 @@ public class OnDuplicateKey2IT extends ParallelStatsDisabledIT { updateCount = resultPair.getFirst(); resultSet = resultPair.getSecond(); } - boolean isOnDuplicateKey = upsertSql.toUpperCase().contains("ON DUPLICATE KEY"); - if (conn.getAutoCommit() && isOnDuplicateKey) { + if (conn.getAutoCommit()) { assertEquals(success ? 1 : 0, updateCount); if (resultSet != null) { assertEquals("pk000", resultSet.getString(1)); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java index 8ac0978864..d1e5b33e8e 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java @@ -23,6 +23,7 @@ import static org.apache.phoenix.util.TestUtil.closeStatement; import static org.apache.phoenix.util.TestUtil.closeStmtAndConn; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -101,6 +102,32 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { conn.close(); } + @Test + public void testPlainUpsertWithReturning() throws Exception { + String tableName = generateUniqueName(); + ensureTableCreated(getUrl(), tableName, TestUtil.PTSDB_NAME, null, null, null); + Properties props = new Properties(); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); + props.put("hbase.client.scanner.timeout.period", "6000000"); + props.put("phoenix.query.timeoutMs", "6000000"); + props.put("zookeeper.session.timeout", "6000000"); + props.put("hbase.rpc.timeout", "6000000"); + Connection conn = DriverManager.getConnection(getUrl(), props); + conn.setAutoCommit(true); + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + + " (inst,host,\"DATE\") VALUES(?,'b',CURRENT_DATE()) RETURNING *"); + stmt.setString(1, "a"); + stmt.execute(); + ResultSet rs = stmt.getResultSet(); + assertNotNull(rs); + assertTrue(rs.next()); + assertEquals(1, stmt.getUpdateCount()); + assertEquals("a", rs.getString(1)); + assertEquals("b", rs.getString(2)); + assertFalse(rs.next()); + conn.close(); + } + @Test public void testUpsertDateValues() throws Exception { String tableName = generateUniqueName(); @@ -865,7 +892,8 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { createTableStatement.execute(); } - testGlobalSequenceUpsertWithTenantConnection(tableName); + testGlobalSequenceUpsertWithTenantConnection(tableName, false); + testGlobalSequenceUpsertWithTenantConnection(tableName, true); testGlobalSequenceUpsertWithGlobalConnection(tableName); testTenantSequenceUpsertWithSameTenantConnection(tableName); testTenantSequenceUpsertWithDifferentTenantConnection(tableName); @@ -970,7 +998,8 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { } } - private void testGlobalSequenceUpsertWithTenantConnection(String tableName) throws Exception { + private void testGlobalSequenceUpsertWithTenantConnection(String tableName, + boolean withReturningRow) throws Exception { String sequenceName = generateUniqueSequenceName(); try (Connection conn = DriverManager.getConnection(getUrl())) { @@ -984,11 +1013,20 @@ public class UpsertValuesIT extends ParallelStatsDisabledIT { tenantConn.setAutoCommit(true); Statement executeUpdateStatement = tenantConn.createStatement(); - executeUpdateStatement - .execute(String.format("UPSERT INTO %s ( SEQUENCE_NUMBER) VALUES " + "( NEXT VALUE FOR %s)", - tableName, sequenceName)); + String sql = String.format("UPSERT INTO %s (SEQUENCE_NUMBER) VALUES " + "(NEXT VALUE FOR %s)", + tableName, sequenceName); + if (withReturningRow) { + sql += " RETURNING *"; + } + executeUpdateStatement.execute(sql); - ResultSet rs = executeUpdateStatement.executeQuery("select * from " + tableName); + ResultSet rs; + if (withReturningRow) { + rs = executeUpdateStatement.getResultSet(); + } else { + rs = executeUpdateStatement.executeQuery("select * from " + tableName); + } + assertNotNull(rs); assertTrue(rs.next()); assertEquals("1", rs.getString(1)); assertFalse(rs.next()); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java index 7eecceeb7c..1b5c8008f7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/parse/QueryParserTest.java @@ -18,6 +18,7 @@ package org.apache.phoenix.parse; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -706,6 +707,75 @@ public class QueryParserTest { parseQuery(sql); } + @Test + public void testPlainUpsertReturningRow() throws Exception { + String sql = "upsert into t (k, v) values ( 1, 2 ) RETURNING *"; + UpsertStatement stmt = parseQuery(sql, UpsertStatement.class); + assertTrue(stmt.isReturningRow()); + } + + @Test + public void testPlainUpsertNotReturningRow() throws Exception { + String sql = "upsert into t (k, v) values ( 1, 2 )"; + UpsertStatement stmt = parseQuery(sql, UpsertStatement.class); + assertFalse(stmt.isReturningRow()); + } + + @Test + public void testUpsertWithOnDuplicateKey() throws Exception { + String sql = "upsert into t (k, v) values ( 1, 2 ) " + "ON DUPLICATE KEY UPDATE k = k + 1"; + parseQuery(sql); + } + + @Test + public void testUpsertInvalidReturningProjections() throws Exception { + String sql = + "upsert into t (k, v) values ( 1, 2 ) " + "ON DUPLICATE KEY UPDATE k = k + 1 RETURNING k"; + try { + parseQuery(sql); + fail(); + } catch (PhoenixParserException e) { + assertEquals(SQLExceptionCode.MISMATCHED_TOKEN.getErrorCode(), e.getErrorCode()); + } + } + + @Test + public void testUpsertReturningRow() throws Exception { + String sql = + "upsert into t (k, v) values ( 1, 2 ) " + "ON DUPLICATE KEY UPDATE k = k + 1 RETURNING *"; + UpsertStatement stmt = parseQuery(sql, UpsertStatement.class); + assertTrue(stmt.isReturningRow()); + } + + @Test + public void testDeleteReturningRow() throws Exception { + String sql = "delete from t RETURNING *"; + parseQuery(sql); + } + + @Test + public void testDeleteWhereReturningRow() throws Exception { + String sql = "DELETE FROM T WHERE PK1 = ? AND PK2 = ? RETURNING *"; + parseQuery(sql); + } + + @Test + public void testDeleteWithOrderLimitWhereReturningRow() throws Exception { + String sql = "DELETE FROM T WHERE PK1 = ? AND PK2 = ? ORDER BY PK2 LIMIT 1 RETURNING *"; + parseQuery(sql); + } + + @Test + public void testDeleteInvalidReturningRow() throws Exception { + String sql = "DELETE FROM T RETURNING PK1"; + try { + parseQuery(sql); + fail(); + } catch (PhoenixParserException e) { + assertEquals(SQLExceptionCode.MISMATCHED_TOKEN.getErrorCode(), e.getErrorCode()); + } + } + @Test public void testHavingWithNot() throws Exception { String sql = (("select\n" + "\"WEB_STAT_ALIAS\".\"DOMAIN\" as \"c0\"\n"