PHOENIX-3823 Force cache update on MetaDataEntityNotFoundException (Maddineni Sukumar)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c169802e Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c169802e Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c169802e Branch: refs/heads/4.x-HBase-0.98 Commit: c169802ef9c7ac5da9403c499cc2665096883f2f Parents: 189a55e Author: James Taylor <[email protected]> Authored: Fri Jun 2 09:24:27 2017 -0700 Committer: James Taylor <[email protected]> Committed: Fri Jun 2 10:23:02 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/AlterTableIT.java | 2 +- .../UpdateCacheAcrossDifferentClientsIT.java | 313 +++++++++++++ .../apache/phoenix/compile/DeleteCompiler.java | 3 +- .../apache/phoenix/compile/FromCompiler.java | 59 ++- .../phoenix/compile/ProjectionCompiler.java | 4 +- .../apache/phoenix/compile/UpsertCompiler.java | 457 +++++++++---------- .../apache/phoenix/compile/WhereCompiler.java | 4 +- .../phoenix/exception/SQLExceptionCode.java | 2 +- .../apache/phoenix/jdbc/PhoenixStatement.java | 41 +- .../apache/phoenix/optimize/QueryOptimizer.java | 4 +- .../query/ConnectionQueryServicesImpl.java | 4 +- .../schema/ColumnFamilyNotFoundException.java | 4 +- .../phoenix/schema/ColumnNotFoundException.java | 14 +- .../schema/FunctionNotFoundException.java | 2 +- .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../schema/MetaDataEntityNotFoundException.java | 20 +- .../org/apache/phoenix/schema/PTableImpl.java | 26 +- .../phoenix/schema/SchemaNotFoundException.java | 10 +- .../schema/SequenceNotFoundException.java | 14 +- .../phoenix/schema/TableNotFoundException.java | 19 +- .../UpsertColumnsValuesMismatchException.java | 41 ++ .../org/apache/phoenix/util/UpgradeUtil.java | 2 +- 22 files changed, 714 insertions(+), 333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java index 685f8f8..7bfb0b9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java @@ -955,7 +955,7 @@ public class AlterTableIT extends ParallelStatsDisabledIT { fail(); } catch (SQLException e) { assertEquals(SQLExceptionCode.COLUMN_NOT_FOUND.getErrorCode(), e.getErrorCode()); - assertTrue(e.getMessage(), e.getMessage().contains("ERROR 504 (42703): Undefined column. columnName=COL5")); + assertTrue(e.getMessage(), e.getMessage().contains("ERROR 504 (42703): Undefined column. columnName="+dataTableFullName+".COL5")); } ddl = "ALTER TABLE " + dataTableFullName + " DROP COLUMN IF EXISTS col1"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdateCacheAcrossDifferentClientsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdateCacheAcrossDifferentClientsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdateCacheAcrossDifferentClientsIT.java new file mode 100644 index 0000000..d2715e3 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpdateCacheAcrossDifferentClientsIT.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Map; +import java.util.Properties; + +import org.apache.phoenix.jdbc.PhoenixDriver; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class UpdateCacheAcrossDifferentClientsIT extends BaseUniqueNamesOwnClusterIT { + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newConcurrentMap(); + props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString()); + props.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3000)); + //When we run all tests together we are using global cluster(driver) + //so to make drop work we need to re register driver with DROP_METADATA_ATTRIB property + destroyDriver(); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + //Registering real Phoenix driver to have multiple ConnectionQueryServices created across connections + //so that metadata changes doesn't get propagated across connections + DriverManager.registerDriver(PhoenixDriver.INSTANCE); + } + + @Test + public void testUpdateCacheFrequencyWithAddAndDropTable() throws Exception { + // Create connections 1 and 2 + Properties longRunningProps = new Properties(); // Must update config before starting server + longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + longRunningProps.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.TRUE.toString()); + Connection conn1 = DriverManager.getConnection(getUrl(), longRunningProps); + String url2 = getUrl() + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries"; + Connection conn2 = DriverManager.getConnection(url2, longRunningProps); + conn1.setAutoCommit(true); + conn2.setAutoCommit(true); + String tableName = generateUniqueName(); + String tableCreateQuery = + "create table "+tableName+" (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + + " UPDATE_CACHE_FREQUENCY=1000000000"; + String dropTableQuery = "DROP table "+tableName; + try { + conn1.createStatement().execute(tableCreateQuery); + conn1.createStatement() + .execute("upsert into "+tableName+" values ('row1', 'value1', 'key1')"); + conn1.createStatement() + .execute("upsert into "+tableName+" values ('row2', 'value2', 'key2')"); + conn1.commit(); + ResultSet rs =conn1.createStatement() + .executeQuery("select * from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + rs = conn2.createStatement().executeQuery("select * from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + //Drop table from conn1 + conn1.createStatement().execute(dropTableQuery); + try { + rs = conn1.createStatement().executeQuery("select * from "+tableName); + fail("Should throw TableNotFoundException after dropping table"); + } catch (TableNotFoundException e) { + //Expected + } + try { + rs = conn2.createStatement().executeQuery("select * from "+tableName); + fail("Should throw TableNotFoundException after dropping table"); + } catch (TableNotFoundException e) { + //Expected + } + } finally { + conn1.close(); + conn2.close(); + } + } + + @Test + public void testUpdateCacheFrequencyWithAddColumn() throws Exception { + // Create connections 1 and 2 + Properties longRunningProps = new Properties(); // Must update config before starting server + Connection conn1 = DriverManager.getConnection(getUrl(), longRunningProps); + Connection conn2 = DriverManager.getConnection(getUrl(), longRunningProps); + conn1.setAutoCommit(true); + conn2.setAutoCommit(true); + String tableName = generateUniqueName(); + String createTableQuery = + "create table "+tableName+" (k UNSIGNED_DOUBLE not null primary key, " + + "v1 UNSIGNED_DOUBLE, v2 UNSIGNED_DOUBLE, v3 UNSIGNED_DOUBLE, " + + "v4 UNSIGNED_DOUBLE) UPDATE_CACHE_FREQUENCY=1000000000"; + try { + conn1.createStatement().execute(createTableQuery); + conn1.createStatement() + .execute("upsert into "+tableName+" values (1, 2, 3, 4, 5)"); + conn1.createStatement() + .execute("upsert into "+tableName+" values (6, 7, 8, 9, 10)"); + conn1.commit(); + ResultSet rs = conn1.createStatement() + .executeQuery("select k,v1,v2,v3 from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + rs = conn2.createStatement() + .executeQuery("select k,v1,v2,v3 from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + PreparedStatement alterStatement = conn1.prepareStatement( + "ALTER TABLE "+tableName+" ADD v9 UNSIGNED_DOUBLE"); + alterStatement.execute(); + rs = conn1.createStatement() + .executeQuery("select k,v1,v2,v3,v9 from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + rs = conn2.createStatement() + .executeQuery("select k,v1,v2,v3,V9 from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + } finally { + conn1.close(); + conn2.close(); + } + } + + @Test + public void testUpdateCacheFrequencyWithAddAndDropIndex() throws Exception { + // Create connections 1 and 2 + Properties longRunningProps = new Properties(); + longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + Connection conn1 = DriverManager.getConnection(getUrl(), longRunningProps); + String url2 = getUrl() + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries"; + Connection conn2 = DriverManager.getConnection(url2, longRunningProps); + conn1.setAutoCommit(true); + conn2.setAutoCommit(true); + String tableName = generateUniqueName(); + String indexName = "I_"+tableName; + String tableCreateQuery = + "create table "+tableName+" (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + + " UPDATE_CACHE_FREQUENCY=1000000000"; + String value1SelQuery = "SELECT v2 FROM "+tableName+" WHERE v1 = 'value1'"; + String indexCreateQuery = "CREATE INDEX "+indexName+" ON "+tableName+" (v1) INCLUDE (v2)"; + String indexDropQuery = "DROP INDEX "+indexName+" ON "+tableName; + try { + conn1.createStatement().execute(tableCreateQuery); + conn1.createStatement() + .execute("upsert into "+tableName+" values ('row1', 'value1', 'key1')"); + conn1.createStatement() + .execute("upsert into "+tableName+" values ('row2', 'value2', 'key2')"); + conn1.commit(); + ResultSet rs =conn1.createStatement() + .executeQuery("select k,v1,v2 from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + rs = conn2.createStatement().executeQuery("select k,v1,v2 from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + PreparedStatement createIndexStatement =conn1.prepareStatement(indexCreateQuery); + createIndexStatement.execute(); + rs = conn1.createStatement().executeQuery(value1SelQuery); + assertTrue(rs.next()); + rs = conn2.createStatement().executeQuery(value1SelQuery); + assertTrue(rs.next()); + PreparedStatement dropIndexStatement = conn1.prepareStatement(indexDropQuery); + dropIndexStatement.execute(); + rs = conn2.createStatement().executeQuery(value1SelQuery); + assertTrue(rs.next()); + rs = conn1.createStatement().executeQuery(value1SelQuery); + assertTrue(rs.next()); + } finally { + conn1.close(); + conn2.close(); + } + } + + @Test + public void testUpdateCacheFrequencyWithAddAndDropView() throws Exception { + // Create connections 1 and 2 + Properties longRunningProps = new Properties(); + longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + Connection conn1 = DriverManager.getConnection(getUrl(), longRunningProps); + String url2 = getUrl() + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + "LongRunningQueries"; + Connection conn2 = DriverManager.getConnection(url2, longRunningProps); + conn1.setAutoCommit(true); + conn2.setAutoCommit(true); + String tableName = generateUniqueName(); + String viewName = "V_"+tableName; + String createQry = "create table "+tableName+" (k VARCHAR PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)" + + " UPDATE_CACHE_FREQUENCY=1000000000"; + String valueSelQuery = "SELECT * FROM "+tableName+" WHERE v1 = 'value1'"; + String viewCreateQuery = + "CREATE VIEW "+viewName+" (v43 VARCHAR) AS SELECT * FROM "+tableName+" WHERE v1 = 'value1'"; + try { + conn1.createStatement().execute(createQry); + conn1.createStatement() + .execute("upsert into "+tableName+" values ('row1', 'value1', 'key1')"); + conn1.createStatement() + .execute("upsert into "+tableName+" values ('row2', 'value2', 'key2')"); + conn1.commit(); + ResultSet rs = conn1.createStatement().executeQuery("select k,v1,v2 from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + rs = conn2.createStatement().executeQuery("select k,v1,v2 from "+tableName); + assertTrue(rs.next()); + assertTrue(rs.next()); + conn1.createStatement().execute(viewCreateQuery); + rs = conn2.createStatement().executeQuery(valueSelQuery); + assertTrue(rs.next()); + rs = conn1.createStatement().executeQuery(valueSelQuery); + assertTrue(rs.next()); + conn1.createStatement().execute("DROP VIEW "+viewName); + rs = conn2.createStatement().executeQuery(valueSelQuery); + assertTrue(rs.next()); + rs = conn1.createStatement().executeQuery(valueSelQuery); + assertTrue(rs.next()); + } finally { + conn1.close(); + conn2.close(); + } + } + + @Test + public void testUpsertSelectOnSameTableWithFutureData() throws Exception { + String tableName = generateUniqueName(); + Properties longRunningProps = new Properties(); + longRunningProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + longRunningProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3000)); + Connection conn = DriverManager.getConnection(getUrl(), longRunningProps); + conn.setAutoCommit(false); + longRunningProps = new Properties(); + longRunningProps.put(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3000)); + conn.createStatement().execute("CREATE TABLE " + tableName + "(" + + "a VARCHAR PRIMARY KEY, b VARCHAR) UPDATE_CACHE_FREQUENCY=1000000000"); + String payload; + StringBuilder buf = new StringBuilder(); + for (int i = 0; i < 1; i++) { + buf.append('a'); + } + payload = buf.toString(); + int MIN_CHAR = 'a'; + int MAX_CHAR = 'c'; + PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)"); + int rowCount = 0; + for (int c1 = MIN_CHAR; c1 <= MAX_CHAR; c1++) { + for (int c2 = MIN_CHAR; c2 <= MAX_CHAR; c2++) { + String pk = Character.toString((char)c1) + Character.toString((char)c2); + stmt.setString(1, pk); + stmt.setString(2, payload); + stmt.execute(); + rowCount++; + } + } + conn.commit(); + int count = conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT a, b FROM " + tableName); + assertEquals(rowCount, count); + + //Upsert some data with future timestamp + longRunningProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(System.currentTimeMillis()+(24*60*60*1000))); + Connection conn2 = DriverManager.getConnection(getUrl(), longRunningProps); + conn2.setAutoCommit(false); + stmt = conn2.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)"); + MAX_CHAR = 'f'; + int rowCount2=0; + for (int c1 = MIN_CHAR; c1 <= MAX_CHAR; c1++) { + for (int c2 = MIN_CHAR; c2 <= MAX_CHAR; c2++) { + String pk = "2--"+Character.toString((char)c1) + Character.toString((char)c2); + stmt.setString(1, pk); + stmt.setString(2, payload); + stmt.execute(); + rowCount2++; + } + } + conn2.commit(); + + //Open new connection with future timestamp to see all data + longRunningProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(System.currentTimeMillis()+(25*60*60*1000))); + conn2 = DriverManager.getConnection(getUrl(), longRunningProps); + conn2.setAutoCommit(false); + count = conn2.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT * FROM " + tableName); + assertEquals(rowCount+rowCount2, count); + conn.close(); + longRunningProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, Long.toString(System.currentTimeMillis())); + conn = DriverManager.getConnection(getUrl(), longRunningProps); + //This connection should see data only upto current time + count = conn.createStatement().executeUpdate("UPSERT INTO " + tableName + " SELECT * FROM " + tableName); + assertEquals(rowCount, count); + conn2.close(); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index 5654d59..71dc76a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -62,6 +62,7 @@ import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.ParseNodeFactory; import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.parse.TableName; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; @@ -394,7 +395,7 @@ public class DeleteCompiler { select = StatementNormalizer.normalize(select, resolverToBe); SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolverToBe, connection); if (transformedSelect != select) { - resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection); + resolverToBe = FromCompiler.getResolverForQuery(transformedSelect, connection, false, delete.getTable().getName()); select = StatementNormalizer.normalize(transformedSelect, resolverToBe); } parallelIteratorFactory = hasLimit ? null : new DeletingParallelIteratorFactory(connection); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index 7bd1aa7..c582df9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -202,6 +202,11 @@ public class FromCompiler { } } + public static ColumnResolver getResolverForQuery(SelectStatement statement, PhoenixConnection connection) + throws SQLException{ + return getResolverForQuery(statement, connection, false, null); + } + /** * Iterate through the nodes in the FROM clause to build a column resolver used to lookup a column given the name * and alias. @@ -215,15 +220,15 @@ public class FromCompiler { * @throws TableNotFoundException * if table name not found in schema */ - public static ColumnResolver getResolverForQuery(SelectStatement statement, PhoenixConnection connection) + public static ColumnResolver getResolverForQuery(SelectStatement statement, PhoenixConnection connection, boolean alwaysHitServer, TableName mutatingTableName) throws SQLException { TableNode fromNode = statement.getFrom(); if (fromNode == null) return EMPTY_TABLE_RESOLVER; if (fromNode instanceof NamedTableNode) - return new SingleTableColumnResolver(connection, (NamedTableNode) fromNode, true, 1, statement.getUdfParseNodes()); + return new SingleTableColumnResolver(connection, (NamedTableNode) fromNode, true, 1, statement.getUdfParseNodes(), alwaysHitServer, mutatingTableName); - MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1, statement.getUdfParseNodes()); + MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1, statement.getUdfParseNodes(), mutatingTableName); fromNode.accept(visitor); return visitor; } @@ -283,7 +288,7 @@ public class FromCompiler { public static ColumnResolver getResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes) throws SQLException { - SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, tableRef, udfParseNodes); + SingleTableColumnResolver visitor = new SingleTableColumnResolver(connection, tableRef, udfParseNodes, null); return visitor; } @@ -306,7 +311,7 @@ public class FromCompiler { public SchemaResolver(PhoenixConnection conn, String schemaName, boolean updateCacheImmediately) throws SQLException { - super(conn, 0); + super(conn, 0, null); schemaName = connection.getSchema() != null && schemaName == null ? connection.getSchema() : schemaName; schemas = ImmutableList.of(createSchemaRef(schemaName, updateCacheImmediately)); } @@ -344,7 +349,7 @@ public class FromCompiler { private final List<PSchema> schemas; public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp, Map<String, UDFParseNode> udfParseNodes, boolean isNamespaceMapped) throws SQLException { - super(connection, 0, false, udfParseNodes); + super(connection, 0, false, udfParseNodes, null); List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size()); for (ColumnDef def : table.getDynamicColumns()) { if (def.getColumnDefName().getFamilyName() != null) { @@ -371,18 +376,18 @@ public class FromCompiler { } public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, boolean alwaysHitServer) throws SQLException { - this(connection, tableNode, updateCacheImmediately, 0, new HashMap<String,UDFParseNode>(1), alwaysHitServer); + this(connection, tableNode, updateCacheImmediately, 0, new HashMap<String,UDFParseNode>(1), alwaysHitServer, null); } public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition, Map<String, UDFParseNode> udfParseNodes) throws SQLException { - this(connection, tableNode, updateCacheImmediately, tsAddition, udfParseNodes, false); + this(connection, tableNode, updateCacheImmediately, tsAddition, udfParseNodes, false, null); } public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition, - Map<String, UDFParseNode> udfParseNodes, boolean alwaysHitServer) throws SQLException { - super(connection, tsAddition, updateCacheImmediately, udfParseNodes); + Map<String, UDFParseNode> udfParseNodes, boolean alwaysHitServer, TableName mutatingTableName) throws SQLException { + super(connection, tsAddition, updateCacheImmediately, udfParseNodes, mutatingTableName); alias = tableNode.getAlias(); TableRef tableRef = createTableRef(tableNode.getName().getSchemaName(), tableNode, updateCacheImmediately, alwaysHitServer); PSchema schema = new PSchema(tableRef.getTable().getSchemaName().toString()); @@ -391,21 +396,21 @@ public class FromCompiler { } public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef) { - super(connection, 0); + super(connection, 0, null); alias = tableRef.getTableAlias(); tableRefs = ImmutableList.of(tableRef); schemas = ImmutableList.of(new PSchema(tableRef.getTable().getSchemaName().toString())); } - public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes) throws SQLException { - super(connection, 0, false, udfParseNodes); + public SingleTableColumnResolver(PhoenixConnection connection, TableRef tableRef, Map<String, UDFParseNode> udfParseNodes, TableName mutatingTableName) throws SQLException { + super(connection, 0, false, udfParseNodes, mutatingTableName); alias = tableRef.getTableAlias(); tableRefs = ImmutableList.of(tableRef); schemas = ImmutableList.of(new PSchema(tableRef.getTable().getSchemaName().toString())); } public SingleTableColumnResolver(TableRef tableRef) throws SQLException { - super(null, 0); + super(null, 0, null); alias = tableRef.getTableAlias(); tableRefs = ImmutableList.of(tableRef); schemas = ImmutableList.of(new PSchema(tableRef.getTable().getSchemaName().toString())); @@ -494,16 +499,20 @@ public class FromCompiler { private final int tsAddition; protected final Map<String, PFunction> functionMap; protected List<PFunction> functions; + //PHOENIX-3823 : Force update cache when mutating table and select table are same + //(UpsertSelect or Delete with select on same table) + protected TableName mutatingTableName = null; - private BaseColumnResolver(PhoenixConnection connection, int tsAddition) { + private BaseColumnResolver(PhoenixConnection connection, int tsAddition, TableName mutatingTableName) { this.connection = connection; this.client = connection == null ? null : new MetaDataClient(connection); this.tsAddition = tsAddition; functionMap = new HashMap<String, PFunction>(1); this.functions = Collections.<PFunction>emptyList(); + this.mutatingTableName = mutatingTableName; } - private BaseColumnResolver(PhoenixConnection connection, int tsAddition, boolean updateCacheImmediately, Map<String, UDFParseNode> udfParseNodes) throws SQLException { + private BaseColumnResolver(PhoenixConnection connection, int tsAddition, boolean updateCacheImmediately, Map<String, UDFParseNode> udfParseNodes, TableName mutatingTableName) throws SQLException { this.connection = connection; this.client = connection == null ? null : new MetaDataClient(connection); this.tsAddition = tsAddition; @@ -516,6 +525,7 @@ public class FromCompiler { functionMap.put(function.getFunctionName(), function); } } + this.mutatingTableName = mutatingTableName; } protected PSchema createSchemaRef(String schemaName, boolean updateCacheImmediately) throws SQLException { @@ -555,6 +565,11 @@ public class FromCompiler { PName tenantId = connection.getTenantId(); PTable theTable = null; if (updateCacheImmediately) { + if(mutatingTableName!=null && tableNode!=null ){ + if(tableNode.getName().equals(mutatingTableName)){ + alwaysHitServer = true; + } + } MetaDataMutationResult result = client.updateCache(tenantId, schemaName, tableName, alwaysHitServer); timeStamp = TransactionUtil.getResolvedTimestamp(connection, result); theTable = result.getTable(); @@ -724,7 +739,7 @@ public class FromCompiler { private String connectionSchemaName; private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition) { - super(connection, tsAddition); + super(connection, tsAddition, null); tableMap = ArrayListMultimap.<String, TableRef> create(); tables = Lists.newArrayList(); try { @@ -734,8 +749,8 @@ public class FromCompiler { } } - private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition, Map<String, UDFParseNode> udfParseNodes) throws SQLException { - super(connection, tsAddition, false, udfParseNodes); + private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition, Map<String, UDFParseNode> udfParseNodes, TableName mutatingTableName) throws SQLException { + super(connection, tsAddition, false, udfParseNodes, mutatingTableName); tableMap = ArrayListMultimap.<String, TableRef> create(); tables = Lists.newArrayList(); } @@ -891,7 +906,7 @@ public class FromCompiler { } } if (theTableRef != null) { return new ColumnRef(theTableRef, theColumnPosition); } - throw new ColumnNotFoundException(colName); + throw new ColumnNotFoundException(schemaName, tableName, null, colName); } else { try { TableRef tableRef = resolveTable(schemaName, tableName); @@ -924,7 +939,7 @@ public class FromCompiler { private final List<TableRef> theTableRefs; private final Map<ColumnRef, Integer> columnRefMap; private ProjectedTableColumnResolver(PTable projectedTable, PhoenixConnection conn, Map<String, UDFParseNode> udfParseNodes) throws SQLException { - super(conn, 0, udfParseNodes); + super(conn, 0, udfParseNodes, null); Preconditions.checkArgument(projectedTable.getType() == PTableType.PROJECTED); this.isLocalIndex = projectedTable.getIndexType() == IndexType.LOCAL; this.columnRefMap = new HashMap<ColumnRef, Integer>(); @@ -983,7 +998,7 @@ public class FromCompiler { } Integer position = columnRefMap.get(colRef); if (position == null) - throw new ColumnNotFoundException(colName); + throw new ColumnNotFoundException(schemaName, tableName, null, colName); return new ColumnRef(theTableRefs.get(0), position); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java index 200b06c..4c4fb16 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ProjectionCompiler.java @@ -196,7 +196,9 @@ public class ProjectionCompiler { if (index.getIndexType() != IndexType.LOCAL) { if (index.getColumns().size()-minIndexPKOffset != dataTable.getColumns().size()-minTablePKOffset) { // We'll end up not using this by the optimizer, so just throw - throw new ColumnNotFoundException(WildcardParseNode.INSTANCE.toString()); + String schemaNameStr = dataTable.getSchemaName()==null?null:dataTable.getSchemaName().getString(); + String tableNameStr = dataTable.getTableName()==null?null:dataTable.getTableName().getString(); + throw new ColumnNotFoundException(schemaNameStr, tableNameStr,null, WildcardParseNode.INSTANCE.toString()); } } for (int i = tableOffset, j = tableOffset; i < dataTable.getColumns().size(); i++) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 67619fb..69dab66 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -73,6 +73,7 @@ import org.apache.phoenix.parse.NamedTableNode; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SelectStatement; import org.apache.phoenix.parse.SequenceValueParseNode; +import org.apache.phoenix.parse.TableName; import org.apache.phoenix.parse.UpsertStatement; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; @@ -94,6 +95,7 @@ import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.ReadOnlyTableException; +import org.apache.phoenix.schema.UpsertColumnsValuesMismatchException; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.TypeMismatchException; @@ -345,258 +347,237 @@ public class UpsertCompiler { // update the cache up front when we create the resolver in that case. boolean retryOnce = !connection.getAutoCommit(); boolean useServerTimestampToBe = false; - while (true) { - try { - resolver = FromCompiler.getResolverForMutation(upsert, connection); - tableRefToBe = resolver.getTables().get(0); - table = tableRefToBe.getTable(); - // Cannot update: - // - read-only VIEW - // - transactional table with a connection having an SCN - if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) { - throw new ReadOnlyTableException(schemaName,tableName); - } - else if (table.isTransactional() && connection.getSCN() != null) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName) - .setTableName(tableName).build().buildException(); - } - boolean isSalted = table.getBucketNum() != null; - isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null; - isSharedViewIndex = table.getViewIndexId() != null; - tenantIdStr = isTenantSpecific ? connection.getTenantId().getString() : null; - int posOffset = isSalted ? 1 : 0; - // Setup array of column indexes parallel to values that are going to be set - allColumnsToBe = table.getColumns(); - nColumnsToSet = 0; - if (table.getViewType() == ViewType.UPDATABLE) { - addViewColumnsToBe = Sets.newLinkedHashSetWithExpectedSize(allColumnsToBe.size()); - for (PColumn column : allColumnsToBe) { - if (column.getViewConstant() != null) { - addViewColumnsToBe.add(column); - } - } + + resolver = FromCompiler.getResolverForMutation(upsert, connection); + tableRefToBe = resolver.getTables().get(0); + table = tableRefToBe.getTable(); + // Cannot update: + // - read-only VIEW + // - transactional table with a connection having an SCN + if (table.getType() == PTableType.VIEW && table.getViewType().isReadOnly()) { + throw new ReadOnlyTableException(schemaName,tableName); + } + else if (table.isTransactional() && connection.getSCN() != null) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SPECIFY_SCN_FOR_TXN_TABLE).setSchemaName(schemaName) + .setTableName(tableName).build().buildException(); + } + boolean isSalted = table.getBucketNum() != null; + isTenantSpecific = table.isMultiTenant() && connection.getTenantId() != null; + isSharedViewIndex = table.getViewIndexId() != null; + tenantIdStr = isTenantSpecific ? connection.getTenantId().getString() : null; + int posOffset = isSalted ? 1 : 0; + // Setup array of column indexes parallel to values that are going to be set + allColumnsToBe = table.getColumns(); + + nColumnsToSet = 0; + if (table.getViewType() == ViewType.UPDATABLE) { + addViewColumnsToBe = Sets.newLinkedHashSetWithExpectedSize(allColumnsToBe.size()); + for (PColumn column : allColumnsToBe) { + if (column.getViewConstant() != null) { + addViewColumnsToBe.add(column); } - ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - // Allow full row upsert if no columns or only dynamic ones are specified and values count match - if (columnNodes.isEmpty() || columnNodes.size() == upsert.getTable().getDynamicColumns().size()) { - nColumnsToSet = allColumnsToBe.size() - posOffset; - columnIndexesToBe = new int[nColumnsToSet]; - pkSlotIndexesToBe = new int[columnIndexesToBe.length]; - targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length); - targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null)); - int minPKPos = 0; - if (isSharedViewIndex) { - PColumn indexIdColumn = table.getPKColumns().get(minPKPos); - columnIndexesToBe[minPKPos] = indexIdColumn.getPosition(); - targetColumns.set(minPKPos, indexIdColumn); - minPKPos++; - } - if (isTenantSpecific) { - PColumn tenantColumn = table.getPKColumns().get(minPKPos); - columnIndexesToBe[minPKPos] = tenantColumn.getPosition(); - targetColumns.set(minPKPos, tenantColumn); - minPKPos++; - } - for (int i = posOffset, j = 0; i < allColumnsToBe.size(); i++) { - PColumn column = allColumnsToBe.get(i); - if (SchemaUtil.isPKColumn(column)) { - pkSlotIndexesToBe[i-posOffset] = j + posOffset; - if (j++ < minPKPos) { // Skip, as it's already been set above - continue; - } - minPKPos = 0; - } - columnIndexesToBe[i-posOffset+minPKPos] = i; - targetColumns.set(i-posOffset+minPKPos, column); - } - if (!addViewColumnsToBe.isEmpty()) { - // All view columns overlap in this case - overlapViewColumnsToBe = addViewColumnsToBe; - addViewColumnsToBe = Collections.emptySet(); - } - } else { - // Size for worse case - int numColsInUpsert = columnNodes.size(); - nColumnsToSet = numColsInUpsert + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + + (isSharedViewIndex ? 1 : 0); - columnIndexesToBe = new int[nColumnsToSet]; - pkSlotIndexesToBe = new int[columnIndexesToBe.length]; - targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length); - targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null)); - Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced - Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced - BitSet pkColumnsSet = new BitSet(table.getPKColumns().size()); - int i = 0; - if (isSharedViewIndex) { - PColumn indexIdColumn = table.getPKColumns().get(i + posOffset); - columnIndexesToBe[i] = indexIdColumn.getPosition(); - pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset); - targetColumns.set(i, indexIdColumn); - i++; - } - // Add tenant column directly, as we don't want to resolve it as this will fail - if (isTenantSpecific) { - PColumn tenantColumn = table.getPKColumns().get(i + posOffset); - columnIndexesToBe[i] = tenantColumn.getPosition(); - pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset); - targetColumns.set(i, tenantColumn); - i++; - } - for (ColumnName colName : columnNodes) { - ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()); - PColumn column = ref.getColumn(); - if (IndexUtil.getViewConstantValue(column, ptr)) { - if (overlapViewColumnsToBe.isEmpty()) { - overlapViewColumnsToBe = Sets.newHashSetWithExpectedSize(addViewColumnsToBe.size()); - } - nColumnsToSet--; - overlapViewColumnsToBe.add(column); - addViewColumnsToBe.remove(column); - } - columnIndexesToBe[i] = ref.getColumnPosition(); - targetColumns.set(i, column); - if (SchemaUtil.isPKColumn(column)) { - pkColumnsSet.set(pkSlotIndexesToBe[i] = ref.getPKSlotPosition()); - } - i++; - } - for (PColumn column : addViewColumnsToBe) { - columnIndexesToBe[i] = column.getPosition(); - targetColumns.set(i, column); - if (SchemaUtil.isPKColumn(column)) { - pkColumnsSet.set(pkSlotIndexesToBe[i] = SchemaUtil.getPKPosition(table, column)); - } - i++; - } - // If a table has rowtimestamp col, then we always set it. - useServerTimestampToBe = table.getRowTimestampColPos() != -1 && !isRowTimestampSet(pkSlotIndexesToBe, table); - if (useServerTimestampToBe) { - PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos()); - // Need to resize columnIndexesToBe and pkSlotIndexesToBe to include this extra column. - columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, columnIndexesToBe.length + 1); - pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, pkSlotIndexesToBe.length + 1); - columnIndexesToBe[i] = rowTimestampCol.getPosition(); - pkColumnsSet.set(pkSlotIndexesToBe[i] = table.getRowTimestampColPos()); - targetColumns.add(rowTimestampCol); - if (valueNodes != null && !valueNodes.isEmpty()) { - valueNodes.add(getNodeForRowTimestampColumn(rowTimestampCol)); - } - nColumnsToSet++; - } - for (i = posOffset; i < table.getPKColumns().size(); i++) { - PColumn pkCol = table.getPKColumns().get(i); - if (!pkColumnsSet.get(i)) { - if (!pkCol.isNullable() && pkCol.getExpressionStr() == null) { - throw new ConstraintViolationException(table.getName().getString() + "." + pkCol.getName().getString() + " may not be null"); - } - } + } + } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + // Allow full row upsert if no columns or only dynamic ones are specified and values count match + if (columnNodes.isEmpty() || columnNodes.size() == upsert.getTable().getDynamicColumns().size()) { + nColumnsToSet = allColumnsToBe.size() - posOffset; + columnIndexesToBe = new int[nColumnsToSet]; + pkSlotIndexesToBe = new int[columnIndexesToBe.length]; + targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length); + targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null)); + int minPKPos = 0; + if (isSharedViewIndex) { + PColumn indexIdColumn = table.getPKColumns().get(minPKPos); + columnIndexesToBe[minPKPos] = indexIdColumn.getPosition(); + targetColumns.set(minPKPos, indexIdColumn); + minPKPos++; + } + if (isTenantSpecific) { + PColumn tenantColumn = table.getPKColumns().get(minPKPos); + columnIndexesToBe[minPKPos] = tenantColumn.getPosition(); + targetColumns.set(minPKPos, tenantColumn); + minPKPos++; + } + for (int i = posOffset, j = 0; i < allColumnsToBe.size(); i++) { + PColumn column = allColumnsToBe.get(i); + if (SchemaUtil.isPKColumn(column)) { + pkSlotIndexesToBe[i-posOffset] = j + posOffset; + if (j++ < minPKPos) { // Skip, as it's already been set above + continue; } + minPKPos = 0; } - boolean isAutoCommit = connection.getAutoCommit(); - if (valueNodes == null) { - SelectStatement select = upsert.getSelect(); - assert(select != null); - select = SubselectRewriter.flatten(select, connection); - ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection); - select = StatementNormalizer.normalize(select, selectResolver); - select = prependTenantAndViewConstants(table, select, tenantIdStr, addViewColumnsToBe, useServerTimestampToBe); - SelectStatement transformedSelect = SubqueryRewriter.transform(select, selectResolver, connection); - if (transformedSelect != select) { - selectResolver = FromCompiler.getResolverForQuery(transformedSelect, connection); - select = StatementNormalizer.normalize(transformedSelect, selectResolver); - } - sameTable = !select.isJoin() - && tableRefToBe.equals(selectResolver.getTables().get(0)); - tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables()); - /* We can run the upsert in a coprocessor if: - * 1) from has only 1 table or server UPSERT SELECT is enabled - * 2) the select query isn't doing aggregation (which requires a client-side final merge) - * 3) autoCommit is on - * 4) the table is not immutable with indexes, as the client is the one that figures out the additional - * puts for index tables. - * 5) no limit clause, as the limit clause requires client-side post processing - * 6) no sequences, as sequences imply that the order of upsert must match the order of - * selection. TODO: change this and only force client side if there's a ORDER BY on the sequence value - * Otherwise, run the query to pull the data from the server - * and populate the MutationState (upto a limit). - */ - if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) { - // We can pipeline the upsert select instead of spooling everything to disk first, - // if we don't have any post processing that's required. - parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe); - // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query, - // so we might be able to run it entirely on the server side. - // region space managed by region servers. So we bail out on executing on server side. - runOnServer = (sameTable || serverUpsertSelectEnabled) && isAutoCommit && !table.isTransactional() - && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) - && !select.isJoin() && table.getRowTimestampColPos() == -1; - } - // If we may be able to run on the server, add a hint that favors using the data table - // if all else is equal. - // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing, - // as this would disallow running on the server. We currently use the row projector we - // get back to figure this out. - HintNode hint = upsert.getHint(); - if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) { - hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE); - } - select = SelectStatement.create(select, hint); - // Pass scan through if same table in upsert and select so that projection is computed correctly - // Use optimizer to choose the best plan - try { - QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false); - queryPlanToBe = compiler.compile(); - // This is post-fix: if the tableRef is a projected table, this means there are post-processing - // steps and parallelIteratorFactory did not take effect. - if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) { - parallelIteratorFactoryToBe = null; - } - } catch (MetaDataEntityNotFoundException e) { - retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache - throw e; + columnIndexesToBe[i-posOffset+minPKPos] = i; + targetColumns.set(i-posOffset+minPKPos, column); + } + if (!addViewColumnsToBe.isEmpty()) { + // All view columns overlap in this case + overlapViewColumnsToBe = addViewColumnsToBe; + addViewColumnsToBe = Collections.emptySet(); + } + } else { + // Size for worse case + int numColsInUpsert = columnNodes.size(); + nColumnsToSet = numColsInUpsert + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + + (isSharedViewIndex ? 1 : 0); + columnIndexesToBe = new int[nColumnsToSet]; + pkSlotIndexesToBe = new int[columnIndexesToBe.length]; + targetColumns = Lists.newArrayListWithExpectedSize(columnIndexesToBe.length); + targetColumns.addAll(Collections.<PColumn>nCopies(columnIndexesToBe.length, null)); + Arrays.fill(columnIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced + Arrays.fill(pkSlotIndexesToBe, -1); // TODO: necessary? So we'll get an AIOB exception if it's not replaced + BitSet pkColumnsSet = new BitSet(table.getPKColumns().size()); + int i = 0; + if (isSharedViewIndex) { + PColumn indexIdColumn = table.getPKColumns().get(i + posOffset); + columnIndexesToBe[i] = indexIdColumn.getPosition(); + pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset); + targetColumns.set(i, indexIdColumn); + i++; + } + // Add tenant column directly, as we don't want to resolve it as this will fail + if (isTenantSpecific) { + PColumn tenantColumn = table.getPKColumns().get(i + posOffset); + columnIndexesToBe[i] = tenantColumn.getPosition(); + pkColumnsSet.set(pkSlotIndexesToBe[i] = i + posOffset); + targetColumns.set(i, tenantColumn); + i++; + } + for (ColumnName colName : columnNodes) { + ColumnRef ref = resolver.resolveColumn(null, colName.getFamilyName(), colName.getColumnName()); + PColumn column = ref.getColumn(); + if (IndexUtil.getViewConstantValue(column, ptr)) { + if (overlapViewColumnsToBe.isEmpty()) { + overlapViewColumnsToBe = Sets.newHashSetWithExpectedSize(addViewColumnsToBe.size()); } - nValuesToSet = queryPlanToBe.getProjector().getColumnCount(); - // Cannot auto commit if doing aggregation or topN or salted - // Salted causes problems because the row may end up living on a different region - } else { - nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0); + nColumnsToSet--; + overlapViewColumnsToBe.add(column); + addViewColumnsToBe.remove(column); } - // Resize down to allow a subset of columns to be specifiable - if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) { - nColumnsToSet = nValuesToSet; - columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet); - pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet); + columnIndexesToBe[i] = ref.getColumnPosition(); + targetColumns.set(i, column); + if (SchemaUtil.isPKColumn(column)) { + pkColumnsSet.set(pkSlotIndexesToBe[i] = ref.getPKSlotPosition()); } - - if (nValuesToSet != nColumnsToSet) { - // We might have added columns, so refresh cache and try again if stale. - // Note that this check is not really sufficient, as a column could have - // been removed and the added back and we wouldn't detect that here. - if (retryOnce) { - retryOnce = false; - if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) { - continue; - } - } - throw new SQLExceptionInfo.Builder(SQLExceptionCode.UPSERT_COLUMN_NUMBERS_MISMATCH) - .setMessage("Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet) - .build().buildException(); + i++; + } + for (PColumn column : addViewColumnsToBe) { + columnIndexesToBe[i] = column.getPosition(); + targetColumns.set(i, column); + if (SchemaUtil.isPKColumn(column)) { + pkColumnsSet.set(pkSlotIndexesToBe[i] = SchemaUtil.getPKPosition(table, column)); } - } catch (MetaDataEntityNotFoundException e) { - // Catch column/column family not found exception, as our meta data may - // be out of sync. Update the cache once and retry if we were out of sync. - // Otherwise throw, as we'll just get the same error next time. - if (retryOnce) { - retryOnce = false; - if (new MetaDataClient(connection).updateCache(schemaName, tableName).wasUpdated()) { - continue; + i++; + } + // If a table has rowtimestamp col, then we always set it. + useServerTimestampToBe = table.getRowTimestampColPos() != -1 && !isRowTimestampSet(pkSlotIndexesToBe, table); + if (useServerTimestampToBe) { + PColumn rowTimestampCol = table.getPKColumns().get(table.getRowTimestampColPos()); + // Need to resize columnIndexesToBe and pkSlotIndexesToBe to include this extra column. + columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, columnIndexesToBe.length + 1); + pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, pkSlotIndexesToBe.length + 1); + columnIndexesToBe[i] = rowTimestampCol.getPosition(); + pkColumnsSet.set(pkSlotIndexesToBe[i] = table.getRowTimestampColPos()); + targetColumns.add(rowTimestampCol); + if (valueNodes != null && !valueNodes.isEmpty()) { + valueNodes.add(getNodeForRowTimestampColumn(rowTimestampCol)); + } + nColumnsToSet++; + } + for (i = posOffset; i < table.getPKColumns().size(); i++) { + PColumn pkCol = table.getPKColumns().get(i); + if (!pkColumnsSet.get(i)) { + if (!pkCol.isNullable() && pkCol.getExpressionStr() == null) { + throw new ConstraintViolationException(table.getName().getString() + "." + pkCol.getName().getString() + " may not be null"); } } + } + } + boolean isAutoCommit = connection.getAutoCommit(); + if (valueNodes == null) { + SelectStatement select = upsert.getSelect(); + assert(select != null); + select = SubselectRewriter.flatten(select, connection); + ColumnResolver selectResolver = FromCompiler.getResolverForQuery(select, connection, false, upsert.getTable().getName()); + select = StatementNormalizer.normalize(select, selectResolver); + select = prependTenantAndViewConstants(table, select, tenantIdStr, addViewColumnsToBe, useServerTimestampToBe); + SelectStatement transformedSelect = SubqueryRewriter.transform(select, selectResolver, connection); + if (transformedSelect != select) { + selectResolver = FromCompiler.getResolverForQuery(transformedSelect, connection, false, upsert.getTable().getName()); + select = StatementNormalizer.normalize(transformedSelect, selectResolver); + } + sameTable = !select.isJoin() + && tableRefToBe.equals(selectResolver.getTables().get(0)); + tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables()); + /* We can run the upsert in a coprocessor if: + * 1) from has only 1 table or server UPSERT SELECT is enabled + * 2) the select query isn't doing aggregation (which requires a client-side final merge) + * 3) autoCommit is on + * 4) the table is not immutable with indexes, as the client is the one that figures out the additional + * puts for index tables. + * 5) no limit clause, as the limit clause requires client-side post processing + * 6) no sequences, as sequences imply that the order of upsert must match the order of + * selection. TODO: change this and only force client side if there's a ORDER BY on the sequence value + * Otherwise, run the query to pull the data from the server + * and populate the MutationState (upto a limit). + */ + if (! (select.isAggregate() || select.isDistinct() || select.getLimit() != null || select.hasSequence()) ) { + // We can pipeline the upsert select instead of spooling everything to disk first, + // if we don't have any post processing that's required. + parallelIteratorFactoryToBe = new UpsertingParallelIteratorFactory(connection, tableRefToBe, useServerTimestampToBe); + // If we're in the else, then it's not an aggregate, distinct, limited, or sequence using query, + // so we might be able to run it entirely on the server side. + // region space managed by region servers. So we bail out on executing on server side. + runOnServer = (sameTable || serverUpsertSelectEnabled) && isAutoCommit && !table.isTransactional() + && !(table.isImmutableRows() && !table.getIndexes().isEmpty()) + && !select.isJoin() && table.getRowTimestampColPos() == -1; + } + // If we may be able to run on the server, add a hint that favors using the data table + // if all else is equal. + // TODO: it'd be nice if we could figure out in advance if the PK is potentially changing, + // as this would disallow running on the server. We currently use the row projector we + // get back to figure this out. + HintNode hint = upsert.getHint(); + if (!upsert.getHint().hasHint(Hint.USE_INDEX_OVER_DATA_TABLE)) { + hint = HintNode.create(hint, Hint.USE_DATA_OVER_INDEX_TABLE); + } + select = SelectStatement.create(select, hint); + // Pass scan through if same table in upsert and select so that projection is computed correctly + // Use optimizer to choose the best plan + try { + QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false); + queryPlanToBe = compiler.compile(); + // This is post-fix: if the tableRef is a projected table, this means there are post-processing + // steps and parallelIteratorFactory did not take effect. + if (queryPlanToBe.getTableRef().getTable().getType() == PTableType.PROJECTED || queryPlanToBe.getTableRef().getTable().getType() == PTableType.SUBQUERY) { + parallelIteratorFactoryToBe = null; + } + } catch (MetaDataEntityNotFoundException e) { + retryOnce = false; // don't retry if select clause has meta data entities that aren't found, as we already updated the cache throw e; } - break; + nValuesToSet = queryPlanToBe.getProjector().getColumnCount(); + // Cannot auto commit if doing aggregation or topN or salted + // Salted causes problems because the row may end up living on a different region + } else { + nValuesToSet = valueNodes.size() + addViewColumnsToBe.size() + (isTenantSpecific ? 1 : 0) + (isSharedViewIndex ? 1 : 0); + } + // Resize down to allow a subset of columns to be specifiable + if (columnNodes.isEmpty() && columnIndexesToBe.length >= nValuesToSet) { + nColumnsToSet = nValuesToSet; + columnIndexesToBe = Arrays.copyOf(columnIndexesToBe, nValuesToSet); + pkSlotIndexesToBe = Arrays.copyOf(pkSlotIndexesToBe, nValuesToSet); } + if (nValuesToSet != nColumnsToSet) { + // We might have added columns, so refresh cache and try again if stale. + // We have logic to catch MetaNotFoundException and refresh cache in PhoenixStatement + // Note that this check is not really sufficient, as a column could have + // been removed and the added back and we wouldn't detect that here. + throw new UpsertColumnsValuesMismatchException(schemaName, tableName, + "Numbers of columns: " + nColumnsToSet + ". Number of values: " + nValuesToSet); + } final QueryPlan originalQueryPlan = queryPlanToBe; RowProjector projectorToBe = null; // Optimize only after all checks have been performed http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java index fb24557..b34551d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/WhereCompiler.java @@ -198,7 +198,9 @@ public class WhereCompiler { // inefficient. Then we can skip this plan. if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL && (table.getIndexType() == null || table.getIndexType() == IndexType.GLOBAL)) { - throw new ColumnNotFoundException(ref.getColumn().getName().getString()); + String schemaNameStr = table.getSchemaName()==null?null:table.getSchemaName().getString(); + String tableNameStr = table.getTableName()==null?null:table.getTableName().getString(); + throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, ref.getColumn().getName().getString()); } // Track if we need to compare KeyValue during filter evaluation // using column family. If the column qualifier is enough, we http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java index 35ba187..ecbb285 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java @@ -194,7 +194,7 @@ public enum SQLExceptionCode { COLUMN_FAMILY_NOT_FOUND(1001, "42I01", "Undefined column family.", new Factory() { @Override public SQLException newException(SQLExceptionInfo info) { - return new ColumnFamilyNotFoundException(info.getFamilyName()); + return new ColumnFamilyNotFoundException(info.getSchemaName(), info.getTableName(), info.getFamilyName()); } }), PROPERTIES_FOR_FAMILY(1002, "42I02","Properties may not be defined for an unused family name."), http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 3688455..8b00113 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -79,6 +79,7 @@ import org.apache.phoenix.compile.SubselectRewriter; import org.apache.phoenix.compile.TraceQueryPlan; import org.apache.phoenix.compile.UpsertCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.BatchUpdateExecution; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -148,6 +149,7 @@ import org.apache.phoenix.schema.ExecuteQueryNotApplicableException; import org.apache.phoenix.schema.ExecuteUpdateNotApplicableException; import org.apache.phoenix.schema.FunctionNotFoundException; import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.MetaDataEntityNotFoundException; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PIndexState; @@ -174,6 +176,7 @@ import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ServerUtil; +import org.mortbay.log.Log; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -275,6 +278,10 @@ public class PhoenixStatement implements Statement, SQLCloseable { } protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException { + return executeQuery(stmt,true); + } + private PhoenixResultSet executeQuery(final CompilableStatement stmt, + final boolean doRetryOnMetaNotFoundError) throws SQLException { GLOBAL_SELECT_SQL_COUNTER.increment(); try { return CallRunner.run( @@ -317,7 +324,19 @@ public class PhoenixStatement implements Statement, SQLCloseable { } connection.incrementStatementExecutionCounter(); return rs; - } catch (RuntimeException e) { + } + //Force update cache and retry if meta not found error occurs + catch (MetaDataEntityNotFoundException e) { + if(doRetryOnMetaNotFoundError && e.getTableName()!=null){ + if(logger.isDebugEnabled()) + logger.debug("Reloading table "+ e.getTableName()+" data from server"); + if(new MetaDataClient(connection).updateCache(connection.getTenantId(), + e.getSchemaName(), e.getTableName(), true).wasUpdated()){ + return executeQuery(stmt, false); + } + } + throw e; + }catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. if (e.getCause() instanceof SQLException) { @@ -332,7 +351,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { } } }, PhoenixContextExecutor.inContext()); - } catch (Exception e) { + }catch (Exception e) { Throwables.propagateIfInstanceOf(e, SQLException.class); Throwables.propagate(e); throw new IllegalStateException(); // Can't happen as Throwables.propagate() always throws @@ -340,6 +359,10 @@ public class PhoenixStatement implements Statement, SQLCloseable { } protected int executeMutation(final CompilableStatement stmt) throws SQLException { + return executeMutation(stmt, true); + } + + private int executeMutation(final CompilableStatement stmt, final boolean doRetryOnMetaNotFoundError) throws SQLException { if (connection.isReadOnly()) { throw new SQLExceptionInfo.Builder( SQLExceptionCode.READ_ONLY_CONNECTION). @@ -380,7 +403,19 @@ public class PhoenixStatement implements Statement, SQLCloseable { setLastUpdateOperation(stmt.getOperation()); connection.incrementStatementExecutionCounter(); return lastUpdateCount; - } catch (RuntimeException e) { + } + //Force update cache and retry if meta not found error occurs + catch (MetaDataEntityNotFoundException e) { + if(doRetryOnMetaNotFoundError && e.getTableName()!=null){ + if(logger.isDebugEnabled()) + logger.debug("Reloading table "+ e.getTableName()+" data from server"); + if(new MetaDataClient(connection).updateCache(connection.getTenantId(), + e.getSchemaName(), e.getTableName(), true).wasUpdated()){ + return executeMutation(stmt, false); + } + } + throw e; + }catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. if (e.getCause() instanceof SQLException) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index d77b14b..14b7945 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -249,7 +249,9 @@ public class QueryOptimizer { if (plan.getProjector().getColumnCount() == nColumns) { return plan; } else if (index.getIndexType() == IndexType.GLOBAL) { - throw new ColumnNotFoundException("*"); + String schemaNameStr = index.getSchemaName()==null?null:index.getSchemaName().getString(); + String tableNameStr = index.getTableName()==null?null:index.getTableName().getString(); + throw new ColumnNotFoundException(schemaNameStr, tableNameStr, null, "*"); } } } catch (ColumnNotFoundException e) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 105b646..d69f911 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -2068,7 +2068,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // and merge them with the common family properties. for (String f : stmtFamiliesPropsMap.keySet()) { if (!addingColumns && !existingColumnFamilies.contains(f)) { - throw new ColumnFamilyNotFoundException(f); + String schemaNameStr = table.getSchemaName()==null?null:table.getSchemaName().getString(); + String tableNameStr = table.getTableName()==null?null:table.getTableName().getString(); + throw new ColumnFamilyNotFoundException(schemaNameStr, tableNameStr, f); } if (addingColumns && !colFamiliesForPColumnsToBeAdded.contains(f)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_SET_PROPERTY_FOR_COLUMN_NOT_ADDED).build().buildException(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnFamilyNotFoundException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnFamilyNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnFamilyNotFoundException.java index d2b6be7..d6902e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnFamilyNotFoundException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnFamilyNotFoundException.java @@ -32,9 +32,9 @@ public class ColumnFamilyNotFoundException extends MetaDataEntityNotFoundExcepti private static SQLExceptionCode code = SQLExceptionCode.COLUMN_FAMILY_NOT_FOUND; private final String familyName; - public ColumnFamilyNotFoundException(String familyName) { + public ColumnFamilyNotFoundException(String schemaName, String tableName, String familyName) { super(new SQLExceptionInfo.Builder(code).setFamilyName(familyName).build().toString(), - code.getSQLState(), code.getErrorCode(), null); + code.getSQLState(), code.getErrorCode(), schemaName, tableName,null); this.familyName = familyName; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnNotFoundException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnNotFoundException.java index 4a0fb85..00b65f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnNotFoundException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/ColumnNotFoundException.java @@ -32,8 +32,6 @@ import org.apache.phoenix.exception.SQLExceptionInfo; public class ColumnNotFoundException extends MetaDataEntityNotFoundException { private static final long serialVersionUID = 1L; private static SQLExceptionCode code = SQLExceptionCode.COLUMN_NOT_FOUND; - private final String schemaName; - private final String tableName; private final String columnName; public ColumnNotFoundException(String columnName) { @@ -43,20 +41,10 @@ public class ColumnNotFoundException extends MetaDataEntityNotFoundException { public ColumnNotFoundException(String schemaName, String tableName, String familyName, String columnName) { super(new SQLExceptionInfo.Builder(code).setSchemaName(schemaName).setTableName(tableName) .setFamilyName(familyName).setColumnName(columnName).build().toString(), - code.getSQLState(), code.getErrorCode(), null); - this.schemaName = schemaName; - this.tableName = tableName; + code.getSQLState(), code.getErrorCode(), schemaName, tableName, null); this.columnName = columnName; } - public String getTableName() { - return tableName; - } - - public String getSchemaName() { - return schemaName; - } - public String getColumnName() { return columnName; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java index 73e23be..09763cd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/FunctionNotFoundException.java @@ -37,7 +37,7 @@ public class FunctionNotFoundException extends MetaDataEntityNotFoundException { public FunctionNotFoundException(String functionName, long timestamp) { super(new SQLExceptionInfo.Builder(code).setFunctionName(functionName).build().toString(), - code.getSQLState(), code.getErrorCode(), null); + code.getSQLState(), code.getErrorCode(), null, null, null); this.functionName = functionName; this.timestamp = timestamp; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 96f7944..f69b5f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -672,7 +672,7 @@ public class MetaDataClient { return result; } - private MetaDataMutationResult updateCache(PName tenantId, List<String> functionNames, + public MetaDataMutationResult updateCache(PName tenantId, List<String> functionNames, boolean alwaysHitServer) throws SQLException { // TODO: pass byte[] herez long clientTimeStamp = getClientTimeStamp(); List<PFunction> functions = new ArrayList<PFunction>(functionNames.size()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c169802e/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataEntityNotFoundException.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataEntityNotFoundException.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataEntityNotFoundException.java index 078e65c..a52381c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataEntityNotFoundException.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataEntityNotFoundException.java @@ -22,7 +22,21 @@ import java.sql.SQLException; public abstract class MetaDataEntityNotFoundException extends SQLException { private static final long serialVersionUID = 1L; - public MetaDataEntityNotFoundException(String reason, String sqlState, int code, Throwable cause) { - super(reason, sqlState, code, cause); - } + private final String schemaName; + private final String tableName; + + public MetaDataEntityNotFoundException(String reason, String sqlState, int code, + String schemaName, String tableName, Throwable cause) { + super(reason, sqlState, code, cause); + this.schemaName = schemaName; + this.tableName = tableName; + } + + public String getSchemaName() { + return schemaName; + } + + public String getTableName() { + return tableName; + } }
